Database Reference
In-Depth Information
The program in Example 8-5 shows how to use MultipleOutputs to partition the
dataset by station.
Example 8-5. Partitioning the whole dataset into files named by the station ID using MultipleOutputs
public class PartitionByStationUsingMultipleOutputs extends Configured
implements Tool {
/**
 * Mapper that re-keys each input record by its station ID.
 *
 * <p>The input value is handed to an {@code NcdcRecordParser}; the station ID
 * reported by the parser becomes the output key and the input record itself
 * is passed through unchanged as the output value.
 */
static class StationMapper
    extends Mapper<LongWritable, Text, Text, Text> {

  /** Parser instance shared by all calls to {@code map}. */
  private final NcdcRecordParser parser = new NcdcRecordParser();

  /**
   * Parses one record and emits it under its station ID.
   *
   * @param key     input key; not used by this mapper
   * @param value   the input record, parsed and then emitted as-is
   * @param context receives the (station ID, record) pair
   */
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    parser.parse(value);
    Text stationId = new Text(parser.getStationId());
    context.write(stationId, value);
  }
}
static class MultipleOutputsReducer
extends Reducer < Text , Text , NullWritable , Text > {
// Assigned in setup(); reduce() writes every value through it, passing the
// key's string form as the third argument to write().
private MultipleOutputs < NullWritable , Text > multipleOutputs ;
/**
 * Builds the {@code MultipleOutputs} wrapper around the supplied context and
 * stores it in the {@code multipleOutputs} field.
 *
 * @param context the context the new {@code MultipleOutputs} is constructed with
 */
@Override
protected void setup(Context context)
    throws IOException, InterruptedException {
  MultipleOutputs<NullWritable, Text> outputs =
      new MultipleOutputs<NullWritable, Text>(context);
  this.multipleOutputs = outputs;
}
/**
 * Writes every value of the group through {@code multipleOutputs}, using a
 * {@code NullWritable} output key and the string form of the group key as the
 * third argument to {@code write}.
 *
 * @param key     the group key; its {@code toString()} is passed to each write
 * @param values  the records to write
 * @param context not used directly here; output goes through {@code multipleOutputs}
 */
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
  final NullWritable noKey = NullWritable.get();
  for (Text record : values) {
    // Evaluated per record rather than cached outside the loop.
    // NOTE(review): Hadoop may reuse the key object between iterations;
    // confirm before hoisting this.
    String outputName = key.toString();
    multipleOutputs.write(noKey, record, outputName);
  }
}
@Override
protected void cleanup ( Context context )
throws IOException , InterruptedException {
Search WWH ::




Custom Search