public static <K, V> void writePartitionFile(Job job, Sampler<K, V> sampler)
    throws IOException, ClassNotFoundException, InterruptedException
The sequence file is used by TotalOrderPartitioner to create partitions for the
sort job. Example 9-5 puts it all together.
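To make the role of the partition file more concrete, here is a minimal sketch of how a sorted list of split points can map a key to a partition with a binary search. It is a simplified illustration of the idea, not Hadoop's implementation: the class name SplitPointLookup, the method partitionFor, and the split-point values are all hypothetical, and plain int keys stand in for IntWritable.

```java
import java.util.Arrays;

// Hypothetical illustration: maps keys to partitions using sorted split points.
// n split points divide the key space into n + 1 contiguous partitions.
public class SplitPointLookup {

    // Returns the index of the partition that the key falls into.
    // A key equal to split point i goes to partition i + 1, so each
    // partition covers the half-open range [splitPoints[i-1], splitPoints[i]).
    static int partitionFor(int[] splitPoints, int key) {
        int pos = Arrays.binarySearch(splitPoints, key);
        // When the key is absent, binarySearch returns -(insertionPoint) - 1,
        // and the insertion point is exactly the partition index.
        return pos >= 0 ? pos + 1 : -(pos + 1);
    }

    public static void main(String[] args) {
        // Assumed split points (not taken from a real partition file).
        int[] splitPoints = {-100, 0, 100};
        for (int key : new int[] {-150, -100, -1, 0, 99, 100, 250}) {
            System.out.println(key + " -> partition " + partitionFor(splitPoints, key));
        }
    }
}
```

Because every key in partition i sorts before every key in partition i + 1, concatenating the sorted reducer outputs in partition order gives a globally sorted result.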
Example 9-5. A MapReduce program for sorting a SequenceFile with IntWritable keys using the TotalOrderPartitioner to globally sort the data
public class SortByTemperatureUsingTotalOrderPartitioner extends Configured
    implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
    if (job == null) {
      return -1;
    }

    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    SequenceFileOutputFormat.setCompressOutput(job, true);
    SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job,
        CompressionType.BLOCK);

    job.setPartitionerClass(TotalOrderPartitioner.class);

    InputSampler.Sampler<IntWritable, Text> sampler =
        new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);

    InputSampler.writePartitionFile(job, sampler);

    // Add to DistributedCache
    Configuration conf = job.getConfiguration();
    String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
    URI partitionUri = new URI(partitionFile);
    job.addCacheFile(partitionUri);

    return job.waitForCompletion(true) ? 0 : 1;
  }