NOTE
For this pipeline, Crunch's planner does a better job of optimizing the execution plan if
scores.materialize().iterator() is called immediately after the pageRank() call. This
ensures that the scores table is explicitly materialized, so it is available for the next execution plan in
the next iteration of the loop. Without the call to materialize(), the program still produces the same
result, but it's less efficient: the planner may choose to materialize different intermediate results, and so
for the next iteration of the loop some of the computation must be re-executed to get the scores to pass
to the pageRank() call.
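As a rough sketch, the relevant part of the iterative loop then looks like this (pageRank() is the example's own update method rather than a Crunch API call, and the fixed iteration count and damping factor of 0.5f are purely illustrative):

for (int i = 0; i < numIterations; i++) {
  scores = pageRank(scores, 0.5f);
  // Explicitly materialize the new scores so they are persisted and available
  // to the execution plan that is built for the next iteration.
  scores.materialize().iterator();
}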
Checkpointing a Pipeline
In the previous section, we saw that Crunch will reuse any PCollections that were
materialized in any previous runs of the same pipeline. However, if you create a new
pipeline instance, then it will not automatically share any materialized PCollections
from other pipelines, even if the input source is the same. This can make developing a
pipeline rather time-consuming, since even a small change to a computation toward the
end of the pipeline means Crunch will run the new pipeline from the beginning.
The solution is to checkpoint a PCollection to persistent storage (typically HDFS) so
that Crunch can start from the checkpoint in the new pipeline.
Consider the Crunch program for calculating a histogram of word counts for text files
back in Example 18-3. We saw that the Crunch planner translates this pipeline into two
MapReduce jobs. If the program is run for a second time, then Crunch will run the two
MapReduce jobs again and overwrite the original output, since WriteMode is set to
OVERWRITE.
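For reference, the final write in that example looks something like this (outputPath stands in for the program's output argument), which is why a second run simply replaces the previous output:

hist.write(To.textFile(outputPath), Target.WriteMode.OVERWRITE);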
If instead we checkpointed inverseCounts, a subsequent run would launch only one
MapReduce job (the one for computing hist, since it is entirely derived from
inverseCounts). Checkpointing is simply a matter of writing a PCollection to a
target with the WriteMode set to CHECKPOINT:
PCollection<String> lines = pipeline.readTextFile(inputPath);
PTable<String, Long> counts = lines.count();
PTable<Long, String> inverseCounts = counts.parallelDo(
    new InversePairFn<String, Long>(), tableOf(longs(), strings()));
inverseCounts.write(To.sequenceFile(checkpointPath),
    Target.WriteMode.CHECKPOINT);
PTable<Long, Integer> hist = inverseCounts
    .groupByKey()
    .mapValues(new CountValuesFn<String>(), ints());
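The rest of the program is unchanged: hist is still written to its output target with OVERWRITE, and the run is launched as before:

pipeline.done();

On the next run, Crunch can start from the checkpointed inverseCounts at checkpointPath rather than recomputing it.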