Functions
At the heart of any Crunch program are the functions (represented by DoFn) that transform one PCollection into another. In this section, we examine some of the considerations in writing your own custom functions.
Serialization of functions
When writing MapReduce programs, it is up to you to package the code for mappers and reducers into a job JAR file so that Hadoop can make the user code available on the task classpath (see Packaging a Job). Crunch takes a different approach. When a pipeline is executed, all the DoFn instances are serialized to a file that is distributed to task nodes using Hadoop's distributed cache mechanism (described in Distributed Cache), and then deserialized by the task itself so that the DoFn can be invoked.
The upshot for you, the user, is that you don't need to do any packaging work; instead, you only need to make sure that your DoFn implementations are serializable according to the standard Java serialization mechanism.[121]
In most cases, no extra work is required, since the DoFn base class is declared as implementing the java.io.Serializable interface. Thus, if your function is stateless, there are no fields to serialize, and it will be serialized without issue.
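As a concrete illustration, here is a minimal sketch (the class name, the word-splitting logic, and the standalone round trip are illustrative, not from the original text) of a stateless DoFn defined as a top-level class, pushed through plain Java serialization by hand, which is roughly what Crunch does when it writes the functions out for the distributed cache:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;

// A stateless DoFn defined as a top-level class: it has no instance fields,
// so standard Java serialization has nothing to capture beyond the class itself.
public class SplitWordsFn extends DoFn<String, String> {
  @Override
  public void process(String line, Emitter<String> emitter) {
    for (String word : line.split("\\s+")) {
      emitter.emit(word);
    }
  }

  // Round-trip the function through plain Java serialization: a rough stand-in
  // for Crunch serializing DoFns to a file and deserializing them in the task.
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(new SplitWordsFn());
    }
    try (ObjectInputStream in = new ObjectInputStream(
        new ByteArrayInputStream(bytes.toByteArray()))) {
      Object copy = in.readObject();
      System.out.println("Deserialized " + copy.getClass().getSimpleName());
    }
  }
}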
There are a couple of problems to watch out for, however. One problem occurs if your DoFn is defined as an inner class (also called a nonstatic nested class), such as an anonymous class, in an outer class that doesn't implement Serializable:
public class NonSerializableOuterClass {

  public void runPipeline() throws IOException {
    // ...
    PCollection<String> lines = pipeline.readTextFile(inputPath);
    PCollection<String> lower = lines.parallelDo(new DoFn<String, String>() {
      @Override
      public void process(String input, Emitter<String> emitter) {
        emitter.emit(input.toLowerCase());
      }
    }, strings());
    // ...
  }
}
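Serializing the anonymous DoFn above fails because, as a nonstatic inner class, it holds an implicit reference to the enclosing NonSerializableOuterClass instance, which cannot be serialized. A common way around this is to define the function as a named static nested class (or a top-level class), which carries no such reference; making the outer class implement Serializable also works, but then the outer object's state is dragged into the serialized function. The following is a minimal sketch of the first approach; the class and field names are illustrative, and the static import of strings() from the Writables PType family is an assumption about which PType family the surrounding example uses:

import java.io.IOException;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;

import static org.apache.crunch.types.writable.Writables.strings;

public class SerializableUsageExample {

  private final Pipeline pipeline;   // assumed to be created elsewhere, e.g. an MRPipeline
  private final String inputPath;

  public SerializableUsageExample(Pipeline pipeline, String inputPath) {
    this.pipeline = pipeline;
    this.inputPath = inputPath;
  }

  // A named static nested class holds no implicit reference to an enclosing
  // instance, so Java serialization does not try to pull in the outer object.
  static class ToLowerFn extends DoFn<String, String> {
    @Override
    public void process(String input, Emitter<String> emitter) {
      emitter.emit(input.toLowerCase());
    }
  }

  public void runPipeline() throws IOException {
    PCollection<String> lines = pipeline.readTextFile(inputPath);
    PCollection<String> lower = lines.parallelDo(new ToLowerFn(), strings());
    // ... write or materialize 'lower', then run the pipeline
  }
}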