    job.getConfiguration().setBoolean(
        Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
    FileInputFormat.addInputPath(job, new Path(input));
    FileOutputFormat.setOutputPath(job, new Path(output));
    AvroJob.setDataModelClass(job, GenericData.class);
    Schema schema = new Schema.Parser().parse(new File(schemaFile));
    AvroJob.setInputKeySchema(job, schema);
    AvroJob.setMapOutputKeySchema(job, schema);
    AvroJob.setMapOutputValueSchema(job, schema);
    AvroJob.setOutputKeySchema(job, schema);
    job.setInputFormatClass(AvroKeyInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);
    job.setOutputKeyClass(AvroKey.class);
    job.setOutputValueClass(NullWritable.class);
    job.setMapperClass(SortMapper.class);
    job.setReducerClass(SortReducer.class);
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new AvroSort(), args);
    System.exit(exitCode);
  }
}
This program (which uses the Generic Avro mapping and hence does not require any code
generation) can sort Avro records of any type, represented in Java by the generic type
parameter K. We choose a value that is the same as the key, so that when the values are
grouped by key we can emit all of the values in the case that more than one of them shares
the same key (according to the sorting function). This means we don't lose any
records. [85] The mapper simply emits the input key wrapped in an AvroKey and an
AvroValue. The reducer acts as an identity, passing the values through as output keys,
which will get written to an Avro datafile.
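
To see why the value is chosen to be the same as the key, the following is a minimal sketch in plain Java, with no Hadoop or Avro dependencies. It is not the program's actual mapper and reducer: a TreeMap stands in for the shuffle, and the names ShuffleSortSketch, Pair, mapAndShuffle, reduceEmittingValues, and reduceEmittingKeys are hypothetical, introduced only for illustration. The comparator orders records by one field only, so two distinct records can share the same key according to the sorting function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleSortSketch {

    /** Hypothetical stand-in for an Avro record with two string fields. */
    static final class Pair {
        final String left;
        final String right;

        Pair(String left, String right) {
            this.left = left;
            this.right = right;
        }

        @Override
        public String toString() {
            return "(" + left + "," + right + ")";
        }
    }

    /**
     * Map and shuffle: each record is emitted as both key and value.
     * The TreeMap sorts keys with the given order and groups values
     * whose keys compare as equal, as the shuffle would.
     */
    static Map<Pair, List<Pair>> mapAndShuffle(List<Pair> input, Comparator<Pair> order) {
        Map<Pair, List<Pair>> groups = new TreeMap<>(order);
        for (Pair record : input) {
            groups.computeIfAbsent(record, k -> new ArrayList<>()).add(record);
        }
        return groups;
    }

    /** Identity reduce: write every value in each group, so no record is lost. */
    static List<Pair> reduceEmittingValues(Map<Pair, List<Pair>> groups) {
        List<Pair> out = new ArrayList<>();
        for (List<Pair> values : groups.values()) {
            out.addAll(values);
        }
        return out;
    }

    /** Counter-example: writing only the group key drops records that share a key. */
    static List<Pair> reduceEmittingKeys(Map<Pair, List<Pair>> groups) {
        return new ArrayList<>(groups.keySet());
    }

    public static void main(String[] args) {
        // Assumed sort function: compare on the left field only.
        Comparator<Pair> byLeft = Comparator.comparing((Pair p) -> p.left);
        List<Pair> input = Arrays.asList(
            new Pair("b", "2"), new Pair("a", "1"), new Pair("b", "9"));

        Map<Pair, List<Pair>> groups = mapAndShuffle(input, byLeft);
        System.out.println("values: " + reduceEmittingValues(groups));
        System.out.println("keys:   " + reduceEmittingKeys(groups));
    }
}
```

With this input, emitting the values yields all three records in sorted order, whereas emitting only the keys yields two, because (b,2) and (b,9) compare as equal under the assumed sort function.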
The sorting happens in the MapReduce shuffle, and the sort function is determined by the
Avro schema that is passed to the program. Let's use the program to sort the pairs.avro