Database Reference
In-Depth Information
extends
Reducer
<
Text
,
IntWritable
,
Text
,
IntWritable
> {
private
NcdcStationMetadata metadata
;
/**
 * Prepares the station metadata used by {@code reduce}.
 *
 * <p>Creates a fresh {@code NcdcStationMetadata} and initializes it from the
 * file {@code stations-fixed-width.txt}, addressed by a relative path.
 * NOTE(review): the file is presumably placed in the task's working
 * directory by the job submission (e.g. a distributed-cache file) — confirm
 * against how the job is launched.
 *
 * @param context the task context (not used by this method)
 * @throws IOException declared by the overridden method; may come from
 *     {@code initialize}
 * @throws InterruptedException declared by the overridden method
 */
@Override
protected void setup(Context context) throws IOException, InterruptedException {
  // Assign the field first so its state matches the original even if
  // initialization fails part-way.
  this.metadata = new NcdcStationMetadata();
  File stationFile = new File("stations-fixed-width.txt");
  this.metadata.initialize(stationFile);
}
/**
 * Emits one record per key: the station name looked up for the key, paired
 * with the largest value seen for that key.
 *
 * <p>The name is whatever {@code metadata.getStationName} returns for the
 * key's string form. If {@code values} is empty, the emitted maximum is
 * {@link Integer#MIN_VALUE}.
 *
 * @param key the key whose string form is passed to the station-name lookup
 * @param values the integer values to take the maximum of
 * @param context the context the output record is written to
 * @throws IOException if writing the output fails
 * @throws InterruptedException if writing the output is interrupted
 */
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
  // Resolve the name before consuming the values, as the original does.
  String stationId = key.toString();
  String stationName = metadata.getStationName(stationId);

  int highest = Integer.MIN_VALUE;
  for (IntWritable reading : values) {
    int candidate = reading.get();
    if (candidate > highest) {
      highest = candidate;
    }
  }

  Text outputKey = new Text(stationName);
  IntWritable outputValue = new IntWritable(highest);
  context.write(outputKey, outputValue);
}
}
/**
 * Configures and runs the job, translating the outcome into an exit code.
 *
 * <p>The job is obtained from {@code JobBuilder.parseInputAndOutput}; when
 * that returns {@code null} this method returns {@code -1} without
 * configuring anything. Otherwise it sets the output key/value classes and
 * the mapper, combiner and reducer classes, then waits for completion.
 *
 * @param args command-line arguments forwarded to
 *     {@code JobBuilder.parseInputAndOutput}
 * @return {@code -1} if no job could be built, {@code 0} if
 *     {@code waitForCompletion(true)} returned {@code true}, {@code 1} otherwise
 * @throws Exception propagated from job construction or execution
 */
@Override
public int run(String[] args) throws Exception {
  Job stationJob = JobBuilder.parseInputAndOutput(this, getConf(), args);
  if (stationJob == null) {
    return -1;
  }

  // Output record types.
  stationJob.setOutputKeyClass(Text.class);
  stationJob.setOutputValueClass(IntWritable.class);

  // Processing pipeline: mapper, combiner, then the reducer that resolves
  // station names.
  stationJob.setMapperClass(StationTemperatureMapper.class);
  stationJob.setCombinerClass(MaxTemperatureReducer.class);
  stationJob.setReducerClass(MaxTemperatureReducerWithStationLookup.class);

  boolean succeeded = stationJob.waitForCompletion(true);
  if (succeeded) {
    return 0;
  } else {
    return 1;
  }
}
public static
void
main
(
String
[]
args
)
throws
Exception
{
int
exitCode
=
ToolRunner
.
run
(
new
MaxTemperatureByStationNameUsingDistributedCacheFile
(),
args
);