      throws IOException, InterruptedException {
    parser.parse(value);
    context.write(new TextPair(parser.getStationId(), "1"), value);
  }
}
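The reducer knows that it will receive the station record first, because the composite key sorts station records ahead of weather records for the same station ID. It therefore extracts the station name from that first value and writes it out as part of every output record (Example 9-11).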
Example 9-11. Reducer for joining tagged station records with tagged weather records
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class JoinReducer extends Reducer<TextPair, Text, Text, Text> {

  @Override
  protected void reduce(TextPair key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    Iterator<Text> iter = values.iterator();
    // The station record arrives first; copy it, since the iterator reuses its Text object
    Text stationName = new Text(iter.next());
    // Every remaining value is a weather record for the same station ID
    while (iter.hasNext()) {
      Text record = iter.next();
      Text outValue = new Text(stationName.toString() + "\t" + record.toString());
      context.write(key.getFirst(), outValue);
    }
  }
}
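To see why "the station record arrives first" holds, it can help to simulate the shuffle outside Hadoop. The plain-Java sketch below is not Hadoop code: it sorts tagged map output by (station ID, tag), groups by station ID, and applies the same logic as Example 9-11 to each group. It assumes the station-side mapper (not shown in this excerpt) tags its records with "0", so they sort ahead of weather records tagged "1"; the class name, record type, and sample values are hypothetical. Java 16 or later is assumed for the record syntax.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ReduceSideJoinSketch {

    // Hypothetical tagged map output: composite key (stationId, tag) plus payload.
    // Assumption: station records use tag "0", weather records use tag "1".
    record Tagged(String stationId, String tag, String value) {}

    // Simulates sort-by-(stationId, tag), grouping by stationId, and the
    // JoinReducer logic applied to each group.
    static List<String> join(List<Tagged> mapOutput) {
        List<Tagged> sorted = new ArrayList<>(mapOutput);
        // Secondary sort: within one station ID, tag "0" sorts before tag "1".
        sorted.sort(Comparator.comparing(Tagged::stationId)
                .thenComparing(Tagged::tag));

        List<String> output = new ArrayList<>();
        int i = 0;
        while (i < sorted.size()) {
            String stationId = sorted.get(i).stationId();
            // First value in the group is the station record, as the reducer assumes.
            String stationName = sorted.get(i).value();
            i++;
            // The rest of the group are weather records for this station.
            while (i < sorted.size() && sorted.get(i).stationId().equals(stationId)) {
                // Key and value separated by a tab, as TextOutputFormat writes them.
                output.add(stationId + "\t" + stationName + "\t" + sorted.get(i).value());
                i++;
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Tagged> mapOutput = List.of(
                new Tagged("station-2", "1", "weather-c"),
                new Tagged("station-1", "1", "weather-a"),
                new Tagged("station-1", "0", "Station One"),
                new Tagged("station-2", "0", "Station Two"),
                new Tagged("station-1", "1", "weather-b"));
        join(mapOutput).forEach(System.out::println);
    }
}
```

Even though the weather records appear before their station records in the input list, the sort places each station name at the head of its group, which is exactly the ordering the reducer relies on.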
The code assumes that every station ID in the weather records has exactly one matching record in the station dataset. If this were not the case, we would need to generalize the code to put the tag into the value objects by using another TextPair. The reduce() method would then be able to tell which entries were station names and detect (and handle) missing or duplicate entries before processing the weather records.
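A minimal sketch of that generalization follows, written as plain Java rather than against the Hadoop API. Each value carries its own tag (the hypothetical TaggedValue record stands in for a TextPair value), so reduce() can count station records instead of assuming exactly one. It assumes the key still sorts station records ("0") ahead of weather records ("1"), and the policy of emitting a single diagnostic line and skipping the weather records is an illustrative choice, not the book's.

```java
import java.util.ArrayList;
import java.util.List;

public class TaggedValueJoinSketch {

    // Hypothetical stand-in for a TextPair value holding (tag, payload).
    // Assumption: tag "0" marks a station record, tag "1" a weather record.
    record TaggedValue(String tag, String payload) {}

    // Generalized reduce() for one station ID. Because the sort delivers all
    // "0"-tagged values first, stationCount is final once weather records start.
    static List<String> reduce(String stationId, Iterable<TaggedValue> values) {
        String stationName = null; // Strings are immutable, so no defensive copy is needed here
        int stationCount = 0;
        int skipped = 0;
        List<String> output = new ArrayList<>();
        for (TaggedValue v : values) {
            if (v.tag().equals("0")) {
                stationCount++;
                stationName = v.payload();
            } else if (stationCount == 1) {
                output.add(stationId + "\t" + stationName + "\t" + v.payload());
            } else {
                // Missing (0) or duplicate (>1) station record: skip this weather record.
                skipped++;
            }
        }
        if (stationCount == 0) {
            output.add(stationId + "\tmissing station record; skipped "
                    + skipped + " weather record(s)");
        } else if (stationCount > 1) {
            output.add(stationId + "\t" + stationCount + " station records; skipped "
                    + skipped + " weather record(s)");
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(reduce("station-1", List.of(
                new TaggedValue("0", "Station One"),
                new TaggedValue("1", "weather-a"))));
        System.out.println(reduce("station-2", List.of(
                new TaggedValue("1", "weather-b"))));
    }
}
```

In a real Hadoop reducer the station name would still have to be copied out of the reused value object, as the warning below explains.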
WARNING
Because objects in the reducer's values iterator are reused (for efficiency purposes), it is vital that the
code makes a copy of the first Text object from the values iterator:
Text stationName = new Text(iter.next());
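If the copy is not made, stationName refers to the same object that the iterator reuses for every value. By the time it is turned into a string, that object holds the value just read (a weather record) rather than the station name, which is a bug.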
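The effect of object reuse can be reproduced without Hadoop. In the sketch below, a hypothetical iterator overwrites and returns one shared StringBuilder on every call to next(), imitating how the reducer's values iterator reuses a single Text instance. The StringBuilder substitution and all names are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ValueReuseDemo {

    // Mimics the reducer's values iterator: each next() overwrites and
    // returns the SAME mutable object (StringBuilder stands in for Text).
    static Iterator<StringBuilder> reusingIterator(List<String> data) {
        StringBuilder shared = new StringBuilder();
        Iterator<String> source = data.iterator();
        return new Iterator<>() {
            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public StringBuilder next() {
                shared.setLength(0);
                shared.append(source.next());
                return shared;
            }
        };
    }

    // Joins the first value (the "station name") onto each later value.
    static List<String> join(List<String> data, boolean copyFirst) {
        Iterator<StringBuilder> iter = reusingIterator(data);
        StringBuilder first = iter.next();
        // The fix from the warning: copy the first value before advancing.
        StringBuilder stationName = copyFirst ? new StringBuilder(first) : first;
        List<String> out = new ArrayList<>();
        while (iter.hasNext()) {
            StringBuilder record = iter.next();
            // Without the copy, stationName and record are the same object here.
            out.add(stationName + "\t" + record);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> data = List.of("Station One", "weather-a", "weather-b");
        System.out.println(join(data, true));
        System.out.println(join(data, false));
    }
}
```

With the copy, every output line starts with the station name. Without it, each line repeats the current weather record in place of the station name, which is the bug the warning describes.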