so we can represent the year as the key and the temperature as the value, which will
enable us to do grouping and aggregation later in the pipeline. The method signature is:
<K, V> PTable<K, V> parallelDo(DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type);
In this example, the DoFn parses a line of input and emits a year-temperature pair:
static DoFn<String, Pair<String, Integer>> toYearTempPairsFn() {
  return new DoFn<String, Pair<String, Integer>>() {
    NcdcRecordParser parser = new NcdcRecordParser();
    @Override
    public void process(String input, Emitter<Pair<String, Integer>> emitter) {
      parser.parse(input);
      if (parser.isValidTemperature()) {
        emitter.emit(Pair.of(parser.getYear(), parser.getAirTemperature()));
      }
    }
  };
}
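The parse-and-emit pattern used by the DoFn can be pictured without Crunch at all. The following is a minimal plain-Java sketch, not Crunch code: the class name YearTempSketch, the simplified "year,temperature" line format, and the use of java.util.function.Consumer in place of Crunch's Emitter are all assumptions made for illustration (real NCDC records are fixed-width and are handled by NcdcRecordParser). It shows that a function of this shape may emit one pair for a valid line and nothing for an invalid one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java analogy of the parse-and-emit pattern; this is not Crunch code.
class YearTempSketch {

  // Assumes a hypothetical simplified input format "year,temperature".
  // The Consumer stands in for Crunch's Emitter.
  static void process(String line, Consumer<Map.Entry<String, Integer>> emitter) {
    String[] parts = line.split(",");
    if (parts.length != 2) {
      return; // malformed line: emit nothing
    }
    try {
      int temp = Integer.parseInt(parts[1].trim());
      emitter.accept(Map.entry(parts[0].trim(), temp));
    } catch (NumberFormatException e) {
      // Unparseable temperature: emit nothing, in the spirit of the
      // isValidTemperature() guard in the DoFn.
    }
  }

  // Applies process() to every line and collects whatever was emitted.
  static List<Map.Entry<String, Integer>> toYearTempPairs(List<String> lines) {
    List<Map.Entry<String, Integer>> out = new ArrayList<>();
    for (String line : lines) {
      process(line, out::add);
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> lines = List.of("1950,22", "1950,-11", "1949,bad", "1949,111");
    // prints [1950=22, 1950=-11, 1949=111]
    System.out.println(toYearTempPairs(lines));
  }
}
```

The sketch runs eagerly over an in-memory list, whereas Crunch only records the operation in its pipeline plan and runs it later, but the contract of the function is the same: zero or more emitted pairs per input line.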
After applying the function we get a table of year-temperature pairs:
PTable<String, Integer> yearTemperatures = records
    .parallelDo(toYearTempPairsFn(), tableOf(strings(), ints()));
The second argument to parallelDo() is a PTableType<K, V> instance, which is
constructed using static methods on Crunch's Writables class (since we have chosen to
use Hadoop Writable serialization for any intermediate data that Crunch will write). The
tableOf() method creates a PTableType with the given key and value types. The
strings() method declares that keys are represented by Java String objects in
memory, and serialized as Hadoop Text. The values are Java int types and are
serialized as Hadoop IntWritables.
At this point, we have a more structured representation of the data, but the number of
records is still the same since every line in the input file corresponds to an entry in the
yearTemperatures table. To calculate the maximum temperature reading for each
year in the dataset, we need to group the table entries by year, then find the maximum
temperature value for each year. Fortunately, Crunch provides exactly these operations as
a part of PTable's API. The groupByKey() method performs a MapReduce shuffle to
group entries by key and returns the third type of PCollection, called