public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler)
    throws IOException, ClassNotFoundException, InterruptedException
The sequence file is used by TotalOrderPartitioner to create partitions for the sort job.
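To make the role of the partition file more concrete, the following is a minimal, self-contained sketch of the idea in plain Java. It is not Hadoop's implementation: the class name SplitPointSketch, both method names, and the sample temperatures are hypothetical, keys are plain int values rather than IntWritable, and the split points are assumed to be distinct. It picks evenly spaced split points from a sorted sample, then assigns a key to a partition with a binary search over those split points.

```java
import java.util.Arrays;

// Illustrative sketch only; names and data are hypothetical, not Hadoop APIs.
public class SplitPointSketch {

  // Sort a copy of the sample and pick numPartitions - 1 evenly spaced keys.
  // Assumes the sample holds at least numPartitions keys.
  static int[] chooseSplitPoints(int[] sample, int numPartitions) {
    int[] sorted = sample.clone();
    Arrays.sort(sorted);
    int[] splits = new int[numPartitions - 1];
    float step = sorted.length / (float) numPartitions;
    for (int i = 1; i < numPartitions; i++) {
      splits[i - 1] = sorted[Math.round(step * i)];
    }
    return splits;
  }

  // Keys below splits[0] go to partition 0; a key equal to or above
  // splits[i] (and below splits[i + 1], if any) goes to partition i + 1.
  static int getPartition(int key, int[] splits) {
    int pos = Arrays.binarySearch(splits, key);
    // Exact match at index pos: pos + 1. Otherwise binarySearch returns
    // -(insertionPoint) - 1, so recover the insertion point.
    return pos >= 0 ? pos + 1 : -(pos + 1);
  }

  public static void main(String[] args) {
    // Hypothetical sampled keys (for example, temperatures in tenths of a degree).
    int[] sample = {-50, 120, 30, -10, 250, 80, 0, 200, 60, 150, 10, 100};
    int[] splits = chooseSplitPoints(sample, 4);
    System.out.println(Arrays.toString(splits)); // prints [10, 80, 150]
    for (int key : new int[] {-20, 10, 79, 300}) {
      System.out.println(key + " -> partition " + getPartition(key, splits));
    }
  }
}
```

Because every key in partition i sorts before every key in partition i + 1, concatenating the sorted reducer outputs in partition order yields a globally sorted result. How evenly the partitions fill depends on how well the sample reflects the real key distribution.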
Example 9-5 puts it all together.
Example 9-5. A MapReduce program for sorting a SequenceFile with IntWritable keys using the TotalOrderPartitioner to globally sort the data
public class SortByTemperatureUsingTotalOrderPartitioner extends Configured
    implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
    if (job == null) {
      return -1;
    }

    // Read and write sequence files; write block-compressed gzip output
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    SequenceFileOutputFormat.setCompressOutput(job, true);
    SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job,
        CompressionType.BLOCK);

    job.setPartitionerClass(TotalOrderPartitioner.class);

    // Sample keys with probability 0.1, taking at most 10,000 samples
    // from at most 10 input splits
    InputSampler.Sampler<IntWritable, Text> sampler =
        new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);

    // Write the partition file that TotalOrderPartitioner reads
    InputSampler.writePartitionFile(job, sampler);

    // Add to DistributedCache
    Configuration conf = job.getConfiguration();
    String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
    URI partitionUri = new URI(partitionFile);
    job.addCacheFile(partitionUri);

    return job.waitForCompletion(true) ? 0 : 1;
  }