duce shuffle, and indeed that's how it's implemented for the MapReduce execution engine. In terms of Crunch types, groupByKey() returns a PGroupedTable<K, V>, which is a PCollection<Pair<K, Iterable<V>>>, or a multi-map where each key is paired with an iterable collection over its values.
Continuing from the previous code snippet, if we group the PTable of length-string mappings by key, we get the following (where the items in square brackets indicate an iterable collection):
PGroupedTable<Integer, String> c = b.groupByKey();
assertEquals("{(5,[apple]),(6,[banana,cherry])}", dump(c));
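To make the multi-map shape concrete without a Crunch runtime, the following plain-Java sketch groups the same length-string pairs into a sorted Map<Integer, List<String>>. It is only an in-memory model of what groupByKey() produces, not how Crunch implements it. The helpers groupByKey and render are hypothetical, and the TreeMap is an assumption chosen to give a deterministic key order that matches the output shown above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class GroupByKeySketch {

    // Hypothetical helper (not Crunch API): collects each key's values into a
    // list, modeling the PGroupedTable<K, V> shape of (key, Iterable<V>) pairs.
    // TreeMap is an assumption used only to keep keys in sorted order.
    public static <K extends Comparable<K>, V> Map<K, List<V>> groupByKey(List<Map.Entry<K, V>> table) {
        Map<K, List<V>> grouped = new TreeMap<>();
        for (Map.Entry<K, V> e : table) {
            grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        return grouped;
    }

    // Hypothetical helper: renders the multi-map as "{(k,[v1,v2]),...}".
    public static <K, V> String render(Map<K, List<V>> grouped) {
        return grouped.entrySet().stream()
            .map(e -> "(" + e.getKey() + ",["
                + e.getValue().stream().map(v -> String.valueOf(v)).collect(Collectors.joining(","))
                + "])")
            .collect(Collectors.joining(",", "{", "}"));
    }

    public static void main(String[] args) {
        // The same length-string mappings as the PTable b in the text
        List<Map.Entry<Integer, String>> b = List.of(
            Map.entry(5, "apple"), Map.entry(6, "banana"), Map.entry(6, "cherry"));
        System.out.println(render(groupByKey(b))); // {(5,[apple]),(6,[banana,cherry])}
    }
}
```

Both six-letter fruits end up under the single key 6, which is the property that later operations on the grouped table rely on.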
Crunch uses information on the size of the table to set the number of partitions (reduce tasks in MapReduce) to use for the groupByKey() operation. Most of the time the default is fine, but you can explicitly set the number of partitions by using the overloaded form, groupByKey(int), if needed.
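To illustrate what the partition count controls, here is a minimal sketch that assumes Hadoop-style hash partitioning, that is, the (hashCode & Integer.MAX_VALUE) % numPartitions formula used by Hadoop's HashPartitioner. The helpers partitionFor and assign are hypothetical, and the sketch does not model how Crunch estimates the default number of partitions from the table size.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartitionSketch {

    // Hypothetical helper: assigns a key to one of numPartitions buckets using
    // the HashPartitioner formula (clear the sign bit, then take the remainder).
    public static int partitionFor(Object key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Hypothetical helper: shows which partition (reduce task) each key goes to.
    public static Map<Integer, Integer> assign(List<Integer> keys, int numPartitions) {
        Map<Integer, Integer> assignment = new TreeMap<>();
        for (Integer key : keys) {
            assignment.put(key, partitionFor(key, numPartitions));
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<Integer> keys = List.of(5, 6);
        System.out.println(assign(keys, 1)); // {5=0, 6=0}
        System.out.println(assign(keys, 2)); // {5=1, 6=0}
    }
}
```

Whatever the partition count, every occurrence of a given key maps to the same partition, which is what lets one reduce task see all of that key's values; raising the count only spreads distinct keys across more tasks.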
combineValues()
Despite the suggestive naming, PGroupedTable is not actually a subclass of PTable, so you can't call methods like groupByKey() on it. This is because there is no reason to group by key on a PTable that was already grouped by key. Another way of thinking about PGroupedTable is as an intermediate representation before generating another PTable. After all, the reason to group by key is so you can do something to the values for each key. This is the basis of the fourth primitive operation, combineValues().
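The group-then-combine pattern can be modeled in plain Java as a function applied to each key's values. In this sketch, java.util.function.Function<Iterable<V>, V> stands in for Crunch's CombineFn, and combineValues and render are hypothetical helpers rather than Crunch API. The result holds exactly one value per key, mirroring how combineValues() turns a PGroupedTable back into a PTable.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CombineValuesSketch {

    // Hypothetical helper (not Crunch API): collapses each key's values into a
    // single value with the supplied combiner, yielding one entry per key.
    public static <K extends Comparable<K>, V> Map<K, V> combineValues(
            Map<K, List<V>> grouped, Function<Iterable<V>, V> combiner) {
        Map<K, V> combined = new TreeMap<>();
        for (Map.Entry<K, List<V>> e : grouped.entrySet()) {
            combined.put(e.getKey(), combiner.apply(e.getValue()));
        }
        return combined;
    }

    // Hypothetical helper: renders a key-value map as "{(k,v),...}".
    public static <K, V> String render(Map<K, V> table) {
        return table.entrySet().stream()
            .map(e -> "(" + e.getKey() + "," + e.getValue() + ")")
            .collect(Collectors.joining(",", "{", "}"));
    }

    public static void main(String[] args) {
        // An in-memory stand-in for the grouped table c from the text
        Map<Integer, List<String>> c = new TreeMap<>();
        c.put(5, List.of("apple"));
        c.put(6, List.of("banana", "cherry"));
        // Concatenate each key's values with a semicolon separator
        Map<Integer, String> d = combineValues(c, values -> String.join(";", values));
        System.out.println(render(d)); // {(5,apple),(6,banana;cherry)}
    }
}
```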
In its most general form, combineValues() takes a combining function CombineFn<K, V>, which is a more concise name for DoFn<Pair<K, Iterable<V>>, Pair<K, V>>, and returns a PTable<K, V>. To see it in action, consider a combining function that concatenates all the string values together for a key, using a semicolon as a separator:
PTable<Integer, String> d = c.combineValues(new CombineFn<Integer, String>() {
  @Override
  public void process(Pair<Integer, Iterable<String>> input,
      Emitter<Pair<Integer, String>> emitter) {
    // Join all of this key's values, inserting ";" between consecutive values
    StringBuilder sb = new StringBuilder();
    for (Iterator<String> i = input.second().iterator(); i.hasNext(); ) {
      sb.append(i.next());
      if (i.hasNext()) {
        sb.append(";");
      }
    }
    // Emit a single (key, joined values) pair for this key
    emitter.emit(Pair.of(input.first(), sb.toString()));