Database Reference
In-Depth Information
Tying the job together is the driver class, shown in Example 9-12. The essential point here
is that we partition and group on the first part of the key, the station ID, which we do with
a custom Partitioner (KeyPartitioner) and a custom group comparator, FirstComparator (from TextPair).
Example 9-12. Application to join weather records with station names
public class
JoinRecordWithStationName
extends
Configured
implements
Tool
{
/**
 * Partitioner that looks only at the first component of the composite
 * {@code TextPair} key (the station ID, per the accompanying text), so that
 * keys sharing the same first component are assigned the same partition.
 */
public static class KeyPartitioner extends Partitioner<TextPair, Text> {

  /**
   * Chooses a partition from the hash of the key's first component.
   *
   * @param key composite key; only {@code key.getFirst()} is consulted
   * @param value the record value; not used when choosing the partition
   * @param numPartitions the number of partitions to distribute over
   * @return the masked hash modulo {@code numPartitions}; non-negative for a
   *     positive {@code numPartitions}
   */
  @Override
  public int getPartition(TextPair key, Text value, int numPartitions) {
    // Clear the sign bit first so the remainder below can never be negative.
    final int nonNegativeHash = key.getFirst().hashCode() & Integer.MAX_VALUE;
    return nonNegativeHash % numPartitions;
  }
}
@Override
public
int
run
(
String
[]
args
)
throws
Exception
{
if
(
args
.
length
!=
3
) {
JobBuilder
.
printUsage
(
this
,
"<ncdc input> <station input>
<output>"
);
return
-
1
;
}
Job job
=
new
Job
(
getConf
(),
"Join weather records with station
names"
);
job
.
setJarByClass
(
getClass
());
Path ncdcInputPath
=
new
Path
(
args
[
0
]);
Path stationInputPath
=
new
Path
(
args
[
1
]);
Path outputPath
=
new
Path
(
args
[
2
]);
MultipleInputs
.
addInputPath
(
job
,
ncdcInputPath
,
TextInputFormat
.
class
,
JoinRecordMapper
.
class
);
MultipleInputs
.
addInputPath
(
job
,
stationInputPath
,
TextInputFormat
.
class
,
JoinStationMapper
.
class
);
FileOutputFormat
.
setOutputPath
(
job
,
outputPath
);
job
.
setPartitionerClass
(
KeyPartitioner
.
class
);
job
.
setGroupingComparatorClass
(
TextPair
.
FirstComparator
.
class
);
job
.
setMapOutputKeyClass
(
TextPair
.
class
);