to note that this happens the first time a key is found in each partition, rather than
only the first time the key is found in the RDD.
If it is a key we have seen before while processing that partition, it will instead call
the provided function, mergeValue(), passing the current accumulator for that key and
the new value.
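To make the per-partition rule concrete, here is a minimal pure-Python sketch that models one partition's pass without Spark. The helper name combine_partition and the sample records are hypothetical; this is not Spark's implementation, only a model of the createCombiner/mergeValue behavior described above.

```python
# Hypothetical helper: a pure-Python model of what happens inside ONE partition.
# It is not Spark code; it only mirrors the createCombiner/mergeValue rules.
def combine_partition(records, create_combiner, merge_value):
    """Fold (key, value) records from a single partition into per-key accumulators."""
    accumulators = {}
    for key, value in records:
        if key not in accumulators:
            # First time this key appears in *this* partition.
            accumulators[key] = create_combiner(value)
        else:
            # Key already seen in this partition: fold the new value in.
            accumulators[key] = merge_value(accumulators[key], value)
    return accumulators


calls = []  # records every createCombiner invocation


def create_combiner(v):
    calls.append(v)
    return (v, 1)


def merge_value(acc, v):
    return (acc[0] + v, acc[1] + 1)


partition = [("a", 3), ("b", 1), ("a", 5)]
print(combine_partition(partition, create_combiner, merge_value))  # {'a': (8, 2), 'b': (1, 1)}
print(len(calls))  # 2: once for "a", once for "b"
```

Because the sketch keeps a separate dictionary per partition, the same key can end up with independent accumulators in different partitions.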
Since each partition is processed independently, we can have multiple accumulators
for the same key. When we merge the results from each partition, if two or more
partitions have an accumulator for the same key, we merge the accumulators using
the user-supplied mergeCombiners() function.
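The cross-partition step can be sketched the same way. In this simplified model, each partition's result is assumed to be a plain dictionary of (sum, count) accumulators, and the hypothetical merge_partitions helper calls mergeCombiners only when a key appears in more than one partition. In Spark itself this merge happens on the reduce side, after the shuffle.

```python
# Hypothetical helper: merge per-partition accumulator dicts with mergeCombiners.
def merge_partitions(partition_results, merge_combiners):
    merged = {}
    for accumulators in partition_results:
        for key, acc in accumulators.items():
            if key in merged:
                # Two or more partitions built an accumulator for this key.
                merged[key] = merge_combiners(merged[key], acc)
            else:
                # First accumulator seen for this key: keep it as-is.
                merged[key] = acc
    return merged


def merge_combiners(a, b):
    return (a[0] + b[0], a[1] + b[1])


partition_0 = {"a": (8, 2), "b": (1, 1)}
partition_1 = {"a": (4, 1)}
print(merge_partitions([partition_0, partition_1], merge_combiners))  # {'a': (12, 3), 'b': (1, 1)}
```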
We can disable map-side aggregation in combineByKey() if we
know that our data won't benefit from it. For example, groupByKey()
disables map-side aggregation, since its aggregation function
(appending to a list) does not save any space. If we want to disable
map-side combines, we need to specify the partitioner, because in the
Scala and Java APIs the mapSideCombine flag is only accepted by the
combineByKey() overload that also takes a partitioner; for now you
can just use the partitioner on the source RDD by passing rdd.partitioner.
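Why list-appending gains nothing from a map-side combine can be seen by counting values. The sketch below is a simplified model that counts only values (it ignores serialization and per-object overhead), and the function name shuffled_value_count is hypothetical.

```python
# Simplified model: how many values one partition would ship in the shuffle
# after a map-side combine. Object and serialization overhead are ignored.
def shuffled_value_count(partition, keep_all_values):
    per_key = {}
    for key, value in partition:
        if keep_all_values:
            # groupByKey-style combiner: append to a list, nothing is discarded.
            per_key.setdefault(key, []).append(value)
        else:
            # Sum-style combiner: a single running total per key.
            per_key[key] = per_key.get(key, 0) + value
    if keep_all_values:
        return sum(len(values) for values in per_key.values())
    return len(per_key)


partition = [("a", 1), ("a", 2), ("a", 3), ("b", 4)]
print(len(partition))                                                 # 4 values with no combine
print(shuffled_value_count(partition, keep_all_values=False))         # 2: one total per key
print(shuffled_value_count(partition, keep_all_values=True))          # 4: lists keep every value
```

The sum-style combiner shrinks the data to one value per distinct key, while the list-building combiner still carries every input value, so the extra combine step buys nothing.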
Since combineByKey() has a lot of different parameters, it is a great candidate for an
explanatory example. To better illustrate how combineByKey() works, we will look at
computing the average value for each key, as shown in Examples 4-12 through 4-14
and illustrated in Figure 4-3.
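The arithmetic can first be traced by hand. This sketch applies the same three functions that Example 4-12 passes to combineByKey() to a made-up layout in which one key's values are split across two partitions; the key name and values are hypothetical and no Spark cluster is involved.

```python
# Same three functions as Example 4-12, given names so each step is visible.
def create_combiner(x):
    return (x, 1)                        # first value for a key in a partition


def merge_value(acc, y):
    return (acc[0] + y, acc[1] + 1)      # fold another value into (sum, count)


def merge_combiners(a, b):
    return (a[0] + b[0], a[1] + b[1])    # combine two partitions' (sum, count)


# Hypothetical layout: key "coffee" has 1 and 2 in partition 1, and 9 in partition 2.
p1 = merge_value(create_combiner(1), 2)  # (3, 2)
p2 = create_combiner(9)                  # (9, 1)
total = merge_combiners(p1, p2)          # (12, 3)
average = total[0] / total[1]
print(average)                           # 4.0
```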
Example 4-12. Per-key average using combineByKey() in Python
# nums is a pair RDD of (key, number) records
sumCount = nums.combineByKey((lambda x: (x, 1)),                         # createCombiner
                             (lambda x, y: (x[0] + y, x[1] + 1)),        # mergeValue
                             (lambda x, y: (x[0] + y[0], x[1] + y[1])))  # mergeCombiners
# Each element is a single (key, (sum, count)) tuple, so index into it
sumCount.map(lambda kv: (kv[0], kv[1][0] / kv[1][1])).collectAsMap()
Example 4-13. Per-key average using combineByKey() in Scala
val result = input.combineByKey(
  (v) => (v, 1),                                      // createCombiner
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),   // mergeValue
  (acc1: (Int, Int), acc2: (Int, Int)) =>
    (acc1._1 + acc2._1, acc1._2 + acc2._2)            // mergeCombiners
).map { case (key, value) => (key, value._1 / value._2.toFloat) }
result.collectAsMap().foreach(println)
Example 4-14. Per-key average using combineByKey() in Java
public static class AvgCount implements Serializable {
  public AvgCount(int total, int num) { total_ = total; num_ = num; }
  public int total_;