      throws IOException, InterruptedException {
    parser.parse(value);
    context.write(new TextPair(parser.getStationId(), "1"), value);
  }
}
The reducer knows that it will receive the station record first, so it extracts its name from
the value and writes it out as a part of every output record (Example 9-11).
Example 9-11. Reducer for joining tagged station records with tagged weather records
public class JoinReducer extends Reducer<TextPair, Text, Text, Text> {

  @Override
  protected void reduce(TextPair key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    Iterator<Text> iter = values.iterator();
    Text stationName = new Text(iter.next());
    while (iter.hasNext()) {
      Text record = iter.next();
      Text outValue = new Text(stationName.toString() + "\t" + record.toString());
      context.write(key.getFirst(), outValue);
    }
  }
}
The code assumes that every station ID in the weather records has exactly one matching
record in the station dataset. If this were not the case, we would need to generalize the
code to put the tag into the value objects, by using another TextPair. The reduce()
method would then be able to tell which entries were station names and detect (and
handle) missing or duplicate entries before processing the weather records.
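The generalization described above can be sketched without any Hadoop dependencies. The following is only an illustration under stated assumptions: the class TaggedJoinSketch, its Tagged value type, and the joinGroup() method are hypothetical names, the station tag is assumed to be "0" (so that it sorts ahead of the weather tag "1"), and the policy chosen here (skip weather records that have no station, reject duplicate stations) is one possible choice rather than the book's.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TaggedJoinSketch {

    /** Hypothetical tagged value: tag "0" marks a station record, "1" a weather record. */
    public static final class Tagged {
        final String tag;
        final String payload;

        public Tagged(String tag, String payload) {
            this.tag = tag;
            this.payload = payload;
        }
    }

    /**
     * Joins one reduce group. Values are assumed to arrive with station
     * records first, as the secondary sort in the text guarantees.
     */
    public static List<String> joinGroup(String stationId, Iterable<Tagged> values) {
        String stationName = null;
        List<String> output = new ArrayList<>();
        for (Tagged value : values) {
            if ("0".equals(value.tag)) {
                // Assumed policy: a second station record for the same ID is an error.
                if (stationName != null) {
                    throw new IllegalStateException("Duplicate station record for " + stationId);
                }
                stationName = value.payload;
            } else if (stationName != null) {
                output.add(stationId + "\t" + stationName + "\t" + value.payload);
            }
            // Assumed policy: weather records with no station record are skipped.
        }
        return output;
    }

    public static void main(String[] args) {
        // Illustrative sample data, not taken from the text above.
        List<Tagged> group = Arrays.asList(
            new Tagged("0", "SIHCCAJAVRI"),
            new Tagged("1", "0067\t195005150700"),
            new Tagged("1", "0043\t195005151200"));
        for (String line : joinGroup("011990-99999", group)) {
            System.out.println(line);
        }
    }
}
```

Because the tag travels with each value, the loop can decide what every entry is instead of trusting its position in the iterator. In a real job the payloads would be Text objects and would still need copying, for the reason given in the warning that follows.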
WARNING
Because objects in the reducer's values iterator are reused (for efficiency), it is vital that the
code makes a copy of the first Text object from the values iterator:
  Text stationName = new Text(iter.next());
If the copy is not made, stationName refers to the iterator's single reused object, so by the
time it is turned into a string it holds the value just read (the current weather record) rather
than the station name, which is a bug.
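The effect of object reuse can be reproduced in plain Java. The sketch below does not use Hadoop: MutableText is a hypothetical stand-in for Text, and reusingIterator() is an assumed simplification of how the framework hands back the same instance on every call to next(). The sample values are illustrative only.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ReuseDemo {

    /** Hypothetical stand-in for Hadoop's mutable Text class. */
    public static final class MutableText {
        private String content = "";

        public MutableText() {}

        /** Copy constructor, analogous to new Text(Text). */
        public MutableText(MutableText other) {
            this.content = other.content;
        }

        void set(String s) {
            content = s;
        }

        @Override
        public String toString() {
            return content;
        }
    }

    /** Returns an iterator that refills and returns one shared object. */
    public static Iterator<MutableText> reusingIterator(List<String> data) {
        final MutableText shared = new MutableText();
        final Iterator<String> source = data.iterator();
        return new Iterator<MutableText>() {
            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public MutableText next() {
                shared.set(source.next());
                return shared;
            }
        };
    }

    /** Mirrors the reducer loop; copyFirst toggles the defensive copy. */
    public static String join(List<String> data, boolean copyFirst) {
        Iterator<MutableText> iter = reusingIterator(data);
        MutableText first = iter.next();
        MutableText stationName = copyFirst ? new MutableText(first) : first;
        StringBuilder out = new StringBuilder();
        while (iter.hasNext()) {
            MutableText record = iter.next();
            out.append(stationName).append('\t').append(record).append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("SIHCCAJAVRI", "0067", "0043");
        System.out.print(join(values, true));   // station name on every line
        System.out.print(join(values, false));  // each record paired with itself
    }
}
```

With the copy, every output line starts with the station name. Without it, stationName and record are the same object, so each line repeats the current weather record twice and the station name is lost.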