PGroupedTable<K, V>, which has a combineValues() method for performing aggregation of all the values for a key, just like a MapReduce reducer:
PTable<String, Integer> maxTemps = yearTemperatures
    .groupByKey()
    .combineValues(Aggregators.MAX_INTS());
The combineValues() method accepts an instance of a Crunch Aggregator, a simple interface for expressing any kind of aggregation of a stream of values. Here we can take advantage of a built-in aggregator from the Aggregators class called MAX_INTS, which finds the maximum value from a set of integers.
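To make the idea of aggregating a stream of values concrete, here is a minimal plain-Java sketch. The SimpleAggregator interface and the MaxIntsSketch class are hypothetical stand-ins written for illustration; they are not Crunch's Aggregator or its MAX_INTS implementation, and the temperatures are made-up values.

```java
import java.util.Arrays;
import java.util.List;

class MaxIntsSketch {
    // Hypothetical stand-in for the aggregation idea; NOT Crunch's real interface.
    interface SimpleAggregator<T> {
        void reset();          // clear state before a new key's values
        void update(T value);  // fold one value into the running state
        T result();            // aggregate of the values seen so far
    }

    // Keeps a running maximum, one value at a time.
    static class MaxInts implements SimpleAggregator<Integer> {
        private Integer max;   // null until the first value arrives

        @Override public void reset() { max = null; }

        @Override public void update(Integer value) {
            if (max == null || value > max) {
                max = value;
            }
        }

        @Override public Integer result() { return max; }
    }

    // Aggregates all values for one key in a single pass.
    static Integer maxOf(List<Integer> values) {
        MaxInts agg = new MaxInts();
        agg.reset();
        for (Integer v : values) {
            agg.update(v);
        }
        return agg.result();
    }

    public static void main(String[] args) {
        // Illustrative values for a single key.
        System.out.println(maxOf(Arrays.asList(0, 22, -11))); // prints 22
    }
}
```

Because the state is updated one value at a time, the values for a key never need to be held in memory together, which is the property that makes this style of aggregation suit a reducer-like step.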
The final step in the pipeline is writing the maxTemps table to a file by calling write() with a text file target object constructed using the To static factory. Crunch actually uses Hadoop's TextOutputFormat for this operation, which means that the key and value in each line of output are separated by a tab:
maxTemps.write(To.textFile(args[1]));
The program so far has only been concerned with pipeline construction. To execute a pipeline, we have to call the done() method, at which point the program blocks until the pipeline completes. Crunch returns a PipelineResult object that encapsulates various statistics about the different jobs that were run in the pipeline, as well as whether the pipeline succeeded or not. We use the latter information to set the program's exit code appropriately.
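The split between constructing a pipeline and executing it can be illustrated with a toy model in plain Java. DeferredPipelineSketch, its add() method, and the exitCode() helper are all hypothetical names invented for this sketch; it does not use Crunch and reports only a boolean, whereas the text describes a richer PipelineResult. The mapping of success to 0 and failure to 1 is an assumed convention.

```java
import java.util.ArrayList;
import java.util.List;

class DeferredPipelineSketch {
    private final List<Runnable> steps = new ArrayList<>();

    // Construction phase: record the step without running it.
    DeferredPipelineSketch add(Runnable step) {
        steps.add(step);
        return this;
    }

    // Execution phase: run every recorded step in order, blocking until
    // all have finished, and report whether they all succeeded.
    boolean done() {
        try {
            for (Runnable step : steps) {
                step.run();
            }
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }

    // Maps the outcome to a process exit code (assumed convention: 0 = success).
    static int exitCode(boolean succeeded) {
        return succeeded ? 0 : 1;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        DeferredPipelineSketch pipeline = new DeferredPipelineSketch()
            .add(() -> log.add("read"))
            .add(() -> log.add("write"));

        System.out.println(log.isEmpty());            // true: nothing has run yet
        System.out.println(exitCode(pipeline.done())); // 0: both steps ran
        System.out.println(log);                      // [read, write]
    }
}
```

In a real program the exit code would typically be passed to System.exit() so that a calling shell script can detect a failed run.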
When we run the program on the sample dataset, we get the following result:
% hadoop jar crunch-examples.jar crunch.MaxTemperatureCrunch \
    input/ncdc/sample.txt output
% cat output/part-r-00000
1949	111
1950	22
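Since each output line is a key and a value separated by a tab, a downstream program can recover the pair by splitting on the first tab. The following is a minimal sketch; TabLineParserSketch and its parse() method are hypothetical helpers, not part of Crunch or Hadoop.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

class TabLineParserSketch {
    // Splits one "key<TAB>value" line into a (year, temperature) pair.
    static Map.Entry<String, Integer> parse(String line) {
        int tab = line.indexOf('\t');
        if (tab < 0) {
            throw new IllegalArgumentException("no tab in line: " + line);
        }
        String key = line.substring(0, tab);
        int value = Integer.parseInt(line.substring(tab + 1).trim());
        return new SimpleEntry<>(key, value);
    }

    public static void main(String[] args) {
        Map.Entry<String, Integer> entry = parse("1949\t111");
        System.out.println(entry.getKey() + " -> " + entry.getValue()); // prints "1949 -> 111"
    }
}
```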