NOTE

For this pipeline, Crunch's planner does a better job of optimizing the execution plan if scores.materialize().iterator() is called immediately after the pageRank() call. This ensures that the scores table is explicitly materialized, so it is available for the next execution plan in the next iteration of the loop. Without the call to materialize(), the program still produces the same result, but it's less efficient: the planner may choose to materialize different intermediate results, and so for the next iteration of the loop some of the computation must be re-executed to get the scores to pass to the pageRank() call.
Checkpointing a Pipeline
In the previous section, we saw that Crunch will reuse any PCollections that were materialized in any previous runs of the same pipeline. However, if you create a new pipeline instance, then it will not automatically share any materialized PCollections from other pipelines, even if the input source is the same. This can make developing a pipeline rather time consuming, since even a small change to a computation toward the end of the pipeline means Crunch will run the new pipeline from the beginning.

The solution is to checkpoint a PCollection to persistent storage (typically HDFS) so that Crunch can start from the checkpoint in the new pipeline.
Consider the Crunch program for calculating a histogram of word counts for text files back in Example 18-3. We saw that the Crunch planner translates this pipeline into two MapReduce jobs. If the program is run for a second time, then Crunch will run the two MapReduce jobs again and overwrite the original output, since WriteMode is set to OVERWRITE.
If instead we checkpointed inverseCounts, a subsequent run would only launch one MapReduce job (the one for computing hist, since it is entirely derived from inverseCounts). Checkpointing is simply a matter of writing a PCollection to a target with the WriteMode set to CHECKPOINT:
PCollection<String> lines = pipeline.readTextFile(inputPath);
PTable<String, Long> counts = lines.count();
PTable<Long, String> inverseCounts = counts.parallelDo(
    new InversePairFn<String, Long>(), tableOf(longs(), strings()));
inverseCounts.write(To.sequenceFile(checkpointPath),
    Target.WriteMode.CHECKPOINT);
PTable<Long, Integer> hist = inverseCounts
    .groupByKey()
    .mapValues(new CountValuesFn<String>(), ints());
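To see why the second run can skip the first MapReduce job, the checkpoint pattern can be sketched outside Crunch in plain Java. This is not the Crunch API — the class and method names here are invented for illustration — but it shows the same idea: an expensive stage writes its result to persistent storage, and a later run that finds the saved result reloads it instead of recomputing.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Minimal sketch of the checkpoint pattern (not the Crunch API).
public class CheckpointSketch {

    // Counts how many times the expensive stage actually executes,
    // so we can observe that a second run skips it.
    static int expensiveRuns = 0;

    // The "expensive" stage: word counts for the input lines.
    static List<String> countWords(List<String> lines) {
        expensiveRuns++;
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines)
            for (String word : line.split("\\s+"))
                counts.merge(word, 1L, Long::sum);
        return counts.entrySet().stream()
                .map(e -> e.getKey() + "\t" + e.getValue())
                .collect(Collectors.toList());
    }

    // Run the stage only if no checkpoint exists; otherwise reload it.
    static List<String> checkpointed(Path checkpoint, List<String> input)
            throws IOException {
        if (Files.exists(checkpoint))
            return Files.readAllLines(checkpoint); // reuse the saved result
        List<String> result = countWords(input);
        Files.write(checkpoint, result);           // persist for the next run
        return result;
    }
}
```

In the same way that Crunch only launches the job for hist on a second run, the second call to checkpointed() here returns the saved result without re-running countWords(). (Crunch's real behavior is more careful: it also checks that the checkpoint is newer than its inputs before reusing it.)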