](apidocs/0.10.0/org/apache/crunch/Aggregator.html)
interface. Of the two, an Aggregator is probably the way you want to go; Crunch provides a number of
[Aggregators](apidocs/0.10.0/org/apache/crunch/fn/Aggregators.html), and they are a bit easier to write and compose together.
Let's walk through a few example aggregations:
PTable<String, Double> data = ...;
// Sum the values of the doubles for each key.
PTable<String, Double> sums = data.groupByKey().combineValues(Aggregators.SUM_DOUBLES());
// Find the ten largest values for each key.
PTable<String, Double> maxes = data.groupByKey().combineValues(Aggregators.MAX_DOUBLES(10));
PTable<String, String> text = ...;
// Get a random sample of 100 unique elements for each key.
PTable<String, String> samp = text.groupByKey().combineValues(Aggregators.SAMPLE_UNIQUE_ELEMENTS(100));
We can also use Aggregators together in combination to build more complex aggregations, like to compute
the average of a set of values:
PTable<String, Double> data = ...;
// Create an auxiliary long that is used to count the number of times each key
// appears in the data set.
PTable<String, Pair<Double, Long>> c = data.mapValues(
new MapFn<Double, Pair<Double, Long>>() { Pair<Double, Long> map(Double d) { return Pair.of(d, 1L); } },
pairs(doubles(), longs()));
// Aggregate the data, using a pair of aggregators: one to sum the doubles, and the other
// to sum the auxiliary longs that are the counts.
PTable<String, Pair<Double, Long>> agg = c.groupByKey().combineValues(
Aggregators.pairAggregator(Aggregators.SUM_DOUBLES(), Aggregators.SUM_LONGS()));
// Compute the average by dividing the sum of the doubles by the sum of the longs.
PTable<String, Double> avg = agg.mapValues(new MapFn<Pair<Double, Long>, Double>() {
Double map(Pair<Double, Long> p) { return p.first() / p.second(); }
}, doubles());
### Simple Aggregations
Many of the most common aggregation patterns in Crunch are provided as methods on the PCollection
interface, including `count`, `max`, `min`, and `length`. The implementations of these methods,
however, are in the [Aggregate](apidocs/0.10.0/org/apache/crunch/lib/Aggregate.html) library class.
The methods in the Aggregate class expose some additional options that you can use for performing
aggregations, such as controlling the level of parallelism for count operations:
PCollection<String> data = ...;
PTable<String, Long> cnt1 = data.count();
PTable<String, Long> cnt2 = Aggregate.count(data, 10); // use 10 reducers
PTable has additional aggregation methods, `top` and `bottom`, that can be used to get the
most (or least) frequently occurring key-value pairs in the PTable based on the value, which
must implement `Comparable`. To count all of the elements in a set and then get the 20
most frequently occurring elements, you would run:
PCollection<String> data = ...;
PTable<String, Long> top = data.count().top(20);
### Joining Data
Joins in Crunch are based on equal-valued keys in different PTables. Joins have also evolved
a great deal in Crunch over the lifetime of the project. The [Join](apidocs/0.10.0/org/apache/crunch/lib/Join.html)
API provides simple methods for performing equijoins, left joins, right joins, and full joins, but modern
Crunch joins are usually performed using an explicit implementation of the [JoinStrategy](apidocs/0.10.0/org/apache/crunch/lib/join/JoinStrategy.html)
interface, which has support for the same rich set of joins that you can use in tools like Apache Hive and
Apache Pig.
All of the algorithms discussed below implement the JoinStrategy interface, which defines a single join method:
PTable<K, V1> one = ...;
PTable<K, V2> two = ...;
JoinStrategy<K, V1, V2> strategy = ...;
PTable<K, Pair<V1, V2>> joined = strategy.join(one, two, JoinType);
The [JoinType](apidocs/0.10.0/org/apache/crunch/lib/join/JoinType.html) enum determines which
kind of join is applied: inner, outer, left, right, or full. In general, the smaller of the two
inputs should be the left-most argument to the join method.
Note that the values of the PTables you join should be non-null. The join
algorithms in Crunch use null as a placeholder to represent that there are no values for
a given key in a PCollection, so joining PTables that contain null values may have
surprising results. Using a non-null dummy value in your PCollections is a good idea in
general.
#### Reduce-side Joins
Reduce-side joins are handled by the [DefaultJoinStrategy](apidocs/0.10.0/org/apache/crunch/lib/join/DefaultJoinStrategy.html).
Reduce-side joins are the simplest and most robust kind of joins in Hadoop; the keys from the two inputs are
shuffled together to the reducers, where the values from the smaller of the two collections are collected and then
streamed over the values from the larger of the two collections. You can control the number of reducers that is used
to perform the join by passing an integer argument to the DefaultJoinStrategy constructor.
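For example, a reduce-side inner join with an explicit number of reducers might look roughly like this (the table names and the value of 20 are illustrative):
PTable<String, Long> one = ...;
PTable<String, String> two = ...;
// Use 20 reducers to perform the shuffle phase of the join.
JoinStrategy<String, Long, String> strategy = new DefaultJoinStrategy<String, Long, String>(20);
PTable<String, Pair<Long, String>> joined = strategy.join(one, two, JoinType.INNER_JOIN);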
#### Map-side Joins
Map-side joins are handled by the [MapsideJoinStrategy](apidocs/0.10.0/org/apache/crunch/lib/join/MapsideJoinStrategy.html).
Map-side joins require that the smaller of the two input tables be loaded into memory on the tasks on the cluster, so
at least one of the tables must be small enough to fit comfortably into memory within each task.
For a long time, the MapsideJoinStrategy differed from the rest of the JoinStrategy
implementations in that the left-most argument was intended to be larger than the right-side
one, since the right-side PTable was loaded into memory. Since Crunch 0.10.0/0.8.3, we
have deprecated the old MapsideJoinStrategy constructor which had the sizes reversed and
recommend that you use the `MapsideJoinStrategy.create()` factory method, which returns an
implementation of the MapsideJoinStrategy in which the left-side PTable is loaded into
memory instead of the right-side PTable.
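A minimal sketch, assuming the left-side table `small` fits comfortably in memory:
PTable<String, Long> small = ...;   // loaded into memory on each task
PTable<String, String> large = ...;
JoinStrategy<String, Long, String> strategy = MapsideJoinStrategy.create();
PTable<String, Pair<Long, String>> joined = strategy.join(small, large, JoinType.INNER_JOIN);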
#### Sharded Joins
Many distributed joins have skewed data that can cause regular reduce-side joins to fail due to out-of-memory issues on
the partitions that happen to contain the keys with highest cardinality. To handle these skew issues, Crunch has the
[ShardedJoinStrategy](apidocs/0.10.0/org/apache/crunch/lib/join/ShardedJoinStrategy.html) that allows developers to shard
each key to multiple reducers, which prevents a few reducers from getting overloaded with the values from the skewed keys
in exchange for sending more data over the wire. For problems with significant skew issues, the ShardedJoinStrategy can
significantly improve performance.
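A sketch, assuming the constructor that takes the number of shards per key:
PTable<String, Long> one = ...;
PTable<String, String> two = ...;
// Spread each key across (up to) 4 reducer shards to smooth out skew,
// at the cost of shuffling extra copies of the smaller side's values.
JoinStrategy<String, Long, String> strategy = new ShardedJoinStrategy<String, Long, String>(4);
PTable<String, Pair<Long, String>> joined = strategy.join(one, two, JoinType.INNER_JOIN);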
#### Bloom Filter Joins
Last but not least, the [BloomFilterJoinStrategy](apidocs/0.10.0/org/apache/crunch/lib/join/BloomFilterJoinStrategy.html) builds
a [bloom filter](http://en.wikipedia.org/wiki/Bloom_filter) on the left-hand side table that is used to filter the contents
of the right-hand side table to eliminate entries from the (larger) right-hand side table that have no hope of being joined
to values in the left-hand side table. This is useful in situations in which the left-hand side table is too large to fit
into memory on the tasks of the job, but is still significantly smaller than the right-hand side table, and we know that the
vast majority of the keys in the right-hand side table will not match the keys in the left-hand side of the table.
#### Cogroups
Some kinds of joins are richer and more complex than the typical relational joins handled by JoinStrategy.
For example, we might want to join two datasets
together and only emit a record if each of the sets had at least two distinct values associated
with each key. For arbitrarily complex join logic, we can always fall back to the
[Cogroup](apidocs/0.10.0/org/apache/crunch/lib/Cogroup.html) API, which takes in an arbitrary number
of PTable instances that all have the same key type and combines them together into a single
PTable whose values are made up of Collections of the values from each of the input PTables.
PTable<String, Long> one = ...;
PTable<String, String> two = ...;
PTable<String, Boolean> three = ...;
// cogroup is also a built-in method on the PTable interface.
PTable<String, Pair<Collection<Long>, Collection<String>>> cg = one.cogroup(two);
// For more than two cogrouped tables, we have a helper called Tuple3.Collect to cut down on
// the typing.
PTable<String, Tuple3.Collect<Long, String, Boolean>> cgAll = Cogroup.cogroup(one, two, three);
Crunch's cogroup operations work just like the cogroup operation in Apache Pig; for more details on
how they work, you can consult the [section on cogroups](http://chimera.labs.oreilly.com/books/1234000001811/ch06.html)
in the Apache Pig book.
### Sorting
After joins and cogroups, sorting data is the most common distributed computing pattern. The
Crunch APIs have a number of utilities for performing fully distributed sorts as well as
more advanced patterns like secondary sorts.
#### Standard and Reverse Sorting
The [Sort](apidocs/0.10.0/org/apache/crunch/lib/Sort.html) API methods contain utility functions
for sorting the contents of PCollections and PTables whose contents implement the `Comparable`
interface. By default, MapReduce does not perform a total sort of its keys during a shuffle; instead,
a sort is done locally on each partition of the data that is sent to a reducer. Doing
a total sort requires identifying a set of partition boundary keys in the input data and then routing
each key to a partition based on those boundaries. This has the potential
to lead to highly imbalanced shuffles that can take a long time to run, but it does make total sorts
at scale possible.
By default, Crunch will prefer to handle sorts with a single reducer. The Sort API provides a number
of methods that expose the option for a larger number of partitions to be used, at which point the
total order partitioner and sorting controls will be enabled.
PCollection<Long> data = ...;
// Sorted in ascending order by default.
PCollection<Long> sorted = Sort.sort(data);
// Sorting in ascending order, with 10 partitions.
PCollection<Long> sorted2 = Sort.sort(data, 10, Sort.Order.ASCENDING);
For more complex PCollections or PTables that are made up of Tuples (Pairs, Tuple3, etc.), we can
specify which columns of the Tuple should be used for sorting the contents, and in which order, using
the [ColumnOrder](apidocs/0.10.0/org/apache/crunch/lib/Sort.ColumnOrder.html) class:
PTable<String, Long> table = ...;
// Sorted by value, instead of key -- remember, a PTable is a PCollection of Pairs.
PCollection<Pair<String, Long>> sortedByValue = Sort.sortPairs(table, ColumnOrder.by(1, Sort.Order.DESCENDING));
#### Secondary Sorts
Another pattern that occurs frequently in distributed processing is _secondary sorts_, where we
want to group a set of records by one key and sort the records within each group by a second key.
The [SecondarySort](apidocs/0.10.0/org/apache/crunch/lib/SecondarySort.html) API provides a set
of `sortAndApply` methods that can be used on input PTables of the form `PTable<K, Pair<K2, V>>`,
where `K` is the primary grouping key and `K2` is the secondary grouping key. The `sortAndApply`
method will perform the grouping and sorting and will then apply a given DoFn to process the
grouped and sorted values.
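As a sketch, suppose we have a PTable of (user, (timestamp, event)) records and want to process each user's events in time order (the field names here are made up for illustration):
PTable<String, Pair<Long, String>> events = ...;
PCollection<String> sessions = SecondarySort.sortAndApply(events,
    new DoFn<Pair<String, Iterable<Pair<Long, String>>>, String>() {
      @Override
      public void process(Pair<String, Iterable<Pair<Long, String>>> input, Emitter<String> emitter) {
        // The values for each user arrive sorted by the timestamp component.
        StringBuilder sb = new StringBuilder(input.first());
        for (Pair<Long, String> event : input.second()) {
          sb.append(',').append(event.second());
        }
        emitter.emit(sb.toString());
      }
    }, Writables.strings());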
### Other Operations
Crunch provides implementations of a number of other common distributed processing patterns and
techniques throughout its library APIs.
#### Cartesian Products
Cartesian products between PCollections are a bit tricky in distributed processing; we usually want
one of the datasets to be small enough to fit into memory, and then do a pass over the larger data
set where we emit an element of the smaller data set along with each element from the larger set.
When this pattern isn't possible but we still need to take the cartesian product, we have some options,
but they're fairly expensive. Crunch's [Cartesian](apidocs/0.10.0/org/apache/crunch/lib/Cartesian.html) API
provides methods for a reduce-side full cross product between two PCollections (or PTables.) Note that
this is a pretty expensive operation, and you should go out of your way to avoid these kinds of processing
steps in your pipelines.
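If you do need a full cross product, the core method is `Cartesian.cross`; a minimal sketch:
PCollection<String> letters = ...;
PCollection<Long> numbers = ...;
// Every (letter, number) combination; the output size is |letters| * |numbers|.
PCollection<Pair<String, Long>> crossed = Cartesian.cross(letters, numbers);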
#### Coalescing
Many MapReduce jobs have the potential to generate a large number of small files that could be used more
effectively by clients if they were all merged together into a small number of large files. The
[Shard](apidocs/0.10.0/org/apache/crunch/lib/Shard.html) API provides a single method, `shard`, that allows
you to coalesce a given PCollection into a fixed number of partitions:
PCollection<Long> data = ...;
PCollection<Long> sharded = Shard.shard(data, 10);
This has the effect of running a no-op MapReduce job that shuffles the data into the given number of
partitions. This is often a useful step at the end of a long pipeline run.
#### Distinct
Crunch's [Distinct](apidocs/0.10.0/org/apache/crunch/lib/Distinct.html) API has a method, `distinct`, that
returns one copy of each unique element in a given PCollection:
PCollection<Long> data = ...;
PCollection<Long> distinct = Distinct.distinct(data);
The distinct method operates by maintaining a Set in each task that stores the elements it has seen
thus far from each of the partitions, and then periodically flushing the contents of this Set to disk. You
can control how often this flushing occurs in order to optimize runtime performance or control memory use
with another method in Distinct:
PCollection<Long> data = ...;
int flushEvery = 20000;
PCollection<Long> distinct = Distinct.distinct(data, flushEvery);
The default value of flushEvery is 50000, but you can test out the performance of different settings of this
value for your own pipelines. The optimal value will depend on some combination of the size of the objects (and
thus the amount of memory they consume) and the number of unique elements in the data.
#### Sampling
The [Sample](apidocs/0.10.0/org/apache/crunch/lib/Sample.html) API provides methods for two sorts of PCollection
sampling: random and reservoir.
Random sampling is where you include each record in the sample with a fixed probability, and is probably what you're
used to when you think of sampling from a collection:
PCollection<Double> data = ...;
PCollection<Double> sample = Sample.sample(data, 0.05);
Here, we create the sample by generating a uniform random number between 0 and 1 for each input record, and including
the record in the sample if the random value is less than 0.05. We expect that we will get roughly 5% of the input
data included in the sample, but we cannot know precisely how many elements will be in the sample.
In reservoir sampling, we use an algorithm to select an exact number of elements from the input data in such a way that
each input has an equal probability of being selected, even if we don't know how many elements are in the input
collection! You can read more about how this works [here](http://en.wikipedia.org/wiki/Reservoir_sampling).
PCollection<Double> data = ...;
// Choose 500 elements from this input randomly.
PCollection<Double> sample = Sample.reservoirSample(data, 500);
There are also methods on the Sample API that work on PTables and allow you to control the seed used by the
random number generators. Note that all of the sampling algorithms Crunch provides, both random and reservoir,
only require a single pass over the data.
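For example, a seeded sample might look like this (a sketch, assuming the overload that takes the seed before the acceptance probability):
PCollection<Double> data = ...;
// Fix the seed so that repeated runs of the pipeline sample the same records.
PCollection<Double> sample = Sample.sample(data, 1729L, 0.05);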
#### Set Operations
The [Set](apidocs/0.10.0/org/apache/crunch/lib/Set.html) API methods complement Crunch's built-in `union` methods and
provide support for finding the intersection, the difference, or the [comm](http://en.wikipedia.org/wiki/Comm) of two PCollections.
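A brief sketch of how these calls might be used (the three-column return type of `comm` is an assumption here):
PCollection<String> a = ...;
PCollection<String> b = ...;
// Elements that appear in both collections.
PCollection<String> both = Set.intersection(a, b);
// Elements of a that do not appear in b.
PCollection<String> onlyA = Set.difference(a, b);
// comm-style output: (only in a, only in b, in both).
PCollection<Tuple3<String, String, String>> comm = Set.comm(a, b);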
#### Splits
Sometimes, you want to write two different outputs from the same DoFn into different PCollections. An example of this would
be a pipeline in which you wanted to write good records to one file and bad or corrupted records to a different file for
further examination. The [Channels](apidocs/0.10.0/org/apache/crunch/lib/Channels.html) class provides a method that allows
you to split an input PCollection of Pairs into a Pair of PCollections:
PCollection<Pair<L, R>> in = ...;
Pair<PCollection<L>, PCollection<R>> split = Channels.split(in);
split.first().write(goodOutputs);
split.second().write(badOutputs);
### Retaining objects within DoFns
For reasons of efficiency, Hadoop MapReduce repeatedly passes the [same references as keys and values to Mappers and Reducers](https://issues.apache.org/jira/browse/HADOOP-2399) instead of passing in new objects for each call.
The state of the singleton key and value objects is updated between each call
to `Mapper.map()` and `Reducer.reduce()`, as well as between each
call to `Iterator.next()` while iterating over the Iterable within a Reducer.
The result of this optimization in MapReduce is that a reference to an object
received within a map or reduce call cannot be held on to past the scope of
that single method call invocation, as its value will change between
invocations of the method call. In some (but not all) situations, the
consequences of this optimization affect DoFns as well, meaning that you can't
simply retain a reference that is passed in to `DoFn.process` past the lifetime
of a method call.
A convenience method called `getDetachedValue` is specified in the `PType`
interface to get around this limitation. Implementations of this method
perform a deep copy of values of their configured type if needed, and return
the value that has been "detached" from the ownership of the MapReduce
framework.
In order to make use of the `getDetachedValue` method in a PType, you need to
have an initialized instance of the PType within the DoFn. Note that the
initialization of the PType should be performed in the `initialize()` method of
the DoFn.
An example of a DoFn that would make use of getDetachedValue to correctly emit
the maximum value encountered would be implemented as follows:
public class FindMax<T extends Comparable<T>> extends DoFn<T, T> {
  private PType<T> ptype;
  private T maxValue;

  public FindMax(PType<T> ptype) {
    this.ptype = ptype;
  }

  public void initialize() {
    this.ptype.initialize(getConfiguration());
  }

  public void process(T input, Emitter<T> emitter) {
    if (maxValue == null || maxValue.compareTo(input) < 0) {
      // We need to call getDetachedValue here, otherwise the internal
      // state of maxValue might change with each call to process()
      // and we won't hold on to the max value
      maxValue = ptype.getDetachedValue(input);
    }
  }

  public void cleanup(Emitter<T> emitter) {
    if (maxValue != null) {
      emitter.emit(maxValue);
    }
  }
}
## Crunch for HBase
Crunch is an excellent platform for creating pipelines that involve processing data from HBase tables. Because of Crunch's
flexible schemas for PCollections and PTables, you can write pipelines that operate directly on HBase API classes like
`Put`, `KeyValue`, and `Result`.
Be sure that the version of Crunch that you're using is compatible with the version of HBase that you are running. The 0.8.x
Crunch versions and earlier ones are developed against HBase 0.94.x, while version 0.10.0 and after are developed against
HBase 0.96. There were a small number of backwards-incompatible changes made between HBase 0.94 and 0.96 that are reflected
in the Crunch APIs for working with HBase. The most important of these is that in HBase 0.96, HBase's `Put`, `KeyValue`, and `Result`
classes no longer implement the Writable interface. To support working with these types in Crunch 0.10.0, we added the
[HBaseTypes](apidocs/0.10.0/org/apache/crunch/io/hbase/HBaseTypes.html) class that has factory methods for creating PTypes that serialize the HBase client classes to bytes so
that they can still be used as part of MapReduce pipelines.
Crunch supports working with HBase data in two ways. The [HBaseSourceTarget](apidocs/0.10.0/org/apache/crunch/io/hbase/HBaseSourceTarget.html) and [HBaseTarget](apidocs/0.10.0/org/apache/crunch/io/hbase/HBaseTarget.html) classes support reading and
writing data to HBase tables directly. The [HFileSource](apidocs/0.10.0/org/apache/crunch/io/hbase/HFileSource.html) and [HFileTarget](apidocs/0.10.0/org/apache/crunch/io/hbase/HFileTarget.html) classes support reading and writing data
to hfiles, which are the underlying file format for HBase. HFileSource and HFileTarget can be used to read and write data to
hfiles directly, which is much faster than going through the HBase APIs and can be used to perform efficient bulk loading of data
into HBase tables. See the utility methods in the [HFileUtils](apidocs/0.10.0/org/apache/crunch/io/hbase/HFileUtils.html) class for
more details on how to work with PCollections against hfiles.
## Managing Pipeline Execution
Crunch uses a lazy execution model. No jobs are run or outputs created until the user explicitly invokes one of the methods on the
Pipeline interface that controls job planning and execution. The simplest of these methods is the `PipelineResult run()` method,
which analyzes the current graph of PCollections and Target outputs and comes up with a plan to ensure that each of the outputs is
created and then executes it, returning only when the jobs are completed. The [PipelineResult](apidocs/0.10.0/org/apache/crunch/PipelineResult.html)
returned by the `run` method contains information about what was run, including the number of jobs that were executed during the
pipeline run and the values of the Hadoop Counters for each of those stages via the [StageResult](apidocs/0.10.0/org/apache/crunch/PipelineResult.StageResult.html) component classes.
The last method that should be called in _any_ Crunch pipeline run is the Pipeline interface's `PipelineResult done()` method. The done method will
ensure that any remaining outputs that have not yet been created are generated via a call to `run`, and it will clean up the temporary directories that
Crunch creates during runs to hold serialized job information and intermediate outputs.
Crunch also allows developers to execute finer-grained control over pipeline execution via Pipeline's `PipelineExecution runAsync()` method.
The `runAsync` method is a non-blocking version of the `run` method that returns a [PipelineExecution](apidocs/0.10.0/org/apache/crunch/PipelineExecution.html) instance that can be used to monitor the currently running Crunch pipeline. The PipelineExecution object is also useful for debugging
Crunch pipelines by visualizing the Crunch execution plan in DOT format via its `String getPlanDotFile()` method. PipelineExecution implements
Guava's [ListenableFuture](https://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained), so you can attach handlers that will be
called when your pipeline finishes executing.
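For example, a sketch of monitoring an asynchronous run, assuming the `PipelineExecution` is a `ListenableFuture<PipelineResult>` as described above:
PipelineExecution exec = pipeline.runAsync();
// Dump the DOT representation of the execution plan for debugging.
System.out.println(exec.getPlanDotFile());
// Since PipelineExecution is a ListenableFuture, we can simply block until the run completes.
PipelineResult result = exec.get();
pipeline.done();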
Most of the job of the Crunch planner involves deciding where and when to cache intermediate outputs between different pipeline stages. If you
find that the Crunch planner isn't optimally deciding where to split two dependent jobs, you can control which PCollections are used as
split points in a pipeline via the `Iterable materialize()` and `PCollection cache()` methods available on the PCollection interface.
If the planner detects a materialized or cached PCollection along the path between two jobs, the planner will prefer the already cached
PCollection to its own choice. The implementations of materialize and cache vary slightly between the MapReduce-based and Spark-based
execution pipelines in a way that is explained in the subsequent section of the guide.
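For example, a sketch of forcing a split point at a hypothetical parsed PCollection (`rawInput` and `ParseFn` are placeholders):
PCollection<String> parsed = rawInput.parallelDo(new ParseFn(), Writables.strings());
// Mark this collection as a split point; the planner will prefer its cached output
// over choosing its own place to split the dependent jobs.
parsed.cache();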
## The Different Pipeline Implementations (Properties and Configuration options)
This section adds some additional details about the implementation and configuration options available for each of
the different execution engines.
### MRPipeline
The [MRPipeline](apidocs/0.10.0/org/apache/crunch/impl/mr/MRPipeline.html) is the oldest implementation of the Pipeline interface and
compiles and executes the DAG of PCollections into a series of MapReduce jobs. MRPipeline has three constructors that are commonly
used:
1. `MRPipeline(Class<?> jarClass)` takes a class reference that is used to identify the jar file that contains the DoFns and
any associated classes that should be passed to the cluster to execute the job.
2. `MRPipeline(Class<?> jarClass, Configuration conf)` is like the first constructor, but allows you to declare a `Configuration`
instance to use as the basis for the job. This is a good constructor to use in general, especially when you are using Hadoop's
`Tool` interface and `Configured` base class to declare the main methods for running your pipeline (a sketch of this pattern follows the list).
3. `MRPipeline(Class<?> jarClass, String appName, Configuration conf)` allows you to declare a common prefix (given by the `appName`
argument) for all of the MapReduce jobs that will be executed as part of this data pipeline. That can make it easy to identify
your jobs on the JobTracker or ApplicationMaster.
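A sketch of the second constructor in use with Hadoop's `Tool` interface (the class name and argument handling are made up for illustration):
public class MyPipelineTool extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    Pipeline pipeline = new MRPipeline(MyPipelineTool.class, getConf());
    PCollection<String> lines = pipeline.readTextFile(args[0]);
    // ... construct the rest of the pipeline here ...
    pipeline.writeTextFile(lines, args[1]);
    PipelineResult result = pipeline.done();
    return result.succeeded() ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new Configuration(), new MyPipelineTool(), args));
  }
}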
There are a number of handy configuration parameters that can be used to adjust the behavior of MRPipeline that you should be
aware of:
| Name | Type | Usage Notes |
| --- | --- | --- |
| `crunch.tmp.dir` | string | The base directory for Crunch to use when it writes temporary outputs for a job. Default is "/tmp". |
| `crunch.debug` | boolean | Enables debug mode, which traps and logs any runtime exceptions and input data. Can also be enabled via `enableDebug()` on the Pipeline interface. False by default, because it introduces a fair amount of overhead. |
| `crunch.job.name.max.stack.length` | integer | Controls the length of the name of the job that Crunch generates for each phase of the pipeline. Default is 60 chars. |
| `crunch.log.job.progress` | boolean | If true, Crunch will print the "Map %P Reduce %P" data to stdout as the jobs run. False by default. |
| `crunch.disable.combine.file` | boolean | By default, Crunch will use CombineFileInputFormat for subclasses of FileInputFormat. This can be disabled on a per-source basis or globally. |
| `crunch.combine.file.block.size` | integer | The block size to use for the CombineFileInputFormat. Default is the dfs.block.size for the cluster. |
| `crunch.max.running.jobs` | integer | Controls the maximum number of MapReduce jobs that will be executed simultaneously. Default is 5. |
| `crunch.max.poll.interval` | integer | Controls the maximum poll interval of MapReduce jobs in milliseconds. Default is 1000 for local mode and 10000 otherwise. |
### SparkPipeline
The `SparkPipeline` is the newest implementation of the Pipeline interface, and was added in Crunch 0.10.0. It has two commonly used constructors:
1. `SparkPipeline(String sparkConnection, String appName)`, which takes a Spark connection string of the form `local[numThreads]` for
local mode or `master:port` for a Spark cluster. This constructor will create its own `JavaSparkContext` instance to control the Spark pipeline
that it runs.
2. `SparkPipeline(JavaSparkContext context, String appName)` will use a given JavaSparkContext directly, instead of creating its own.
Note that the JavaSparkContext creates its own `Configuration` instance itself, and that there currently isn't a way to set it during pipeline
startup. We hope and expect that this will be corrected in a future version of Spark.
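A sketch of the first constructor in use, running against a local two-thread Spark master (`inputPath` is a placeholder):
Pipeline pipeline = new SparkPipeline("local[2]", "my-crunch-app");
PCollection<String> lines = pipeline.readTextFile(inputPath);
// ... build and run the pipeline ...
pipeline.run();
// Always call done() so that the underlying JavaSparkContext is cleaned up.
pipeline.done();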
Crunch delegates much of the pipeline execution to Spark and does relatively little of the pipeline planning tasks, with a few crucial
exceptions:
1. *Multiple inputs*: Crunch does the work of abstracting the combination of input format and configuration parameters in a way that makes it easy to work with multiple inputs in a pipeline, even if they're of the same type and would have conflicting conf parameters (e.g., if you were trying to join two Avro files with different schemas, the schema conf parameters would typically conflict with each other).
2. *Multiple outputs*: Spark doesn't have a concept of multiple outputs; when you write a data set to disk, the pipeline that creates that data set runs immediately. This means that you need to be a little bit clever about caching intermediate stages so you don't end up re-running a big long pipeline multiple times in order to write a couple of outputs. Crunch does that for you, along with the same output format and parameter wrapping you get for multiple inputs.
3. *Data serialization*: Spark uses Java serialization or Kryo, with some specialized handling for Writables. Kryo doesn't handle Avro records well, so Crunch's serialization turns those records into byte arrays for you so they don't break your pipelines.
4. *Checkpointing*: `Target.WriteMode` allows for checkpointing data across Spark pipeline runs. This is useful during active pipeline development, since most of the failures that occur when creating data pipelines are in user-defined functions that come across unexpected input.
Note that the `cache(CacheOptions options)` method on the PCollection interface exposes the same level of control over RDD caching that the Spark API
provides, in terms of memory vs. disk and serialized vs. deserialized data. Although these same methods exist for the MRPipeline implementation, the
only caching strategy that is applied is MapReduce's disk and serialization caching; the other options are ignored.
It is important that you call the `done()` method on the SparkPipeline at the end of your job, which will clean up the JavaSparkContext. You may
get strange and unpredictable failures if you do not do this. As the newest Pipeline implementation, you should expect that SparkPipeline will
be a little rough around the edges and may not handle all of the use cases that MRPipeline can handle, although the Crunch community is
actively working to ensure complete compatibility between the two implementations.
### MemPipeline
The [MemPipeline](apidocs/0.10.0/org/apache/crunch/impl/mem/MemPipeline.html) implementation of Pipeline has a few interesting
properties. First, unlike MRPipeline, MemPipeline is a singleton; you don't create a MemPipeline, you just get a reference to it
via the static `MemPipeline.getInstance()` method. Second, all of the operations in the MemPipeline are executed completely in memory;
there is no serialization of data to disk by default, and PType usage is fairly minimal. This has both benefits and drawbacks; on
the upside, MemPipeline runs are extremely fast and are a good way to test the internal logic of your DoFns and pipeline operations.
On the downside, MemPipeline will not exercise serialization code, so it's possible for a MemPipeline run to work fine while a
real cluster run using MRPipeline or SparkPipeline fails due to some data serialization issue. As a rule, you should always have
integration tests that run either MapReduce or Spark in local mode so that you can test for these issues.
You can add data to the PCollections supported by MemPipeline in two primary ways. First, you can use Pipeline's `read(Source<T> src)`
methods, just as you would for MRPipeline or SparkPipeline. MemPipeline requires that any input sources implement the ReadableSource
interface so that the data they contain can be read into memory. You can also take advantage of a couple of handy factory methods
on MemPipeline that can be used to create PCollections from Java Iterables:
PCollection<String> data = MemPipeline.collectionOf("a", "b", "c");
PCollection<String> typedData = MemPipeline.typedCollectionOf(Avros.strings(), "a", "b", "c");
List<Pair<String, Long>> list = ImmutableList.of(Pair.of("a", 1L), Pair.of("b", 2L));
PTable<String, Long> table = MemPipeline.tableOf(list);
PTable<String, Long> typedTable = MemPipeline.typedTableOf(
Writables.tableOf(Writables.strings(), Writables.longs()), list);
As you can see, you can create either typed or untyped collections, depending on whether or not you provide a PType to
be used with the PCollection you create. In general, providing a PType is a good idea, primarily because so many of the
Crunch API methods assume that PCollections have a valid and non-null PType available to work with.
On the output side, there is some limited support for writing the contents of an in-memory PCollection or PTable into
an Avro file, a Sequence file, or a text file, but the support here isn't nearly as robust as the support on the
read side, because Crunch does not have an equivalent `WritableTarget` interface that matches the `ReadableSource` interface
on the read side. Often the best way to verify that the contents of your pipeline are correct is by using the
`materialize()` method to get a reference to the contents of the in-memory collection and then verify them directly,
without writing them out to disk.
## Unit Testing Pipelines
For production data pipelines, unit tests are an absolute must. The [MemPipeline](#mempipeline) implementation of the Pipeline
interface has several tools to help developers create effective unit tests, which will be detailed in this section.
### Unit Testing DoFns
Many of the DoFn implementations, such as `MapFn` and `FilterFn`, are very easy to test, since they accept a single input
and return a single output. For general purpose DoFns, we need an instance of the [Emitter](apidocs/0.10.0/org/apache/crunch/Emitter.html)
interface that we can pass to the DoFn's `process` method and then read in the values that are written by the function. Support
for this pattern is provided by the [InMemoryEmitter](apidocs/0.10.0/org/apache/crunch/impl/mem/emit/InMemoryEmitter.html) class, which
has a `List<T> getOutput()` method that can be used to read the values that were passed to the Emitter instance by a DoFn instance:
@Test
public void testToUpperCaseFn() {
InMemoryEmitter<String> emitter = new InMemoryEmitter<String>();
new ToUpperCaseFn().process("input", emitter);
assertEquals(ImmutableList.of("INPUT"), emitter.getOutput());
}
### Testing Complex DoFns and Pipelines
Many of the DoFns we write involve more complex processing that requires our DoFn to be initialized and cleaned up, or that
define Counters that we use to track the inputs that we receive. In order to ensure that our DoFns are working properly across
their entire lifecycle, it's best to use the [MemPipeline](#mempipeline) implementation to create in-memory instances of
PCollections and PTables that contain a small amount of test data and apply our DoFns to those PCollections to test their
functionality. We can easily retrieve the contents of any in-memory PCollection by calling its `Iterable materialize()`
method, which will return immediately. We can also track the values of any Counters that were incremented as the DoFns were
executed against the test data by calling the static `getCounters()` method on the MemPipeline instance, and reset
those Counters between test runs by calling the static `clearCounters()` method:
public static class UpperCaseWithCounterFn extends DoFn<String, String> {
  @Override
  public void process(String input, Emitter<String> emitter) {
    String upper = input.toUpperCase();
    if (!upper.equals(input)) {
      increment("UpperCase", "modified");
    }
    emitter.emit(upper);
  }
}
@Before
public void setUp() throws Exception {
MemPipeline.clearCounters();
}
@Test
public void testToUpperCase_WithPipeline() {
PCollection<String> inputStrings = MemPipeline.collectionOf("a", "B", "c");
PCollection<String> upperCaseStrings = inputStrings.parallelDo(new UpperCaseWithCounterFn(), Writables.strings());
assertEquals(ImmutableList.of("A", "B", "C"), Lists.newArrayList(upperCaseStrings.materialize()));
assertEquals(2L, MemPipeline.getCounters().findCounter("UpperCase", "modified").getValue());
}
### Designing Testable Data Pipelines
In the same way that we try to [write testable code](http://misko.hevery.com/code-reviewers-guide/), we want to ensure that
our data pipelines are written in a way that makes them easy to test. In general, you should try to break up complex pipelines
into a number of function calls that perform a small set of operations on input PCollections and return one or more PCollections
as a result. This makes it easy to swap in different PCollection implementations for testing and production runs.
Let's look at an example that computes one iteration of the [PageRank](http://en.wikipedia.org/wiki/PageRank) algorithm that
is taken from one of Crunch's integration tests:
// Each entry in the PTable represents a URL and its associated data for PageRank computations.
public static PTable<String, PageRankData> pageRank(PTable<String, PageRankData> input, final float d) {
  PTypeFamily ptf = input.getTypeFamily();
  // Compute the outbound page rank from each of the input pages.
  PTable<String, Float> outbound = input.parallelDo(new DoFn<Pair<String, PageRankData>, Pair<String, Float>>() {
    @Override
    public void process(Pair<String, PageRankData> input, Emitter<Pair<String, Float>> emitter) {
      PageRankData prd = input.second();
      for (String link : prd.urls) {
        emitter.emit(Pair.of(link, prd.propagatedScore()));
      }
    }
  }, ptf.tableOf(ptf.strings(), ptf.floats()));
  // Update the PageRank for each URL.
  return input.cogroup(outbound).mapValues(
      new MapFn<Pair<Collection<PageRankData>, Collection<Float>>, PageRankData>() {
        @Override
        public PageRankData map(Pair<Collection<PageRankData>, Collection<Float>> input) {
          PageRankData prd = Iterables.getOnlyElement(input.first());
          Collection<Float> propagatedScores = input.second();
          float sum = 0.0f;
          for (Float s : propagatedScores) {
            sum += s;
          }
          return prd.next(d + (1.0f - d) * sum);
        }
      }, input.getValueType());
}
By embedding our business logic inside of a static method that operates on PTables, we can easily unit test our PageRank
computations that combine custom DoFns with Crunch's built-in `cogroup` operation by using the [MemPipeline](#mempipeline)
implementation to create test data sets that we can easily verify by hand, and then this same logic can be executed on
a distributed data set using either the [MRPipeline](#mrpipeline) or [SparkPipeline](#sparkpipeline) implementations.
### Pipeline execution plan visualizations
Crunch provides tools to visualize the pipeline execution plan. The [PipelineExecution](apidocs/0.10.0/org/apache/crunch/PipelineExecution.html) `String getPlanDotFile()` method returns a DOT format visualization of the execution plan. Furthermore, if the output folder is set, Crunch will save the dotfile diagram on each pipeline execution:
Configuration conf = ...;
String dotfileDir = ...;
// Set the output folder path for the DOT files.
DotfileUtills.setPipelineDotfileOutputDir(conf, dotfileDir);
Additional details of the Crunch execution plan can be exposed by enabling the dotfile debug mode like this:
// Requires the output folder to be set.
DotfileUtills.enableDebugDotfiles(conf);
This will produce (and save) four additional diagrams that visualize the internal stages of the pipeline execution plan. Such diagrams are the PCollection lineage, the pipeline base and split graphs, and the run-time node (RTNode) representation.
(Note: the debug mode requires the output folder to be set.)