  PTypeFamily tf = table.getTypeFamily();
  final PType<V> valueType = table.getValueType();
  return table.groupByKey().mapValues("unique",
      new MapFn<Iterable<V>, Collection<V>>() {
        @Override
        public void initialize() {
          // Give the PType access to the job configuration so it can copy values
          valueType.initialize(getConfiguration());
        }

        @Override
        public Set<V> map(Iterable<V> values) {
          // The Set drops duplicates; each value is detached (copied) before it is stored
          Set<V> collected = new HashSet<V>();
          for (V value : values) {
            collected.add(valueType.getDetachedValue(value));
          }
          return collected;
        }
      }, tf.collections(table.getValueType()));
}
The idea is to group by key, then iterate over the values associated with each key and collect them in a Set, which automatically removes duplicates. Since we want to retain the values beyond the iteration (and the framework may reuse the same object for each value it hands us), we need to make a copy of each value before we put it in the set.
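To see why the copy matters, here is a small stand-alone sketch that does not use Crunch at all. It assumes, as Hadoop does when deserializing values for a reducer, that the framework hands back the same mutable object on every iteration. `MutableText`, `reusingIterable()`, `collectWithoutCopy()`, and `uniqueWithCopy()` are hypothetical names for illustration, not Crunch or Hadoop APIs.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/**
 * Sketch (not Crunch code): simulates a framework that reuses one mutable
 * object for every value it returns. All names here are hypothetical.
 */
public class DetachDemo {

  /** Hypothetical mutable value type, loosely modeled on a reusable Writable. */
  static final class MutableText {
    private String value;
    void set(String v) { value = v; }
    MutableText copy() { MutableText t = new MutableText(); t.set(value); return t; }
    @Override public boolean equals(Object o) {
      return o instanceof MutableText && ((MutableText) o).value.equals(value);
    }
    @Override public int hashCode() { return value.hashCode(); }
    @Override public String toString() { return value; }
  }

  /** Returns an Iterable whose iterator overwrites and returns one shared instance. */
  static Iterable<MutableText> reusingIterable(String... raw) {
    return () -> new Iterator<MutableText>() {
      private final MutableText shared = new MutableText();
      private int i = 0;
      @Override public boolean hasNext() { return i < raw.length; }
      @Override public MutableText next() { shared.set(raw[i++]); return shared; }
    };
  }

  /** Wrong: stores references to the shared instance, so every entry ends up identical. */
  static List<MutableText> collectWithoutCopy(Iterable<MutableText> values) {
    List<MutableText> out = new ArrayList<>();
    for (MutableText v : values) {
      out.add(v);
    }
    return out;
  }

  /** Right: copies each value before storing it, then lets the Set drop duplicates. */
  static Set<MutableText> uniqueWithCopy(Iterable<MutableText> values) {
    Set<MutableText> out = new HashSet<>();
    for (MutableText v : values) {
      out.add(v.copy());
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(collectWithoutCopy(reusingIterable("a", "b", "a", "c"))); // [c, c, c, c]
    System.out.println(uniqueWithCopy(reusingIterable("a", "b", "a", "c")).size()); // 3
  }
}
```

Without the copy, every stored reference points at the one shared object, which holds whatever value was read last. The uncopied case uses a List because mutating objects that are already inside a HashSet would corrupt its hashing.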
Fortunately, we don't need to write code that knows how to copy every possible Java class; we can use the getDetachedValue() method that Crunch provides on PType for exactly this purpose, using the PType we get from the table's value type.
Notice that we also have to initialize the PType in the DoFn's initialize() method so that the PType can access the configuration it needs to perform the copying.
For immutable objects like Strings or Integers, calling getDetachedValue() is actually a no-op, but for mutable Avro or Writable types, a deep copy of each value is made.
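The two behaviors just described can be mimicked with plain Java functions. This is a sketch only, not Crunch's implementation of getDetachedValue(); `identityDetacher()`, `Event`, and `EVENT_DETACHER` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/**
 * Sketch (not Crunch code): two "detach" strategies, one for immutable
 * values and one for mutable values. All names here are hypothetical.
 */
public class DetachStrategies {

  /** Immutable values can be shared safely, so detaching just returns the input. */
  static <T> UnaryOperator<T> identityDetacher() {
    return value -> value;
  }

  /** Hypothetical mutable record with nested mutable state. */
  static final class Event {
    final List<String> tags = new ArrayList<>();
  }

  /** Mutable values need a deep copy: a new Event with its own tag list. */
  static final UnaryOperator<Event> EVENT_DETACHER = e -> {
    Event copy = new Event();
    copy.tags.addAll(e.tags); // Strings are immutable, so copying the references is enough
    return copy;
  };

  public static void main(String[] args) {
    String s = "hello";
    UnaryOperator<String> stringDetacher = identityDetacher();
    System.out.println(stringDetacher.apply(s) == s); // true: no copy was made

    Event original = new Event();
    original.tags.add("x");
    Event detached = EVENT_DETACHER.apply(original);
    original.tags.add("y"); // mutate the original after detaching
    System.out.println(detached.tags); // [x]: the copy is unaffected
  }
}
```

Copying only the outer Event while sharing its list would not be enough here: later changes to the original's tags would still show through, which is why mutable types call for a deep copy.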
Materialization
Materialization is the process of making the values in a PCollection available so they can be read in your program. For example, you might want to read all the values from a (typically small) PCollection and display them, or send them to another part of your program, rather than writing them to a Crunch target. Another reason to materialize a PCollection is to use the contents as the basis for determining further processing steps