  PTypeFamily tf = table.getTypeFamily();
  final PType<V> valueType = table.getValueType();
  return table.groupByKey().mapValues("unique",
      new MapFn<Iterable<V>, Collection<V>>() {
    @Override
    public void initialize() {
      valueType.initialize(getConfiguration());
    }
    @Override
    public Set<V> map(Iterable<V> values) {
      Set<V> collected = new HashSet<V>();
      for (V value : values) {
        collected.add(valueType.getDetachedValue(value));
      }
      return collected;
    }
  }, tf.collections(table.getValueType()));
}
The idea is to group by key, then iterate over each value associated with a key and collect
the unique values in a Set, which will automatically remove duplicates. Since we want to
retain the values outside the iteration, we need to make a copy of each value before we put
it in the set.
Fortunately, we don't need to write code that knows how to perform the copy for each
possible Java class; we can use the getDetachedValue() method that Crunch
provides for exactly this purpose on PType, which we get from the table's value type.
Notice that we also have to initialize the PType in the DoFn's initialize() method
so that the PType can access the configuration in order to perform the copying.
For immutable objects like Strings or Integers, calling getDetachedValue() is
actually a no-op, but for mutable Avro or Writable types, a deep copy of each value is
made.
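The effect of skipping the copy can be illustrated with a small plain-Java sketch that does not use Crunch at all. It assumes an iterator that refills one shared object on every step, which is how Hadoop's reduce-side value iterators behave. MutableText, reusing(), and uniqueTexts() are hypothetical names invented for this illustration, and copy() merely stands in for what getDetachedValue() does for a mutable type.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class DetachedValueSketch {

  // Hypothetical mutable value type, standing in for a Writable or Avro record.
  static final class MutableText {
    private String text = "";
    void set(String text) { this.text = text; }
    MutableText copy() { MutableText c = new MutableText(); c.set(text); return c; }
    @Override public boolean equals(Object o) {
      return o instanceof MutableText && ((MutableText) o).text.equals(text);
    }
    @Override public int hashCode() { return text.hashCode(); }
    @Override public String toString() { return text; }
  }

  // Assumption: the iterator refills ONE shared object on every call to next().
  static Iterable<MutableText> reusing(final List<String> data) {
    return () -> {
      final MutableText shared = new MutableText();
      final Iterator<String> source = data.iterator();
      return new Iterator<MutableText>() {
        @Override public boolean hasNext() { return source.hasNext(); }
        @Override public MutableText next() { shared.set(source.next()); return shared; }
      };
    };
  }

  // Collects the values into a Set, optionally copying each one first,
  // then renders the surviving elements as sorted strings.
  static TreeSet<String> uniqueTexts(List<String> data, boolean detach) {
    Set<MutableText> collected = new HashSet<>();
    for (MutableText value : reusing(data)) {
      collected.add(detach ? value.copy() : value);
    }
    TreeSet<String> rendered = new TreeSet<>();
    for (MutableText value : collected) {
      rendered.add(value.toString());
    }
    return rendered;
  }

  public static void main(String[] args) {
    List<String> data = Arrays.asList("a", "b", "a", "c");
    System.out.println(uniqueTexts(data, false)); // [c]
    System.out.println(uniqueTexts(data, true));  // [a, b, c]
  }
}
```

Without the copy, every element retained in the set is the same shared object, so only the last value read survives; with the copy, each retained element keeps the value it had when it was added.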
Materialization
Materialization is the process of making the values in a PCollection available
so they can be read in your program. For example, you might want to read all the values
from a (typically small) PCollection and display them, or send them to another part of
your program, rather than writing them to a Crunch target. Another reason to materialize a
PCollection is to use the contents as the basis for determining further processing steps