Here's another example of a join in Pig, in a script for calculating the maximum
temperature for every station over a time period controlled by the input:
-- max_temp_station_name.pig
REGISTER pig-examples.jar;
DEFINE isGood com.hadoopbook.pig.IsGoodQuality();

stations = LOAD 'input/ncdc/metadata/stations-fixed-width.txt'
  USING com.hadoopbook.pig.CutLoadFunc('1-6,8-12,14-42')
  AS (usaf:chararray, wban:chararray, name:chararray);

trimmed_stations = FOREACH stations GENERATE usaf, wban, TRIM(name);

records = LOAD 'input/ncdc/all/191*'
  USING com.hadoopbook.pig.CutLoadFunc('5-10,11-15,88-92,93-93')
  AS (usaf:chararray, wban:chararray, temperature:int, quality:int);

filtered_records = FILTER records BY temperature != 9999 AND isGood(quality);
grouped_records = GROUP filtered_records BY (usaf, wban) PARALLEL 30;
max_temp = FOREACH grouped_records GENERATE FLATTEN(group),
  MAX(filtered_records.temperature);
max_temp_named = JOIN max_temp BY (usaf, wban), trimmed_stations BY (usaf, wban)
  PARALLEL 30;
max_temp_result = FOREACH max_temp_named GENERATE $0, $1, $5, $2;

STORE max_temp_result INTO 'max_temp_by_station';
We use the cut UDF we developed earlier to load one relation holding the station IDs
(USAF and WBAN identifiers) and names, and one relation holding all the weather records,
keyed by station ID. We group the filtered weather records by station ID and aggregate by
maximum temperature before joining with the stations. Finally, we project out the fields
we want in the final result: USAF, WBAN, station name, and maximum temperature. The
projection is positional because the joined relation holds the fields of max_temp ($0 to
$2) followed by those of trimmed_stations ($3 to $5), so $5 is the station name and $2 is
the maximum temperature.
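The positional projection can be hard to read, so here is a hedged sketch of the same three statements using named fields instead. The aliases name and max_temperature are assumptions introduced for illustration and do not appear in the original script; the rest of the script is assumed unchanged.

```pig
-- Sketch only: the aliases "name" and "max_temperature" are illustrative.
trimmed_stations = FOREACH stations GENERATE usaf, wban, TRIM(name) AS name;

-- Name the flattened group key and the aggregate so they can be referenced later.
max_temp = FOREACH grouped_records GENERATE FLATTEN(group) AS (usaf, wban),
  MAX(filtered_records.temperature) AS max_temperature;

-- After the join, fields are disambiguated with the relation name and "::".
max_temp_result = FOREACH max_temp_named GENERATE
  max_temp::usaf, max_temp::wban, trimmed_stations::name, max_temp::max_temperature;
```

The output should match the positional version. The difference is that the projection no longer depends on the order of fields in the joined relation.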
Here are a few results for the 1910s (temperatures are in tenths of a degree Celsius):
228020  99999  SORTAVALA      322
029110  99999  VAASA AIRPORT  300
040650  99999  GRIMSEY        378
This query could be made more efficient by using a fragment replicate join, as the station
metadata is small enough to be held in memory.
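A minimal sketch of that change, assuming the rest of the script stays as it is: Pig selects a fragment replicate join with the USING 'replicated' clause, and the relation to be held in memory must be listed last, which trimmed_stations already is.

```pig
-- Sketch: replace the reduce-side join with a fragment replicate join.
-- trimmed_stations is listed last, so it is the relation loaded into memory.
max_temp_named = JOIN max_temp BY (usaf, wban),
  trimmed_stations BY (usaf, wban) USING 'replicated';
```

The PARALLEL clause is left out here because a fragment replicate join runs on the map side, so there are no reducers for it to configure.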