Database Reference
In-Depth Information
dataset by station.
Example 8-5. Partitioning whole dataset into files named by the station ID using
MultipleOutputs
public class
PartitionByStationUsingMultipleOutputs
extends
Configured
implements
Tool
{
/**
 * Mapper that re-keys each input record by the station ID obtained from
 * {@code NcdcRecordParser}, forwarding the record text unchanged as the value.
 */
static class StationMapper
    extends Mapper<LongWritable, Text, Text, Text> {

  /** Parser that is handed each record before its station ID is read back. */
  private final NcdcRecordParser parser = new NcdcRecordParser();

  /**
   * Parses one record and writes it out under its station ID.
   *
   * @param key     input key supplied by the framework; not used here
   * @param value   the record text, parsed and then emitted as-is
   * @param context sink that receives the (station ID, record) pair
   * @throws IOException          declared by the overridden method
   * @throws InterruptedException declared by the overridden method
   */
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    parser.parse(value);
    // A fresh Text is created for every record's output key.
    Text stationId = new Text(parser.getStationId());
    context.write(stationId, value);
  }
}
static class
MultipleOutputsReducer
extends
Reducer
<
Text
,
Text
,
NullWritable
,
Text
> {
// Assigned in setup(); reduce() writes every value through it, passing the key's string form as the third argument.
private
MultipleOutputs
<
NullWritable
,
Text
>
multipleOutputs
;
/**
 * Creates the {@code MultipleOutputs} instance for this task, bound to the
 * supplied context, and stores it in the {@code multipleOutputs} field.
 *
 * @param context the task context handed to the {@code MultipleOutputs} constructor
 * @throws IOException          declared by the overridden method
 * @throws InterruptedException declared by the overridden method
 */
@Override
protected void setup(Context context)
    throws IOException, InterruptedException {
  this.multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
}
/**
 * Writes every value of the group through {@code multipleOutputs}, using a
 * {@code NullWritable} as the output key and the string form of the group
 * key as the third argument to {@code write}.
 *
 * @param key     the group key; its {@code toString()} is passed along with each record
 * @param values  the records belonging to this key
 * @param context task context; not used directly in this method
 * @throws IOException          if {@code multipleOutputs.write} throws it
 * @throws InterruptedException if {@code multipleOutputs.write} throws it
 */
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
  NullWritable noKey = NullWritable.get();
  for (Text record : values) {
    // The string form of the key is taken anew for each record.
    String baseOutputPath = key.toString();
    multipleOutputs.write(noKey, record, baseOutputPath);
  }
}
@Override
protected
void
cleanup
(
Context context
)
throws
IOException
,
InterruptedException
{