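```python
import re
import sys

for line in sys.stdin:
    val = line.strip()
    # Fixed-width NCDC record: year, air temperature, and quality code
    (year, temp, q) = (val[15:19], int(val[87:92]), val[92:93])
    if temp == 9999:
        # Missing reading: increment a Streaming counter by writing to stderr
        sys.stderr.write("reporter:counter:Temperature,Missing,1\n")
    elif re.match("[01459]", q):
        # Emit year<TAB>temperature for readings with an acceptable quality code
        print("%s\t%s" % (year, temp))
```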
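To check the fixed-width slicing without a cluster, the map logic can be pulled into a plain function. This is a minimal sketch: the names parse_record and synthetic_record are hypothetical, and the synthetic 93-character records are padded with zeros only so that the offsets used above (year at 15:19, temperature at 87:92, quality code at 92:93) line up. They are not real NCDC data.

```python
import re


def parse_record(line):
    """Hypothetical helper mirroring the map function above.

    Returns (year, temp) for a usable reading, "missing" for the 9999
    sentinel, or None when the quality code is rejected.
    """
    val = line.strip()
    year, temp, q = val[15:19], int(val[87:92]), val[92:93]
    if temp == 9999:
        return "missing"
    if re.match("[01459]", q):
        return (year, temp)
    return None


def synthetic_record(year, temp_field, quality):
    # Zero padding places each field at the offset the mapper slices.
    return "0" * 15 + year + "0" * 68 + temp_field + quality


print(parse_record(synthetic_record("1950", "+0022", "1")))  # ('1950', 22)
print(parse_record(synthetic_record("1950", "+9999", "9")))  # missing
print(parse_record(synthetic_record("1950", "+0022", "2")))  # None
```

Note that int() accepts the leading sign in the temperature field, so "+0022" parses as 22 and a value such as "-0011" as -11.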
However, we don't want to partition by the entire key, so we use KeyFieldBasedPartitioner, which allows us to partition by part of the key. The mapreduce.partition.keypartitioner.options property configures the partitioner. The value -k1,1 instructs the partitioner to use only the first field of the key, where fields are assumed to be separated by the string defined by the mapreduce.map.output.key.field.separator property (a tab character by default).
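The effect of -k1,1 can be illustrated with a small simulation: only the text before the first separator goes into the hash, so every record for a given year lands in the same partition whatever its temperature. This sketch assumes keys of the form year<TAB>temperature and uses zlib.crc32 as a stand-in hash. That choice is an assumption for illustration and is not the hash Hadoop's KeyFieldBasedPartitioner actually computes; partition_for and num_partitions are hypothetical names.

```python
import zlib

# Default value of mapreduce.map.output.key.field.separator
SEPARATOR = "\t"


def partition_for(key, num_partitions):
    """Pick a partition from the first key field only, mimicking -k1,1.

    ASSUMPTION: zlib.crc32 stands in for the real hash; Hadoop's
    KeyFieldBasedPartitioner computes its hash differently.
    """
    first_field = key.split(SEPARATOR, 1)[0]
    return zlib.crc32(first_field.encode("utf-8")) % num_partitions


# Hypothetical map output keys: year<TAB>temperature
keys = ["1900\t35", "1900\t-11", "1901\t61", "1901\t22"]
for key in keys:
    print(repr(key), "->", partition_for(key, 4))
```

Keys that share a year always map to the same partition, so a single reducer sees all of that year's temperatures. Hashing the whole key instead could scatter one year's records across several reducers.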
Next, we want a comparator that sorts the year field in ascending order and the temperature field in descending order, so that the reduce function can simply return the first record in each group. Hadoop provides KeyFieldBasedComparator, which is ideal for this purpose. The comparison order is defined by a specification similar to the one used by GNU sort, and it is set using the mapreduce.partition.keycomparator.options property. The value -k1n -k2nr used in this example means “sort by the first field in numerical order, then by the second field in reverse numerical order.” Like its partitioner cousin, KeyFieldBasedPartitioner, it uses the map output key separator to split a key into fields.
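The ordering that -k1n -k2nr requests can be reproduced locally with an ordinary sort key. This is a minimal emulation, assuming keys of the form year<TAB>temperature; sort_key is a hypothetical name, and the sketch does not use Hadoop's comparator code. The descending order on the second field comes from negating the temperature.

```python
SEPARATOR = "\t"


def sort_key(key):
    """Hypothetical emulation of KeyFieldBasedComparator with "-k1n -k2nr".

    Field 1 (year): ascending, numeric.
    Field 2 (temperature): descending, numeric (achieved by negation).
    """
    year, temp = key.split(SEPARATOR)
    return (int(year), -int(temp))


# Hypothetical map output keys in arbitrary order
keys = ["1901\t22", "1900\t-11", "1900\t35", "1901\t61", "1900\t7"]
for key in sorted(keys, key=sort_key):
    print(key.replace(SEPARATOR, " "))
```

This prints 1900 35, 1900 7, 1900 -11, 1901 61, and 1901 22, one per line. The n flag matters: compared as strings in reverse order, "7" would sort ahead of "35", and the first record for 1900 would no longer be the maximum.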
In the Java version, we had to set the grouping comparator; however, in Streaming, groups are not demarcated in any way, so in the reduce function we have to detect the group boundaries ourselves by looking for when the year changes (Example 9-8).
Example 9-8. Reduce function for secondary sort in Python
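```python
#!/usr/bin/env python

import sys

last_group = None
for line in sys.stdin:
    val = line.strip()
    (year, temp) = val.split("\t")
    group = year
    # Input arrives sorted by year, then by temperature descending, so the
    # first line seen for each new year holds that year's maximum temperature.
    if last_group != group:
        print(val)
        last_group = group
```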
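To see why printing the first line of each group yields the maximum temperature per year, the reduce logic of Example 9-8 can be run in memory over map output that has been sorted the way the comparator options specify. This is an approximation of what Streaming does between the map and reduce phases, not Hadoop's implementation: it assumes a single reducer (so partitioning is skipped), and the helper names and sample records are hypothetical.

```python
import io


def secondary_sort_reduce(stream):
    """Example 9-8's logic, reading lines from any iterable and returning the
    lines it would print (hypothetical helper for local testing)."""
    emitted = []
    last_group = None
    for line in stream:
        val = line.strip()
        (year, temp) = val.split("\t")
        group = year
        if last_group != group:  # first line of a new year
            emitted.append(val)
            last_group = group
    return emitted


def shuffle_sort_key(key):
    # Emulates "-k1n -k2nr": year ascending, temperature descending.
    year, temp = key.split("\t")
    return (int(year), -int(temp))


# Hypothetical map output, in arbitrary order
map_output = ["1901\t22", "1900\t-11", "1900\t35", "1901\t61", "1900\t7"]

# Single reducer assumed: only the sort step of the shuffle is emulated.
shuffled = sorted(map_output, key=shuffle_sort_key)
reducer_input = io.StringIO("".join(line + "\n" for line in shuffled))

for line in secondary_sort_reduce(reducer_input):
    print(line)
```

The two tab-separated lines printed are 1900 35 and 1901 61: the maximum temperature for each year, obtained without the reducer ever comparing temperatures itself.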