to note that this happens the first time a key is found in each partition, rather than
only the first time the key is found in the RDD.
If it is a key we have seen before while processing that partition, it will instead use the provided function, mergeValue(), with the current value of the accumulator for that key and the new value.
Since each partition is processed independently, we can have multiple accumulators for the same key. When we are merging the results from each partition, if two or more partitions have an accumulator for the same key, we merge the accumulators using the user-supplied mergeCombiners() function.
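The three-function flow described above can be sketched in plain Python, without Spark. This is only a toy model: combine_by_key, the sample partitions, and the "coffee"/"panda" keys are hypothetical names chosen for illustration, and each partition is assumed to be an ordinary list of (key, value) pairs.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Toy simulation of combineByKey() over a list of partitions."""
    per_partition = []
    for partition in partitions:
        accumulators = {}
        for key, value in partition:
            if key not in accumulators:
                # First time this key appears in THIS partition
                accumulators[key] = create_combiner(value)
            else:
                # Key already seen in this partition
                accumulators[key] = merge_value(accumulators[key], value)
        per_partition.append(accumulators)

    # Merge the per-partition accumulators that share a key
    merged = {}
    for accumulators in per_partition:
        for key, acc in accumulators.items():
            if key in merged:
                merged[key] = merge_combiners(merged[key], acc)
            else:
                merged[key] = acc
    return merged


# Hypothetical sample data: "coffee" appears in both partitions
partitions = [
    [("coffee", 1), ("coffee", 2), ("panda", 3)],
    [("coffee", 9)],
]
sum_count = combine_by_key(
    partitions,
    lambda v: (v, 1),                         # createCombiner
    lambda acc, v: (acc[0] + v, acc[1] + 1),  # mergeValue
    lambda a, b: (a[0] + b[0], a[1] + b[1]),  # mergeCombiners
)
averages = {key: total / count for key, (total, count) in sum_count.items()}
```

Note that the combiner function runs twice for "coffee", once per partition, which is why a final merge of accumulators is needed.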
We can disable map-side aggregation in combineByKey() if we know that our data won't benefit from it. For example, groupByKey() disables map-side aggregation as the aggregation function (appending to a list) does not save any space. If we want to disable map-side combines, we need to specify the partitioner; for now you can just use the partitioner on the source RDD by passing rdd.partitioner.
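To see why appending to a list gains nothing from map-side aggregation, the following plain-Python sketch counts how many individual values each partition would hand to the shuffle. It is a rough model, not Spark's shuffle implementation: shuffled_records and count_values are hypothetical helpers, and "number of values carried" is assumed here as a crude proxy for data size.

```python
def shuffled_records(partitions, create_combiner, merge_value, map_side_combine):
    """Return the records each partition would send to the shuffle (toy model)."""
    out = []
    for partition in partitions:
        if not map_side_combine:
            # No map-side aggregation: every record is sent as-is
            out.extend(partition)
            continue
        accumulators = {}
        for key, value in partition:
            if key in accumulators:
                accumulators[key] = merge_value(accumulators[key], value)
            else:
                accumulators[key] = create_combiner(value)
        out.extend(accumulators.items())
    return out


def count_values(records):
    """Count individual values carried; a list accumulator counts each element."""
    return sum(len(v) if isinstance(v, list) else 1 for _, v in records)


partitions = [[("a", 1), ("a", 2), ("a", 3)], [("a", 4), ("b", 5)]]

# Summing collapses each key to one number per partition
sum_combined = shuffled_records(partitions, lambda v: v, lambda acc, v: acc + v, True)
# Grouping into lists keeps every original value
list_combined = shuffled_records(partitions, lambda v: [v], lambda acc, v: acc + [v], True)
uncombined = shuffled_records(partitions, None, None, False)
```

In this model the sum carries three values after combining, while the list-building aggregation still carries all five, the same as with no map-side combine at all.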
Since combineByKey() has a lot of different parameters, it is a great candidate for an explanatory example. To better illustrate how combineByKey() works, we will look at computing the average value for each key, as shown in Examples 4-12 through 4-14 and illustrated in Figure 4-3.
Example 4-12. Per-key average using combineByKey() in Python
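# Build a (sum, count) accumulator for each key
sumCount = nums.combineByKey((lambda x: (x, 1)),                          # createCombiner
                             (lambda x, y: (x[0] + y, x[1] + 1)),         # mergeValue
                             (lambda x, y: (x[0] + y[0], x[1] + y[1])))   # mergeCombiners
# Each element is a single (key, (sum, count)) pair, so the lambda takes one argument
sumCount.map(lambda kv: (kv[0], float(kv[1][0]) / kv[1][1])).collectAsMap()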
Example 4-13. Per-key average using combineByKey() in Scala
val result = input.combineByKey(
  (v) => (v, 1),                                       // createCombiner
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),    // mergeValue
  (acc1: (Int, Int), acc2: (Int, Int)) =>
    (acc1._1 + acc2._1, acc1._2 + acc2._2)             // mergeCombiners
  ).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
// foreach rather than map, since println is called only for its side effect
result.collectAsMap().foreach(println(_))
Example 4-14. Per-key average using combineByKey() in Java
public static class AvgCount implements Serializable {
  public AvgCount(int total, int num) {
    total_ = total;
    num_ = num;
  }
  public int total_;