that a PTable<K, V> is a PCollection<Pair<K, V>>). However, there are opportunities to limit the use of Pair objects in many cases, which will make your code more readable. For example, use PCollection's by() method in favor of parallelDo() when creating a table where the values are the same as the ones in the PCol-

The fastest path to using records in a Crunch pipeline is to define a Java class that has fields that Avro Reflect can serialize and a no-arg constructor, like this WeatherRecord class:

    public class WeatherRecord {
      private int year;
      private int temperature;
      private String stationId;

      public WeatherRecord() {
      }

      public WeatherRecord(int year, int temperature, String stationId) {
        this.year = year;
        this.temperature = temperature;
        this.stationId = stationId;
      }

      // ... getters elided
    }

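To see why the no-arg constructor matters, it can help to look at what a reflection-based deserializer generally has to do: create an empty instance first, then assign the fields one by one. The following is a minimal sketch of that idea using only java.lang.reflect. It is not Avro's actual implementation, which differs in detail. The ReflectiveRebuild class, its rebuild() method, the getters, and the sample values are all hypothetical and exist only for this illustration.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

// Trimmed copy of the class above. The getters are hypothetical stand-ins
// for the elided ones, added only so the demo can read the fields back.
class WeatherRecord {
  private int year;
  private int temperature;
  private String stationId;

  public WeatherRecord() {
  }

  public WeatherRecord(int year, int temperature, String stationId) {
    this.year = year;
    this.temperature = temperature;
    this.stationId = stationId;
  }

  public int getYear() { return year; }
  public int getTemperature() { return temperature; }
  public String getStationId() { return stationId; }
}

// Hypothetical helper, not part of Avro or Crunch.
class ReflectiveRebuild {
  // Creates an instance through the no-arg constructor, then sets each
  // named field reflectively. Fails if the class has no no-arg constructor.
  static <T> T rebuild(Class<T> type, Map<String, Object> values) {
    try {
      Constructor<T> ctor = type.getDeclaredConstructor();
      ctor.setAccessible(true);
      T instance = ctor.newInstance();
      for (Map.Entry<String, Object> entry : values.entrySet()) {
        Field field = type.getDeclaredField(entry.getKey());
        field.setAccessible(true); // the fields are private
        field.set(instance, entry.getValue()); // boxed values are unwrapped for primitives
      }
      return instance;
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot rebuild " + type.getName(), e);
    }
  }

  public static void main(String[] args) {
    Map<String, Object> values = new LinkedHashMap<>();
    values.put("year", 1950);                 // made-up sample values
    values.put("temperature", 22);
    values.put("stationId", "011990-99999");
    WeatherRecord record = rebuild(WeatherRecord.class, values);
    System.out.println(record.getYear() + " " + record.getTemperature()
        + " " + record.getStationId());
  }
}
```

If the no-arg constructor were removed from WeatherRecord, the getDeclaredConstructor() call in this sketch would fail with a NoSuchMethodException (wrapped here in an IllegalStateException), because there would be no way to create the empty instance that the fields are then written into.
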
From there, it's straightforward to generate a PCollection<WeatherRecord> from a PCollection<String>, using parallelDo() to parse each line into a WeatherRecord object:

    PCollection<String> lines = pipeline.read(From.textFile(inputPath));
    PCollection<WeatherRecord> records = lines.parallelDo(
        new DoFn<String, WeatherRecord>() {
      NcdcRecordParser parser = new NcdcRecordParser();
      @Override
      public void process(String input, Emitter<WeatherRecord> emitter) {
        parser.parse(input);
        if (parser.isValidTemperature()) {
          emitter.emit(new WeatherRecord(parser.getYearInt(),
              parser.getAirTemperature(), parser.getStationId()));
        }
      }
    }, Avros.records(WeatherRecord.class));
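
The process() method above emits a record only when the parsed temperature is valid, so one input line yields either one WeatherRecord or nothing. The sketch below imitates that parse-then-emit-if-valid shape without Crunch on the classpath. SimpleEmitter is a stand-in for Crunch's Emitter reduced to emit(). The EmitSketch class, the comma-separated line format, and toString() are hypothetical, and the real NcdcRecordParser reads a different format.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in for Crunch's Emitter, reduced to the one method used here.
interface SimpleEmitter<T> {
  void emit(T value);
}

// Trimmed copy of WeatherRecord with a hypothetical toString() for printing.
class WeatherRecord {
  private int year;
  private int temperature;
  private String stationId;

  public WeatherRecord() {
  }

  public WeatherRecord(int year, int temperature, String stationId) {
    this.year = year;
    this.temperature = temperature;
    this.stationId = stationId;
  }

  @Override
  public String toString() {
    return stationId + ":" + year + ":" + temperature;
  }
}

class EmitSketch {
  // Mirrors the shape of process(): emit one record for a valid line,
  // nothing otherwise. Assumes a made-up "year,temperature,stationId" format.
  static void process(String input, SimpleEmitter<WeatherRecord> emitter) {
    String[] parts = input.split(",");
    if (parts.length != 3) {
      return; // malformed line: emit nothing
    }
    try {
      int year = Integer.parseInt(parts[0].trim());
      int temperature = Integer.parseInt(parts[1].trim());
      emitter.emit(new WeatherRecord(year, temperature, parts[2].trim()));
    } catch (NumberFormatException e) {
      // non-numeric year or temperature: emit nothing
    }
  }

  // Applies process() to every line, collecting whatever gets emitted.
  static List<WeatherRecord> run(List<String> lines) {
    List<WeatherRecord> out = new ArrayList<>();
    for (String line : lines) {
      process(line, out::add);
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList(
        "1950,22,011990-99999", "not a record", "1949,111,012650-99999");
    System.out.println(run(lines));
  }
}
```

Because the emitter decides how many outputs each input produces, the same function can filter and transform in a single pass, and the output collection can be smaller than the input.
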