Database Reference
In-Depth Information
extends Reducer < Text , IntWritable , Text , IntWritable > {
private NcdcStationMetadata metadata ;
/**
 * Prepares the station metadata lookup before any keys are reduced.
 *
 * <p>Creates a fresh {@code NcdcStationMetadata} and initializes it from the
 * file {@code stations-fixed-width.txt}, addressed by a relative path.
 * NOTE(review): the relative path presumably resolves against the task's
 * working directory, where the file is expected to have been made available
 * by the job submission; confirm against how the job is launched.
 *
 * @param context the reducer context (not used here)
 * @throws IOException declared by the overridden method
 * @throws InterruptedException declared by the overridden method
 */
@Override
protected void setup(Context context)
    throws IOException, InterruptedException {
  // Assign the field first so its state matches the original even if
  // initialization fails part-way.
  this.metadata = new NcdcStationMetadata();
  File stationFile = new File("stations-fixed-width.txt");
  this.metadata.initialize(stationFile);
}
/**
 * Emits the maximum value seen for a key, labelled with the station name
 * that the metadata lookup returns for that key.
 *
 * <p>If {@code values} yields no elements, the emitted maximum is
 * {@link Integer#MIN_VALUE}.
 *
 * @param key the key whose string form is passed to the station-name lookup
 * @param values the values to take the maximum of
 * @param context the context the (station name, maximum) pair is written to
 * @throws IOException if writing the output fails
 * @throws InterruptedException if writing the output is interrupted
 */
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
  // Resolve the name before touching the values, as the original does.
  String resolvedName = metadata.getStationName(key.toString());

  int highest = Integer.MIN_VALUE;
  for (IntWritable reading : values) {
    int current = reading.get();
    if (current > highest) {
      highest = current;
    }
  }

  Text outputKey = new Text(resolvedName);
  IntWritable outputValue = new IntWritable(highest);
  context.write(outputKey, outputValue);
}
}
/**
 * Configures and runs the job.
 *
 * <p>The job is obtained from {@code JobBuilder.parseInputAndOutput}; when
 * that returns {@code null} the method gives up immediately with {@code -1}.
 * Otherwise it sets the output key/value classes, the mapper, the combiner
 * and the reducer, then blocks until the job finishes.
 *
 * @param args command-line arguments forwarded to the job builder
 * @return {@code -1} if no job could be built, {@code 0} if the job
 *     completed successfully, {@code 1} otherwise
 * @throws Exception if building or running the job fails
 */
@Override
public int run(String[] args) throws Exception {
  Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
  if (job == null) {
    return -1;
  }

  // Output types.
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  // Processing stages.
  job.setMapperClass(StationTemperatureMapper.class);
  job.setCombinerClass(MaxTemperatureReducer.class);
  job.setReducerClass(MaxTemperatureReducerWithStationLookup.class);

  boolean succeeded = job.waitForCompletion(true);
  if (succeeded) {
    return 0;
  }
  return 1;
}
public static void main ( String [] args ) throws Exception {
int exitCode = ToolRunner . run (
new MaxTemperatureByStationNameUsingDistributedCacheFile (),
args );
Search WWH ::




Custom Search