Database Reference
In-Depth Information
// Parser that map() applies to each input line.
private NcdcRecordParser parser = new NcdcRecordParser();

// Single record created from SCHEMA; map() overwrites its fields for every
// valid line and passes this same instance to the output value wrapper.
private GenericRecord record = new GenericData.Record(SCHEMA);
/**
 * Parses one input line and, when the parser reports a valid temperature,
 * emits the reading as a record keyed by its year.
 *
 * @param key     input key; not used by this method
 * @param value   the input line, converted to a String and handed to the parser
 * @param context sink that receives the (year, record) pair
 * @throws IOException          declared by the overridden method
 * @throws InterruptedException declared by the overridden method
 */
@Override
protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  String line = value.toString();
  parser.parse(line);

  // Lines without a valid temperature produce no output.
  if (!parser.isValidTemperature()) {
    return;
  }

  // Refill the shared record with the fields of the current reading.
  record.put("year", parser.getYearInt());
  record.put("temperature", parser.getAirTemperature());
  record.put("stationId", parser.getStationId());

  AvroKey<Integer> yearKey = new AvroKey<Integer>(parser.getYearInt());
  AvroValue<GenericRecord> reading = new AvroValue<GenericRecord>(record);
  context.write(yearKey, reading);
}
}
/**
 * Reducer that, for each key, emits the single record with the highest
 * "temperature" field among the values received for that key.
 *
 * <p>Input: {@code AvroKey<Integer>} keys with {@code AvroValue<GenericRecord>}
 * values. Output: the winning record wrapped in an {@code AvroKey<GenericRecord>},
 * paired with {@code NullWritable}.
 */
public static class MaxTemperatureReducer
    extends Reducer<AvroKey<Integer>, AvroValue<GenericRecord>,
        AvroKey<GenericRecord>, NullWritable> {

  /**
   * Scans all values for the key and writes the one with the largest
   * "temperature". On ties the first record encountered is kept, because the
   * comparison is strictly greater-than.
   *
   * @param key     the grouping key; not read by this method
   * @param values  records to compare
   * @param context sink that receives the maximum record
   * @throws IOException          declared by the overridden method
   * @throws InterruptedException declared by the overridden method
   */
  @Override
  protected void reduce(AvroKey<Integer> key,
      Iterable<AvroValue<GenericRecord>> values, Context context)
      throws IOException, InterruptedException {
    GenericRecord max = null;
    for (AvroValue<GenericRecord> value : values) {
      GenericRecord record = value.datum();
      if (max == null
          || (Integer) record.get("temperature") > (Integer) max.get("temperature")) {
        // Keep a copy rather than the iterated datum itself.
        // NOTE(review): presumably needed because the framework may reuse the
        // value object between iterations - confirm against the Hadoop docs.
        max = newWeatherRecord(record);
      }
    }
    // Parameterized explicitly (not raw) so the compiler checks the wrapper
    // against the declared output key type AvroKey<GenericRecord>.
    context.write(new AvroKey<GenericRecord>(max), NullWritable.get());
  }

  /**
   * Builds a fresh record from SCHEMA carrying the "year", "temperature" and
   * "stationId" fields of the given record.
   *
   * @param value record whose three fields are copied
   * @return a new record holding the same three field values
   */
  private GenericRecord newWeatherRecord(GenericRecord value) {
    GenericRecord record = new GenericData.Record(SCHEMA);
    record.put("year", value.get("year"));
    record.put("temperature", value.get("temperature"));
    record.put("stationId", value.get("stationId"));
    return record;
  }
}