duce shuffle, and indeed that's how it's implemented for the MapReduce execution engine. In terms of Crunch types, groupByKey() returns a PGroupedTable<K, V>, which is a PCollection<Pair<K, Iterable<V>>>, or a multi-map where each key is paired with an iterable collection over its values.
Continuing from the previous code snippet, if we group the PTable of length-string mappings by key, we get the following (where the items in square brackets indicate an iterable collection):
PGroupedTable<Integer, String> c = b.groupByKey();
assertEquals("{(5,[apple]),(6,[banana,cherry])}", dump(c));
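To make the multi-map shape concrete, here is a plain-Java sketch of what grouping by key produces. It does not use Crunch: the Pair record, groupByKey(), and dump() helpers are hypothetical stand-ins, and the input assumes the previous snippet's table held the length-string pairs (5, apple), (6, banana), and (6, cherry). A TreeMap keeps keys sorted, loosely mimicking the sorted shuffle; the sketch also keeps each key's values in input order, which a real shuffle may not guarantee.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.StringJoiner;
import java.util.TreeMap;

public class GroupByKeySketch {
    // Hypothetical stand-in for Crunch's Pair<K, V>; not the Crunch class.
    record Pair<K, V>(K first, V second) {}

    // Collects every value for a key into one list: a multi-map from key to values.
    static <K extends Comparable<K>, V> SortedMap<K, List<V>> groupByKey(List<Pair<K, V>> table) {
        SortedMap<K, List<V>> grouped = new TreeMap<>();
        for (Pair<K, V> p : table) {
            grouped.computeIfAbsent(p.first(), k -> new ArrayList<>()).add(p.second());
        }
        return grouped;
    }

    // Renders the multi-map in the same "{(key,[v1,v2])}" form used in the text.
    static <K, V> String dump(SortedMap<K, List<V>> grouped) {
        StringJoiner out = new StringJoiner(",", "{", "}");
        for (Map.Entry<K, List<V>> e : grouped.entrySet()) {
            StringJoiner values = new StringJoiner(",", "[", "]");
            for (V v : e.getValue()) {
                values.add(String.valueOf(v));
            }
            out.add("(" + e.getKey() + "," + values + ")");
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Assumed contents of the length-string table from the previous snippet.
        List<Pair<Integer, String>> b = List.of(
            new Pair<>(5, "apple"), new Pair<>(6, "banana"), new Pair<>(6, "cherry"));
        System.out.println(dump(groupByKey(b))); // {(5,[apple]),(6,[banana,cherry])}
    }
}
```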
Crunch uses information on the size of the table to set the number of partitions (reduce tasks in MapReduce) for the groupByKey() operation. The default is usually fine, but if needed you can set the number of partitions explicitly with the overloaded form, groupByKey(int).
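The text does not say how keys are assigned to partitions. Purely as an illustration, the sketch below assumes a hash-based scheme using the same formula as Hadoop's default HashPartitioner: clear the sign bit of the key's hash code, then take the remainder modulo the partition count. The class and method names are hypothetical. The point it shows is that equal keys always land in the same partition, so all values for a key meet in one reduce task regardless of how many partitions you request.

```java
public class PartitionSketch {
    // Assumed hash partitioning (same formula as Hadoop's HashPartitioner):
    // masking with Integer.MAX_VALUE keeps the result non-negative.
    static int partitionFor(Object key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 2; // e.g. the count an explicit groupByKey(2) would request
        for (int key : new int[] {5, 6, 6}) {
            // Integer.hashCode() is the value itself, so 5 -> 1 and 6 -> 0 here.
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```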
combineValues()
Despite the suggestive naming, PGroupedTable is not actually a subclass of PTable ,
so you can't call methods like groupByKey() on it. This is because there is no reason
to group by key on a PTable that was already grouped by key. Another way of thinking
about PGroupedTable is as an intermediate representation before generating another
PTable . After all, the reason to group by key is so you can do something to the values
for each key. This is the basis of the fourth primitive operation, combineValues() .
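The group-then-combine pattern can be sketched in plain Java, independent of Crunch. Here combineValues() is a hypothetical helper (not Crunch's method) that takes an already-grouped multi-map and a function reducing one key's values to a single value, and returns an ordinary one-value-per-key table. The grouped input reproduces the output of the groupByKey() example above, and the combiner joins values with a semicolon.

```java
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.StringJoiner;
import java.util.TreeMap;
import java.util.function.Function;

public class CombineValuesSketch {
    // Hypothetical analogue of combineValues(): reduce each key's values to one value.
    static <K extends Comparable<K>, V> SortedMap<K, V> combineValues(
            SortedMap<K, List<V>> grouped, Function<List<V>, V> combiner) {
        SortedMap<K, V> combined = new TreeMap<>();
        for (Map.Entry<K, List<V>> e : grouped.entrySet()) {
            combined.put(e.getKey(), combiner.apply(e.getValue()));
        }
        return combined;
    }

    // Renders a one-value-per-key table as "{(key,value),...}".
    static <K, V> String dump(SortedMap<K, V> table) {
        StringJoiner out = new StringJoiner(",", "{", "}");
        table.forEach((k, v) -> out.add("(" + k + "," + v + ")"));
        return out.toString();
    }

    public static void main(String[] args) {
        // Same grouping as the groupByKey() example: {(5,[apple]),(6,[banana,cherry])}
        SortedMap<Integer, List<String>> c = new TreeMap<>();
        c.put(5, List.of("apple"));
        c.put(6, List.of("banana", "cherry"));
        SortedMap<Integer, String> d = combineValues(c, values -> String.join(";", values));
        System.out.println(dump(d)); // {(5,apple),(6,banana;cherry)}
    }
}
```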
In its most general form, combineValues() takes a combining function, CombineFn<K, V>, which is a more concise name for DoFn<Pair<K, Iterable<V>>, Pair<K, V>>, and returns a PTable<K, V>. To see it in action, consider a combining function that concatenates all the string values for a key, using a semicolon as a separator:
PTable<Integer, String> d = c.combineValues(new CombineFn<Integer, String>() {
  @Override
  public void process(Pair<Integer, Iterable<String>> input,
      Emitter<Pair<Integer, String>> emitter) {
    StringBuilder sb = new StringBuilder();
    // Append each value for this key, with ";" between consecutive values
    for (Iterator<String> i = input.second().iterator(); i.hasNext(); ) {
      sb.append(i.next());
      if (i.hasNext()) { sb.append(";"); }
    }
    // Emit a single (key, concatenated values) pair
    emitter.emit(Pair.of(input.first(), sb.toString()));
  }
});