Functions
At the heart of any Crunch program are the functions (represented by DoFn) that transform one PCollection into another. In this section, we examine some of the considerations in writing your own custom functions.
Serialization of functions
When writing MapReduce programs, it is up to you to package the code for mappers and reducers into a job JAR file so that Hadoop can make the user code available on the task classpath (see Packaging a Job). Crunch takes a different approach. When a pipeline is executed, all the DoFn instances are serialized to a file that is distributed to task nodes using Hadoop's distributed cache mechanism (described in Distributed Cache), and then deserialized by the task itself so that the DoFn can be invoked.
The upshot for you, the user, is that you don't need to do any packaging work; instead, you only need to make sure that your DoFn implementations are serializable according to the standard Java serialization mechanism. In most cases, no extra work is required, since the DoFn base class is declared as implementing the java.io.Serializable interface. Thus, if your function is stateless, there are no fields to serialize, and it will be serialized without issue.
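For example, a stateless function like the following serializes without any extra work (a minimal sketch; the class name ToLowerFn and the lowercasing logic are illustrative, not from the original text):

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;

public class ToLowerFn extends DoFn<String, String> {
  // Stateless: there are no instance fields, so Java serialization
  // has nothing beyond the class identity to write out.
  @Override
  public void process(String input, Emitter<String> emitter) {
    emitter.emit(input.toLowerCase());
  }
}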
There are a couple of problems to watch out for, however. One problem occurs if your DoFn is defined as an inner class (also called a nonstatic nested class), such as an anonymous class, in an outer class that doesn't implement Serializable:
public class NonSerializableOuterClass {
  public void runPipeline() throws IOException {
    // ...
    PCollection<String> lines = pipeline.readTextFile(inputPath);
    PCollection<String> lower = lines.parallelDo(new DoFn<String, String>() {
      @Override
      public void process(String input, Emitter<String> emitter) {
        emitter.emit(input.toLowerCase());
      }
    }, strings());
    // ...
  }
}
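The anonymous DoFn here holds an implicit reference to its enclosing instance, so serializing it also attempts to serialize NonSerializableOuterClass, which fails. One way to sidestep this (a sketch of one possible fix, assuming the same elided pipeline and inputPath as above) is to hoist the function into a named static nested class, which carries no reference to the outer instance:

import java.io.IOException;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import static org.apache.crunch.types.writable.Writables.strings;

public class SerializableFnOuterClass {
  // A static nested class has no hidden pointer to the outer object,
  // so only the (stateless) function itself is serialized.
  static class ToLowerFn extends DoFn<String, String> {
    @Override
    public void process(String input, Emitter<String> emitter) {
      emitter.emit(input.toLowerCase());
    }
  }

  public void runPipeline() throws IOException {
    // ...
    PCollection<String> lines = pipeline.readTextFile(inputPath);
    PCollection<String> lower = lines.parallelDo(new ToLowerFn(), strings());
    // ...
  }
}

Making the outer class implement Serializable would also work, but that drags the whole enclosing object into the serialized payload, so a static nested (or top-level) function class is usually the cleaner choice.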