Aggregation transformations
The three main transformations for aggregating RDDs of pairs by their keys are
reduceByKey(), foldByKey(), and aggregateByKey(). They work in slightly
different ways, but they all aggregate the values for a given key to produce a single value
for each key. (The equivalent actions are reduce(), fold(), and aggregate(),
which operate in an analogous way, resulting in a single value for the whole RDD.)
The simplest is reduceByKey(), which repeatedly applies a binary function to values
in pairs until a single value is produced. For example:
val pairs: RDD[(String, Int)] =
  sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))
val sums: RDD[(String, Int)] = pairs.reduceByKey(_ + _)
assert(sums.collect().toSet === Set(("a", 9), ("b", 7)))
The values for key a are aggregated using the addition function (_ + _) as (3 + 1) + 5 = 9,
while there is only one value for key b, so no aggregation is needed. Since in general the
operations are distributed and performed in different tasks for different partitions of the
RDD, the function should be commutative and associative. In other words, the order and
grouping of the operations should not matter; in this case, the aggregation could be
5 + (3 + 1), or 3 + (1 + 5), which both return the same result.
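To see why this requirement matters, the following is a minimal sketch in plain Scala (no Spark needed) that loosely models a distributed reduce: each partition is reduced locally, then the partial results are merged with the same function. The object name ReduceOrderSketch and the helper reducePartitioned are hypothetical, introduced only for illustration, and the model is a simplification of what Spark actually does.

```scala
object ReduceOrderSketch {
  // Hypothetical helper (not a Spark API): reduce each non-empty partition
  // locally, then merge the per-partition results with the same function.
  // Assumes at least one partition is non-empty.
  def reducePartitioned(partitions: Seq[Seq[Int]], f: (Int, Int) => Int): Int =
    partitions.filter(_.nonEmpty).map(_.reduce(f)).reduce(f)

  def main(args: Array[String]): Unit = {
    // The three values for key "a", split across two partitions in two ways.
    val layoutA = Seq(Seq(3, 1), Seq(5))
    val layoutB = Seq(Seq(3), Seq(1, 5))

    // Addition is commutative and associative: both layouts agree.
    println(reducePartitioned(layoutA, _ + _)) // (3 + 1) + 5 = 9
    println(reducePartitioned(layoutB, _ + _)) // 3 + (1 + 5) = 9

    // Subtraction is neither, so the result depends on the layout.
    println(reducePartitioned(layoutA, _ - _)) // (3 - 1) - 5 = -3
    println(reducePartitioned(layoutB, _ - _)) // 3 - (1 - 5) = 7
  }
}
```

With subtraction, the same input values produce different answers depending on how they happen to be partitioned, which is why such a function is unsuitable for reduceByKey().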
NOTE
The triple equals operator (===) used in the assert statement is from ScalaTest, and provides more
informative failure messages than using the regular == operator.
Here's how we would perform the same operation using foldByKey():
val sums: RDD[(String, Int)] = pairs.foldByKey(0)(_ + _)
assert(sums.collect().toSet === Set(("a", 9), ("b", 7)))
Notice that this time we had to supply a zero value, which is just 0 when adding integers,
but would be something different for other types and operations. This time, values for a
are aggregated as ((0 + 3) + 1) + 5 = 9 (or possibly some other order, although adding to
0 is always the first operation in each partition, so the zero value may be applied more
than once and must not change the result). For b it is 0 + 7 = 7.
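As a rough illustration of how the zero value depends on the operation, here is a small plain-Scala sketch that models a fold over partitioned values: each partition is seeded with the zero value, folded, and the partial results are merged with the same function. The names FoldZeroSketch and foldPartitioned are hypothetical, and this is a simplified model rather than Spark's actual implementation.

```scala
object FoldZeroSketch {
  // Hypothetical helper (not a Spark API): seed each non-empty partition with
  // `zero`, fold it with `op`, then merge the partial results with `op`.
  // Assumes at least one partition is non-empty.
  def foldPartitioned[V](partitions: Seq[Seq[V]], zero: V)(op: (V, V) => V): V =
    partitions.filter(_.nonEmpty).map(_.foldLeft(zero)(op)).reduce(op)

  def main(args: Array[String]): Unit = {
    // The values for key "a", split across two partitions.
    val parts = Seq(Seq(3, 1), Seq(5))

    // Each operation has its own neutral zero value.
    println(foldPartitioned(parts, 0)(_ + _))              // sum: 9
    println(foldPartitioned(parts, 1)(_ * _))              // product: 15
    println(foldPartitioned(parts, Int.MinValue)(_ max _)) // maximum: 5

    // A zero that is not neutral is counted once per partition,
    // giving (1 + 3 + 1) + (1 + 5) = 11 instead of 9.
    println(foldPartitioned(parts, 1)(_ + _))              // 11
  }
}
```

The last call shows why the zero value should be neutral for the operation: because it is applied in every partition, a non-neutral value skews the result by an amount that depends on the partitioning.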
Using foldByKey() is no more or less powerful than using reduceByKey(). In
particular, neither can change the type of the value that is the result of the aggregation. For
that we need aggregateByKey(). For example, we can aggregate the integer values
into a set:
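One way such an aggregation might look is sketched below. It is an illustration only, assuming Spark is on the classpath and a local master is acceptable; the object name AggregateByKeySketch, the application name, and the choice of an immutable Set as the accumulator are assumptions, and a plain == is used so that ScalaTest is not required. aggregateByKey() takes a zero value of the result type, a function that merges one value into an accumulator within a partition, and a function that merges two accumulators across partitions.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object AggregateByKeySketch {
  def main(args: Array[String]): Unit = {
    // Assumption: run locally with two worker threads.
    val conf = new SparkConf().setAppName("aggregate-by-key-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val pairs: RDD[(String, Int)] =
        sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))

      // Zero value: an empty set. The result type (Set[Int]) differs from
      // the value type (Int), which reduceByKey() and foldByKey() cannot do.
      val sets: RDD[(String, Set[Int])] =
        pairs.aggregateByKey(Set.empty[Int])(
          _ + _,  // add one Int to a partition-local set
          _ ++ _  // union the sets produced by different partitions
        )

      assert(sets.collect().toSet == Set(("a", Set(1, 3, 5)), ("b", Set(7))))
    } finally {
      sc.stop()
    }
  }
}
```

Two functions are needed because the accumulator type differs from the value type: one combines a set with an integer, the other combines two sets.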