Tying the job together is the driver class, shown in Example 9-12. The essential point here
is that we partition and group on the first part of the key, the station ID, which we do with
a custom Partitioner (KeyPartitioner) and a custom grouping comparator,
FirstComparator (from TextPair).
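The combined effect of partitioning and grouping on the station ID alone can be simulated in plain Java, without a Hadoop runtime. This is a minimal sketch under stated assumptions: the `Pair` class (a stand-in for TextPair), the `reduceGroups` helper, the sample station IDs, and the convention that station records carry the tag "0" and weather records the tag "1" are all hypothetical. Only the partition formula is taken from KeyPartitioner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class PartitionGroupSketch {

    // Hypothetical stand-in for TextPair: first = station ID, second = tag
    // (assumed "0" for a station record, "1" for a weather record).
    static final class Pair {
        final String first;
        final String second;

        Pair(String first, String second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public String toString() {
            return "(" + first + ", " + second + ")";
        }
    }

    // Same formula as KeyPartitioner: clear the sign bit so the result is never negative.
    static int partition(Pair key, int numPartitions) {
        return (key.first.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Sort order: station ID first, then tag, so tag "0" sorts ahead of "1".
    static final Comparator<Pair> SORT = (a, b) -> {
        int c = a.first.compareTo(b.first);
        return c != 0 ? c : a.second.compareTo(b.second);
    };

    // Grouping order: station ID only, mirroring the role of FirstComparator.
    static final Comparator<Pair> GROUP = (a, b) -> a.first.compareTo(b.first);

    // Simulates one reducer: keep the keys routed to partition `part`,
    // sort them, then start a new group whenever the station ID changes.
    static List<List<Pair>> reduceGroups(List<Pair> keys, int part, int numPartitions) {
        List<Pair> mine = new ArrayList<>();
        for (Pair k : keys) {
            if (partition(k, numPartitions) == part) {
                mine.add(k);
            }
        }
        mine.sort(SORT);
        List<List<Pair>> groups = new ArrayList<>();
        for (Pair k : mine) {
            List<Pair> last = groups.isEmpty() ? null : groups.get(groups.size() - 1);
            if (last == null || GROUP.compare(last.get(0), k) != 0) {
                last = new ArrayList<>();
                groups.add(last);
            }
            last.add(k);
        }
        return groups;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        List<Pair> keys = Arrays.asList(
                new Pair("011990-99999", "1"),
                new Pair("012650-99999", "1"),
                new Pair("011990-99999", "0"),
                new Pair("012650-99999", "0"),
                new Pair("011990-99999", "1"));
        for (int p = 0; p < numPartitions; p++) {
            System.out.println("partition " + p + ": " + reduceGroups(keys, p, numPartitions));
        }
    }
}
```

Whichever partition a station hashes to, its station key and its weather keys end up in one group, with the station key first. The two pieces depend on each other: if the partitioner hashed the whole key instead of only the station ID, keys for the same station could be sent to different reducers, and grouping on the first field could no longer bring them together.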
Example 9-12. Application to join weather records with station names
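import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
// TextPair, JobBuilder, JoinRecordMapper, and JoinStationMapper are defined
// elsewhere (not shown here).
public class JoinRecordWithStationName extends Configured implements Tool {
  // Partition on the station ID (first part of the key) only, so a station
  // record and all weather records for that station reach the same reducer.
  public static class KeyPartitioner extends Partitioner<TextPair, Text> {
    @Override
    public int getPartition(TextPair key, Text value, int numPartitions) {
      // Mask the sign bit so a negative hash code cannot give a negative partition.
      return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
  }
  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 3) {
      JobBuilder.printUsage(this, "<ncdc input> <station input> <output>");
      return -1;
    }
    Job job = new Job(getConf(), "Join weather records with station names");
    job.setJarByClass(getClass());
    Path ncdcInputPath = new Path(args[0]);
    Path stationInputPath = new Path(args[1]);
    Path outputPath = new Path(args[2]);
    // Each input gets its own mapper; both emit TextPair keys.
    MultipleInputs.addInputPath(job, ncdcInputPath,
        TextInputFormat.class, JoinRecordMapper.class);
    MultipleInputs.addInputPath(job, stationInputPath,
        TextInputFormat.class, JoinStationMapper.class);
    FileOutputFormat.setOutputPath(job, outputPath);
    // Partition and group on the station ID alone.
    job.setPartitionerClass(KeyPartitioner.class);
    job.setGroupingComparatorClass(TextPair.FirstComparator.class);
    job.setMapOutputKeyClass(TextPair.class);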