Here's another example of a join in Pig, in a script for calculating the maximum
temperature for every station over a time period controlled by the input:
-- max_temp_station_name.pig
REGISTER pig-examples.jar;
DEFINE isGood com.hadoopbook.pig.IsGoodQuality();

-- Station metadata: USAF and WBAN identifiers plus the station name
stations = LOAD 'input/ncdc/metadata/stations-fixed-width.txt'
  USING com.hadoopbook.pig.CutLoadFunc('1-6,8-12,14-42')
  AS (usaf:chararray, wban:chararray, name:chararray);

trimmed_stations = FOREACH stations GENERATE usaf, wban, TRIM(name);

-- Weather records for the period selected by the input glob
records = LOAD 'input/ncdc/all/191*'
  USING com.hadoopbook.pig.CutLoadFunc('5-10,11-15,88-92,93-93')
  AS (usaf:chararray, wban:chararray, temperature:int, quality:int);

-- Drop missing readings (9999) and records with a poor quality code
filtered_records = FILTER records BY temperature != 9999 AND isGood(quality);
grouped_records = GROUP filtered_records BY (usaf, wban) PARALLEL 30;
max_temp = FOREACH grouped_records GENERATE FLATTEN(group),
  MAX(filtered_records.temperature);
max_temp_named = JOIN max_temp BY (usaf, wban), trimmed_stations BY (usaf, wban)
  PARALLEL 30;
-- Project USAF, WBAN, station name, and maximum temperature
max_temp_result = FOREACH max_temp_named GENERATE $0, $1, $5, $2;
STORE max_temp_result INTO 'max_temp_by_station';
We use the cut UDF we developed earlier to load one relation holding the station IDs
(USAF and WBAN identifiers) and names, and one relation holding all the weather
records, keyed by station ID. We group the filtered weather records by station ID and
aggregate by maximum temperature before joining with the stations. Finally, we project
out the fields we want in the final result: USAF, WBAN, station name, and maximum
temperature.
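The positional references in the final projection follow from the joined schema: $0 and $1 are the USAF and WBAN fields from max_temp, $2 is the maximum temperature, $3 and $4 are the station relation's copies of the join keys, and $5 is the trimmed name. A variant that names the fields instead might look like the following sketch. The AS clauses and the max_temperature alias are illustrative additions, not part of the original script, and the statements assume the stations and grouped_records relations defined above.

```pig
-- Sketch: name the generated fields so the projection need not rely on positions.
-- The aliases introduced with AS are hypothetical, chosen for readability.
trimmed_stations = FOREACH stations GENERATE usaf, wban, TRIM(name) AS name;
max_temp = FOREACH grouped_records GENERATE
    FLATTEN(group) AS (usaf, wban),
    MAX(filtered_records.temperature) AS max_temperature;
max_temp_named = JOIN max_temp BY (usaf, wban),
    trimmed_stations BY (usaf, wban) PARALLEL 30;
-- After a join, fields are disambiguated with the relation::field syntax.
max_temp_result = FOREACH max_temp_named GENERATE
    max_temp::usaf, max_temp::wban,
    trimmed_stations::name, max_temp::max_temperature;
```

Running DESCRIBE max_temp_named in the Grunt shell is one way to confirm the field names before writing the projection.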
Here are a few results for the 1910s:
228020 99999 SORTAVALA 322
029110 99999 VAASA AIRPORT 300
040650 99999 GRIMSEY 378
This query could be made more efficient by using a fragment replicate join, as the station
metadata is small.
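As a rough sketch of that suggestion, the join could be rewritten with Pig's USING 'replicated' clause, which requests a fragment replicate join. Pig loads every relation after the first into memory, so the small stations relation is listed last. The join then runs on the map side, which is why the PARALLEL clause (it only sets the number of reducers) is dropped here. The relation names are those of the script above; that the station metadata fits in memory is an assumption to check against your own data.

```pig
-- Sketch: the same join, requested as a fragment replicate join.
-- max_temp comes first; trimmed_stations is the small relation
-- that is replicated to, and held in memory by, each map task.
max_temp_named = JOIN max_temp BY (usaf, wban),
    trimmed_stations BY (usaf, wban) USING 'replicated';
```

Some older Pig releases may expect the keyword in double quotes ("replicated") rather than single quotes, so check the documentation for the version you run.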