Title: Getting Started

*Getting Started* will guide you through the process of creating a simple Crunch pipeline to count the words in a text document, which is the Hello World of distributed computing. Along the way, we'll explain the core Crunch concepts and how to use them to create effective and efficient data pipelines.

Overview
========

The Apache Crunch project develops and supports Java APIs that simplify the process of creating data pipelines on top of Apache Hadoop. The Crunch APIs are modeled after [FlumeJava (PDF)](http://pages.cs.wisc.edu/~akella/CS838/F12/838-CloudPapers/FlumeJava.pdf), which is the library that Google uses for building data pipelines on top of their own implementation of MapReduce.

One of the most common questions we hear is how Crunch compares to other projects that provide abstractions on top of MapReduce, such as [Apache Pig](http://pig.apache.org/), [Apache Hive](http://hive.apache.org/), and [Cascading](http://www.cascading.org/).

1. *Developer focused.* Apache Hive and Apache Pig were built to make MapReduce accessible to data analysts with limited experience in Java programming. Crunch was designed for developers who understand Java and want to use MapReduce effectively in order to write fast, reliable applications that need to meet tight SLAs. Crunch is often used in conjunction with Hive and Pig; for example, a Crunch pipeline written by the development team sessionizes a set of user logs and generates output that is then processed by a diverse collection of Pig scripts and Hive queries written by analysts.
2. *Minimal abstractions.* Crunch pipelines provide a thin veneer on top of MapReduce. Developers have access to low-level MapReduce APIs whenever they need them. This minimalism also means that Crunch is extremely fast, only slightly slower than a hand-tuned pipeline developed with the MapReduce APIs, and the community is working on making it faster all the time. That said, one of the goals of the project is portability, and the abstractions that Crunch provides are designed to ease the transition from Hadoop 1.0 to Hadoop 2.0 and to provide transparent support for future data processing frameworks that run on Hadoop, including [Apache Spark](http://spark.incubator.apache.org/) and [Apache Tez](http://tez.incubator.apache.org/).
3. *Flexible Data Model.* Hive, Pig, and Cascading all use a tuple-centric data model that works best when your input data can be represented using a named collection of scalar values, much like the rows of a database table.
Crunch allows developers considerable flexibility in how they represent their data, which makes Crunch the best pipeline platform for developers working with complex structures like [Apache Avro records](http://avro.apache.org) or [protocol buffers](https://github.com/kevinweil/elephant-bird/tree/master/crunch), [geospatial](http://thunderheadxpler.blogspot.com/2013/05/creating-spatial-crunch-pipelines.html) and [time series](http://blog.cloudera.com/blog/2012/01/seismic-data-science-hadoop-use-case/) data, and data stored in [Apache HBase](http://hbase.apache.org) tables.

Which Version of Crunch Do I Need?
==================================

The core libraries are primarily developed against Hadoop 1.1.2 and are also tested against Hadoop 2.2.0. They should work with any version of Hadoop 1.x after 1.0.3 and any version of Hadoop 2.x after 2.0.0-alpha, although you should note that some of Hadoop 2.x's dependencies changed between 2.0.4-alpha and 2.2.0 (for example, the protocol buffer library switched from 2.4.1 to 2.5.0). Crunch is also known to work with distributions from vendors like Cloudera, Hortonworks, and IBM. The Crunch libraries are _not_ compatible with versions of Hadoop prior to 1.x, such as 0.20.2.

If you're using the crunch-hbase library, please note that Crunch 0.9.0 switched to using HBase 0.96.0, while all prior versions of crunch-hbase were developed against HBase 0.94.3.

Here are all of the currently recommended Crunch versions in one convenient table:
| Hadoop Versions | HBase Versions | Recommended Crunch Version |
| --------------- | -------------- | -------------------------- |
| 1.x             | 0.96.x         | 0.12.0                     |
| 2.x             | 1.0            | 0.14.0                     |
To use Crunch in a Maven project, add the following dependency to your `pom.xml`:

    <dependency>
      <groupId>org.apache.crunch</groupId>
      <artifactId>crunch-core</artifactId>
      <version>${crunch.version}</version>
    </dependency>

The `crunch-core` artifact contains the core libraries for planning and executing MapReduce pipelines. Depending on your use case, you may also find the following artifacts useful:

* `crunch-test`: Helper classes for integration testing of Crunch pipelines
* `crunch-hbase`: Utilities for pipelines that read/write data to Apache HBase
* `crunch-scrunch`: Scrunch, a Scala API for Crunch
* `crunch-spark`: Executes Crunch pipelines using Apache Spark
* `crunch-contrib`: Extra Crunch libraries for text processing, JDBC connections, and BloomFilters
* `crunch-examples`: Example MapReduce and HBase pipelines
* `crunch-archetype`: A Maven archetype for creating new Crunch pipeline projects

Building From Source
--------------------

You can download the most recently released Crunch libraries from the [Download](download.html) page or from the Maven Central Repository. If you prefer, you can also build the Crunch libraries from the source code using Maven and install them in your local repository:

    mvn clean install

This also runs the integration test suite, which will take a while to complete; you can skip the tests by running with the `-DskipTests` option. If you are planning to run Crunch against Hadoop 2.x, you should also specify `-Dcrunch.platform=2`.

After you have built Crunch, you can run the bundled example applications such as WordCount:

    hadoop jar crunch-examples/target/crunch-examples-*-job.jar org.apache.crunch.examples.WordCount
To create your first Crunch pipeline, you can start from the following example project, which contains a complete word count application:

    git clone http://github.com/jwills/crunch-demo

You can also use the following Maven archetype, which will generate the same code as the example and allow you to choose a different version of Crunch. Enter the following command and answer the questions as shown below:
    $ mvn archetype:generate -Dfilter=org.apache.crunch:crunch-archetype
    [...]
    1: remote -> org.apache.crunch:crunch-archetype (Create a basic, self-contained job with the core library.)
    Choose a number or apply filter (format: [groupId:]artifactId, case sensitive contains): : 1
    [CHOOSE THE VERSION OF CRUNCH YOU WANT TO USE]
    Define value for property 'groupId': : com.example
    Define value for property 'artifactId': : crunch-demo
    Define value for property 'version': 1.0-SNAPSHOT: : [HIT ENTER]
    Define value for property 'package': com.example: : [HIT ENTER]
    Confirm properties configuration:
    groupId: com.example
    artifactId: crunch-demo
    version: 1.0-SNAPSHOT
    package: com.example
    Y: : [HIT ENTER]
    [...]
    $

The example Maven project contains an example application that counts word frequencies in text files:
    $ cd crunch-demo
    $ tree
    .
    |-- pom.xml
    `-- src
        |-- main
        |   |-- assembly
        |   |   `-- hadoop-job.xml
        |   `-- java
        |       `-- com
        |           `-- example
        |               |-- StopWordFilter.java
        |               |-- Tokenizer.java
        |               `-- WordCount.java
        `-- test
            `-- java
                `-- com
                    `-- example
                        |-- StopWordFilterTest.java
                        `-- TokenizerTest.java

The `WordCount.java` file contains the main class that defines a pipeline application, which is referenced from `pom.xml`. Build the code:
    $ mvn package

Your packaged application is created in the `target` directory. The build process uses Maven's assembly plugin with some configuration in `hadoop-job.xml` to create a special JAR file (suffix `-job.jar`). Depending on your Hadoop configuration, you can run it locally or on a cluster using Hadoop's launcher script:
    $ hadoop jar target/crunch-demo-1.0-SNAPSHOT-job.jar <in> <out>

The `<in>` argument is a text file or a directory of text files to process, and `<out>` is the directory where the pipeline writes its output. Let's take a closer look at the source code of the `WordCount` application:
    public class WordCount extends Configured implements Tool {

      public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new WordCount(), args);
      }

The `WordCount` class extends `Configured` and implements `Tool`, which allows us to use Hadoop's `ToolRunner` class to parse the standard command-line arguments for MapReduce jobs and make them available to the `WordCount` class via the `getConf()` method that is inherited from `Configured`. This is an easy way to allow us to override Hadoop configuration parameters for our job from the command line, without having to update and recompile our application.
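For example, the inherited configuration makes it straightforward to read job-specific settings supplied on the command line with `-D` flags. The snippet below is a minimal sketch of that pattern; the `wordcount.min.word.length` property is a hypothetical parameter used only for illustration and is not part of the example project:

    // Sketch: reading a command-line-supplied setting via getConf().
    // The property name "wordcount.min.word.length" is hypothetical; it could be
    // passed as: hadoop jar crunch-demo-1.0-SNAPSHOT-job.jar -Dwordcount.min.word.length=3 <in> <out>
    public int run(String[] args) throws Exception {
      // getConf() is inherited from Configured and already contains any -D
      // overrides that ToolRunner parsed from the command line.
      int minWordLength = getConf().getInt("wordcount.min.word.length", 1);
      System.out.println("Ignoring words shorter than " + minWordLength + " characters");
      // ... the rest of the pipeline would be defined here ...
      return 0;
    }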
The Crunch-specific bits are introduced in the `run` method, just after the command-line argument parsing is completed:

    String inputPath = args[0];
    String outputPath = args[1];

    // Create an object to coordinate pipeline creation and execution.
    Pipeline pipeline = new MRPipeline(WordCount.class, getConf());

Every Crunch job begins with a `Pipeline` instance that manages the execution lifecycle of your data pipeline. As of the 0.9.0 release, there are three implementations of the Pipeline interface:

1. `MRPipeline`: Executes a pipeline as a series of MapReduce jobs that can run locally or on a Hadoop cluster.
2. `MemPipeline`: Executes a pipeline in-memory on the client.
3. `SparkPipeline`: Executes a pipeline by running a series of [Apache Spark](http://spark.incubator.apache.org/) jobs, either locally or on a Hadoop cluster.

The MemPipeline is most useful when you are initially developing and testing the logic of your pipeline on small, local data sets. The MRPipeline is the oldest and most robust of the Pipeline implementations for processing large amounts of data. The SparkPipeline is the newest implementation and leverages features of the underlying Spark engine that should allow it to run substantially faster than the MRPipeline, especially when your problem requires running many iterations over the same data. You can read more about the properties and configuration options of the different Pipeline implementations in this section of the [user guide](user-guide.html#pipelines).
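As a rough illustration of how the in-memory implementation can be used for quick local experiments, here is a small self-contained sketch that counts words over a couple of hard-coded lines. It assumes the standard `MemPipeline` factory methods and reuses the `Tokenizer` function shown later in this walkthrough; it is not part of the crunch-demo project:

    import org.apache.crunch.PCollection;
    import org.apache.crunch.PTable;
    import org.apache.crunch.Pair;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mem.MemPipeline;
    import org.apache.crunch.types.writable.Writables;

    public class InMemoryWordCountSketch {
      public static void main(String[] args) {
        // MemPipeline runs everything in the client JVM, which makes it handy
        // for experimenting with pipeline logic on tiny, hard-coded data sets.
        Pipeline pipeline = MemPipeline.getInstance();

        // Build a small, typed in-memory collection instead of reading from HDFS.
        PCollection<String> lines = MemPipeline.typedCollectionOf(
            Writables.strings(), "hello crunch", "hello hadoop");

        // The same logical operations used in the MapReduce version apply here.
        PTable<String, Long> counts = lines
            .parallelDo(new Tokenizer(), Writables.strings())
            .count();

        // With MemPipeline, materialize() just iterates over the in-memory results.
        for (Pair<String, Long> entry : counts.materialize()) {
          System.out.println(entry.first() + ": " + entry.second());
        }
        pipeline.done();
      }
    }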
Once we've created our Pipeline instance, we need to identify the location and format of the data that our pipeline should process:

    // Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(inputPath);

A `PCollection<T>` is the core data abstraction of the Crunch API: a distributed, immutable collection of records of type `T`. Here, each element of the PCollection is a single line of text from the input file.
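`readTextFile` is a convenience method; the more general pattern is to pass a `Source` to the Pipeline's `read` method, which is also how other input formats are read. A minimal sketch, assuming the `From` factory class from `org.apache.crunch.io`:

    // import org.apache.crunch.io.From;

    // Equivalent to pipeline.readTextFile(inputPath), expressed with an explicit Source.
    PCollection<String> lines = pipeline.read(From.textFile(inputPath));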
Next, we apply our first data transformation to the lines of text:

    // Define a function that splits each line in a PCollection of Strings into
    // a PCollection made up of the individual words in the file.
    // The second argument sets the serialization format.
    PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

The `Tokenizer` instance in this snippet is a subclass of Crunch's `DoFn<S, T>` class, which developers extend in order to define the data processing functions in a pipeline:
    public class Tokenizer extends DoFn<String, String> {
      private static final Splitter SPLITTER = Splitter.onPattern("\\s+").omitEmptyStrings();

      @Override
      public void process(String line, Emitter<String> emitter) {
        for (String word : SPLITTER.split(line)) {
          emitter.emit(word);
        }
      }
    }

To apply a DoFn to a PCollection, we use the PCollection's `parallelDo(DoFn<S, T> doFn, PType<T> ptype)` method. The DoFn's `process` method is called once for each input record and may emit zero or more output records to the `Emitter`. The second argument, the `PType<T>`, tells Crunch how the records in the resulting PCollection should be serialized; in this example, `Writables.strings()` uses Hadoop's Writable format.
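Crunch ships with two serialization families for PTypes, one based on Hadoop Writables and one based on Apache Avro, and the same tokenizing step could be expressed with either. A rough sketch of the Avro variant, assuming the `Avros` factory class from `org.apache.crunch.types.avro`:

    // import org.apache.crunch.types.avro.Avros;

    // The same parallelDo call, but asking Crunch to serialize the output
    // PCollection with Avro instead of Hadoop Writables.
    PCollection<String> words = lines.parallelDo(new Tokenizer(), Avros.strings());

It is common to pick one serialization family and use it consistently throughout a pipeline.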
Next, we remove words that are so common that they are not worth counting:

    // Take the collection of words and remove known stop words.
    PCollection<String> noStopWords = words.filter(new StopWordFilter());

This snippet references a `StopWordFilter` instance, which is a subclass of Crunch's `FilterFn<T>` class. A FilterFn is a specialized DoFn whose `accept` method returns true for each record that should be kept in the output PCollection:
    public class StopWordFilter extends FilterFn<String> {
      // English stop words, borrowed from Lucene.
      private static final Set<String> STOP_WORDS = ImmutableSet.copyOf(new String[] {
        "a", "and", "are", "as", "at", "be", "but", "by",
        "for", "if", "in", "into", "is", "it",
        "no", "not", "of", "on", "or", "s", "such",
        "t", "that", "the", "their", "then", "there", "these",
        "they", "this", "to", "was", "will", "with"
      });

      @Override
      public boolean accept(String word) {
        return !STOP_WORDS.contains(word);
      }
    }

The Crunch libraries have a number of specialized implementations of DoFn and associated methods for PCollection that can clarify the intent of the steps in your data processing pipelines. You can review these convenience classes in [this section](user-guide.html#mapfn) of the user guide.
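For instance, `MapFn<S, T>` is a DoFn specialization for functions that emit exactly one output record per input record. The sketch below shows how a simple normalization step could be written; the `LowerCaseFn` class is purely illustrative and is not part of the crunch-demo project:

    // import org.apache.crunch.MapFn;

    // A MapFn emits exactly one output record per input record, so there is
    // no Emitter to deal with: we just return the transformed value.
    public class LowerCaseFn extends MapFn<String, String> {
      @Override
      public String map(String word) {
        return word.toLowerCase();
      }
    }

    // Applying it looks just like any other parallelDo call:
    //   PCollection<String> lowerCased = words.parallelDo(new LowerCaseFn(), Writables.strings());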
Now that we have our filtered list of tokens from the input file, we would like to count how often each word occurs. Crunch provides a `PTable<K, V>` abstraction for working with distributed collections of key-value pairs, and PCollection has a built-in `count()` method that produces one:

    // The count method applies a series of Crunch primitives and returns
    // a map of the unique words in the input PCollection to their counts.
    PTable<String, Long> counts = noStopWords.count();

A `PTable<K, V>` is a sub-interface of PCollection that represents an immutable, distributed multimap of keys and values; `count()` returns a PTable that maps each distinct element of the input PCollection to the number of times it occurs.
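PTable also works with the utility methods in Crunch's `org.apache.crunch.lib` package. As a rough sketch, and assuming the `Aggregate.top` helper behaves as described in the user guide, you could keep only the most frequent words like this:

    // import org.apache.crunch.lib.Aggregate;

    // Keep the 20 words with the highest counts; the boolean flag asks for the
    // largest values rather than the smallest.
    PTable<String, Long> topWords = Aggregate.top(counts, 20, true);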
The last step in the pipeline is writing the word counts out to a file:

    // Instruct the pipeline to write the resulting counts to a text file.
    pipeline.writeTextFile(counts, outputPath);

Just as Pipeline has a convenience method for reading in text files, the `writeTextFile(PCollection<T> collection, String pathName)` method is a convenient way to write a PCollection out as text. Note that up to this point the pipeline has only been planned: reads, transformations, and writes are all deferred until we explicitly ask the pipeline to execute.
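`writeTextFile` is shorthand for the more general `write` method, which pairs a PCollection with a `Target`. A minimal sketch, assuming the `To` factory class from `org.apache.crunch.io`:

    // import org.apache.crunch.io.To;

    // Equivalent to pipeline.writeTextFile(counts, outputPath), expressed with an explicit Target.
    pipeline.write(counts, To.textFile(outputPath));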
Calling the Pipeline's `done()` method triggers the planning and execution of the jobs needed to produce the requested outputs:

    // Execute the pipeline as a MapReduce.
    PipelineResult result = pipeline.done();

    return result.succeeded() ? 0 : 1;

The `PipelineResult` instance has methods that indicate whether the jobs that were run as part of the pipeline succeeded or failed, and it also contains statistics and the Hadoop `Counters` associated with the individual jobs. You can get more details on how to manage pipeline runs in [this section](user-guide.html#exec) of the user guide.

We hope you enjoyed your first walk through a Crunch pipeline. You can get more detailed information about developing pipelines with the Crunch libraries in the user guide, and you are also welcome to ask questions or report any problems you have on the [project's mailing list](mailto:user@crunch.apache.org).