that a PTable<K, V> is a PCollection<Pair<K, V>>). However, in many cases there are
opportunities to limit the use of Pair objects, which will make your code more
readable. For example, use PCollection's by() method in favor of parallelDo() when
creating a table where the values are the same as the ones in the PCollection (as
discussed in parallelDo()), or use PGroupedTable's combineValues() with an
Aggregator in preference to a CombineFn (see combineValues()).
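The two Pair-avoiding idioms above can be sketched as follows. This is a minimal in-memory sketch, assuming Crunch's MemPipeline (org.apache.crunch.impl.mem) and Writable-based PTypes; the class name ByAndCombineSketch, the FirstLetterFn key function, and the station readings are hypothetical illustrations, not data from the text.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.fn.Aggregators;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.types.writable.Writables;

public class ByAndCombineSketch {

  // Hypothetical key-extraction function for by(). It is a static nested class,
  // so serializing it does not drag in an enclosing instance.
  // Assumes non-empty input strings.
  static class FirstLetterFn extends MapFn<String, String> {
    @Override
    public String map(String word) {
      return word.substring(0, 1);
    }
  }

  // by(): only the key is computed; each word is kept unchanged as the value,
  // so no Pair has to be built by hand inside a DoFn.
  static List<String> keyByFirstLetter(String... words) {
    PCollection<String> input = MemPipeline.typedCollectionOf(Writables.strings(), words);
    PTable<String, String> byLetter = input.by(new FirstLetterFn(), Writables.strings());
    List<String> result = new ArrayList<>();
    for (Pair<String, String> pair : byLetter.materialize()) {
      result.add(pair.first() + "=" + pair.second());
    }
    return result;
  }

  // combineValues() with a built-in Aggregator: the maximum value per key,
  // with no hand-written CombineFn. The readings are made-up sample data.
  static Map<String, Integer> maxPerKey() {
    PTable<String, Integer> readings = MemPipeline.typedTableOf(
        Writables.tableOf(Writables.strings(), Writables.ints()),
        "station-a", 10, "station-a", 25, "station-b", -3, "station-b", 7);
    return readings.groupByKey()
        .combineValues(Aggregators.MAX_INTS())
        .materializeToMap();
  }

  public static void main(String[] args) {
    System.out.println(keyByFirstLetter("apple", "avocado", "banana"));
    System.out.println(maxPerKey());
  }
}
```

The design point is that by() takes a MapFn that returns just the key, and Crunch pairs it with the original element for you; likewise Aggregators.MAX_INTS() stands in for a custom CombineFn.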
The fastest path to using records in a Crunch pipeline is to define a Java class that has
fields that Avro Reflect can serialize and a no-arg constructor, like this WeatherRecord
class:
public class WeatherRecord {
  private int year;
  private int temperature;
  private String stationId;
  // No-arg constructor, needed so Avro Reflect can instantiate the class
  public WeatherRecord() {
  }
  public WeatherRecord(int year, int temperature, String stationId) {
    this.year = year;
    this.temperature = temperature;
    this.stationId = stationId;
  }
  // ... getters elided
}
From there, it's straightforward to generate a PCollection<WeatherRecord> from
a PCollection<String>, using parallelDo() to parse each line into a
WeatherRecord object:
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.io.From;
import org.apache.crunch.types.avro.Avros;
PCollection<String> lines = pipeline.read(From.textFile(inputPath));
PCollection<WeatherRecord> records = lines.parallelDo(
    new DoFn<String, WeatherRecord>() {
      NcdcRecordParser parser = new NcdcRecordParser();
      @Override
      public void process(String input, Emitter<WeatherRecord> emitter) {
        parser.parse(input);
        // Lines without a valid temperature produce no output record
        if (parser.isValidTemperature()) {
          emitter.emit(new WeatherRecord(parser.getYearInt(),
              parser.getAirTemperature(), parser.getStationId()));
        }
      }
    }, Avros.records(WeatherRecord.class)); // Avro PType for the output records