Database Reference
In-Depth Information
// Parser applied to the text of each input value inside map(); one instance is
// kept per mapper object and re-used for every call.
private NcdcRecordParser parser = new NcdcRecordParser ();
// Single output record built against SCHEMA; map() overwrites its "year",
// "temperature" and "stationId" fields before each write instead of allocating
// a new record per input line.
private GenericRecord record = new GenericData . Record ( SCHEMA );
/**
 * Parses one input line and, when the parser reports a valid temperature,
 * emits the reading keyed by its year.
 *
 * <p>The shared {@code record} field is refilled with the "year",
 * "temperature" and "stationId" values taken from the parser and written as
 * the map output value; lines without a valid temperature produce no output.
 *
 * @param key     input key supplied by the framework (not used here)
 * @param value   the raw input line to parse
 * @param context sink for the (year, record) output pair
 * @throws IOException          propagated from {@code context.write}
 * @throws InterruptedException propagated from {@code context.write}
 */
@Override
protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  final String line = value.toString();
  parser.parse(line);

  // Nothing to emit for readings the parser does not consider valid.
  if (!parser.isValidTemperature()) {
    return;
  }

  record.put("year", parser.getYearInt());
  record.put("temperature", parser.getAirTemperature());
  record.put("stationId", parser.getStationId());

  AvroKey<Integer> yearKey = new AvroKey<Integer>(parser.getYearInt());
  AvroValue<GenericRecord> reading = new AvroValue<GenericRecord>(record);
  context.write(yearKey, reading);
}
}
/**
 * Reducer that, for each integer key, emits the one input record whose
 * "temperature" field is largest.
 *
 * <p>Input values are Avro generic records carrying "year", "temperature" and
 * "stationId" fields. The winning record is written as the output key with a
 * {@link NullWritable} value. When several records tie for the maximum, the
 * first one encountered is kept (the comparison is strict).
 */
public static class MaxTemperatureReducer
    extends Reducer<AvroKey<Integer>, AvroValue<GenericRecord>,
        AvroKey<GenericRecord>, NullWritable> {

  /**
   * Scans all records for one key and writes the one with the highest
   * temperature.
   *
   * @param key     the grouping key shared by all {@code values}
   * @param values  the records grouped under {@code key}
   * @param context sink for the selected record
   * @throws IOException          propagated from {@code context.write}
   * @throws InterruptedException propagated from {@code context.write}
   */
  @Override
  protected void reduce(AvroKey<Integer> key,
      Iterable<AvroValue<GenericRecord>> values, Context context)
      throws IOException, InterruptedException {
    GenericRecord max = null;
    for (AvroValue<GenericRecord> value : values) {
      GenericRecord record = value.datum();
      if (max == null
          || (Integer) record.get("temperature") > (Integer) max.get("temperature")) {
        // Keep a copy rather than the datum itself, so the retained maximum
        // does not alias the iterated object (the framework may reuse value
        // objects between iterations -- see the Hadoop Reducer documentation).
        max = newWeatherRecord(record);
      }
    }

    // No values means no maximum; skip the write instead of emitting a key
    // that wraps a null datum.
    if (max == null) {
      return;
    }

    // Explicit type argument: the raw AvroKey constructor would be an
    // unchecked conversion to the declared AvroKey<GenericRecord> output key.
    context.write(new AvroKey<GenericRecord>(max), NullWritable.get());
  }

  /**
   * Builds a new record against SCHEMA holding the same "year", "temperature"
   * and "stationId" values as {@code value}.
   *
   * @param value the record to copy fields from
   * @return a fresh record independent of {@code value}
   */
  private GenericRecord newWeatherRecord(GenericRecord value) {
    GenericRecord record = new GenericData.Record(SCHEMA);
    record.put("year", value.get("year"));
    record.put("temperature", value.get("temperature"));
    record.put("stationId", value.get("stationId"));
    return record;
  }
}
Search WWH ::




Custom Search