}
After the customary checking of command-line arguments, the program starts by constructing a Crunch Pipeline object, which represents the computation that we want to run. As the name suggests, a pipeline can have multiple stages; pipelines with multiple inputs and outputs, branches, and iteration are all possible, although in this example we start with a single-stage pipeline. We're going to use MapReduce to run the pipeline, so we create an MRPipeline, but we could have chosen to use a MemPipeline for running the pipeline in memory for testing purposes, or a SparkPipeline to run the same computation using Spark.
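The point of MRPipeline, MemPipeline, and SparkPipeline sharing one Pipeline interface is that the computation is written once against the interface, and the execution engine is swappable. The following is a stripped-down, hypothetical illustration of that design idea only; none of these types are Crunch's own classes.

```java
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Hypothetical stand-in for the idea behind Crunch's Pipeline interface:
// the same computation description can be run by different engines.
interface MiniPipeline {
    List<String> run(List<String> input);
}

// In-memory engine, analogous in spirit to MemPipeline: handy for tests.
class InMemoryPipeline implements MiniPipeline {
    public List<String> run(List<String> input) {
        return input.stream()
                    .map(s -> s.toUpperCase(Locale.ROOT))
                    .collect(Collectors.toList());
    }
}

public class PipelineChoice {
    public static void main(String[] args) {
        // The caller depends only on the interface, so swapping in a
        // distributed implementation would not change this code.
        MiniPipeline pipeline = new InMemoryPipeline();
        System.out.println(pipeline.run(List.of("a", "b"))); // prints [A, B]
    }
}
```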
A pipeline receives data from one or more input sources, and in this example the source is a single text file whose name is specified by the first command-line argument, args[0]. The Pipeline class has a convenience method, readTextFile(), to convert a text file into a PCollection of String objects, where each String is a line from the text file. PCollection<S> is the most fundamental data type in Crunch, and represents an immutable, unordered, distributed collection of elements of type S. You can think of PCollection<S> as an unmaterialized analog of java.util.Collection (unmaterialized since its elements are not read into memory). In this example, the input is a distributed collection of the lines of a text file, and is represented by PCollection<String>.
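The "unmaterialized" idea has a rough single-machine analogue in the standard library: Files.lines() describes the lines of a file as a lazy Stream, without reading them all into memory first, loosely the way a PCollection<String> describes lines at cluster scale. This sketch uses a temporary file rather than Crunch's readTextFile().

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Stream;

public class LazyLines {
    // Count lines lazily; no line is read until the stream is consumed,
    // loosely analogous to a PCollection<String> being unmaterialized.
    static long countLines(Path input) throws IOException {
        try (Stream<String> lines = Files.lines(input)) {
            return lines.count();
        }
    }

    public static void main(String[] args) throws IOException {
        // A small sample file stands in for the real input text file.
        Path input = Files.createTempFile("sample", ".txt");
        Files.write(input, List.of("line one", "line two", "line three"));
        System.out.println(countLines(input)); // prints 3
    }
}
```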
A Crunch computation operates on a PCollection, and produces a new PCollection. The first thing we need to do is parse each line of the input file, and filter out any bad records. We do this by using the parallelDo() method on PCollection, which applies a function to every element in the PCollection and returns a new PCollection. The method signature looks like this:
<T> PCollection<T> parallelDo(DoFn<S, T> doFn, PType<T> type);
The idea is that we write a DoFn implementation that transforms an instance of type S into one or more instances of type T, and Crunch will apply the function to every element in the PCollection. It should be clear that the operation can be performed in parallel in the map task of a MapReduce job. The second argument to the parallelDo() method is a PType<T> object, which gives Crunch information about both the Java type used for T and how to serialize that type.
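To make the DoFn contract concrete, here is a deliberately simplified, serial, single-machine sketch of the semantics: a function receives one input element and may emit zero or more output elements, and a helper applies it to every element of a collection. The DoFn and Emitter shapes below are pared-down stand-ins, not Crunch's actual classes (the real DoFn also deals with serialization, scale factors, and the PType machinery), and Crunch would run the loop inside the map tasks of a MapReduce job rather than in-process.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-ins for the Crunch interfaces (simplified for illustration).
interface Emitter<T> {
    void emit(T value);
}

interface DoFn<S, T> {
    void process(S input, Emitter<T> emitter);
}

public class ParallelDoSketch {
    // Serial stand-in for parallelDo(): applies doFn to every element,
    // collecting whatever the function chooses to emit.
    static <S, T> List<T> parallelDo(List<S> input, DoFn<S, T> doFn) {
        List<T> output = new ArrayList<>();
        for (S element : input) {
            doFn.process(element, output::add);
        }
        return output;
    }

    public static void main(String[] args) {
        // Parse lines of a hypothetical "year,temperature" form, dropping
        // bad records, mirroring the parse-and-filter step in the text.
        List<String> lines = List.of("1949,111", "1950,22", "garbage");
        DoFn<String, Integer> parseTemp = (line, emitter) -> {
            String[] parts = line.split(",");
            if (parts.length == 2) {
                emitter.emit(Integer.parseInt(parts[1])); // emit one output
            }                                             // bad record: emit nothing
        };
        System.out.println(parallelDo(lines, parseTemp)); // prints [111, 22]
    }
}
```

Note that because process() takes an emitter rather than returning a value, one input can map to zero outputs (filtering) or several (expansion), which is exactly why parallelDo() is more general than a plain per-element map.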
We are actually going to use an overloaded version of parallelDo() that creates an extension of PCollection called PTable<K, V>, which is a distributed multi-map of key-value pairs. (A multi-map is a map that can have duplicate key-value pairs.) This is