val sets: RDD[(String, HashSet[Int])] =
  pairs.aggregateByKey(new HashSet[Int])(_+=_, _++=_)
assert(sets.collect().toSet === Set(("a", Set(1, 3, 5)), ("b", Set(7))))
For set addition, the zero value is the empty set, so we create a new mutable set with new HashSet[Int]. We have to supply two functions to aggregateByKey(). The first controls how an Int is combined with a HashSet[Int], and in this case we use the addition and assignment function _+=_ to add the integer to the set (_+_ would return a new set and leave the first set unchanged). The second function controls how two HashSet[Int] values are combined (this happens after the combiner runs in the map task, while the two partitions are being aggregated in the reduce task), and here we use _++=_ to add all the elements of the second set to the first.
For key a, the sequence of operations might be:
(((∅ + 3) + 1) + 5) = (1, 3, 5)
or:
((∅ + 3) + 1) ++ (∅ + 5) = (1, 3) ++ (5) = (1, 3, 5)
if Spark uses a combiner.
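The per-key flow can be simulated without a cluster using plain Scala collections. The following is a sketch, not Spark itself: it applies the same seqOp/combOp pair to two hypothetical partitions holding key a's values, starting each partition from a fresh zero value, then merging the partial sets the way the reduce task would.

```scala
import scala.collection.mutable.HashSet

object AggregateSketch {
  // Hypothetical stand-in for aggregateByKey on a single key:
  // seqOp (_+=_) folds each partition's values into a fresh empty set,
  // combOp (_++=_) merges the per-partition partial sets
  def aggregate(partitions: Seq[Seq[Int]]): HashSet[Int] = {
    val partials = partitions.map(_.foldLeft(new HashSet[Int])(_ += _)) // seqOp
    partials.reduce(_ ++= _)                                            // combOp
  }

  def main(args: Array[String]): Unit = {
    // Key "a"'s values 3, 1, 5 split across two partitions
    println(aggregate(Seq(Seq(3, 1), Seq(5))).toList.sorted.mkString(","))
  }
}
```

Note that each partition must get its own new HashSet[Int] as the zero value; because the set is mutable, sharing a single instance across partitions would corrupt the result.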
A transformed RDD can be persisted in memory so that subsequent operations on it are
more efficient. We look at that next.
Persistence
Going back to the introductory example in
An Example
,
we can cache the intermediate
dataset of year-temperature pairs in memory with the following:
scala>
tuples.cache()
res1: tuples.type = MappedRDD[4] at map at <console>:18
Calling
cache()
does not cache the RDD in memory straightaway. Instead, it marks the
RDD with a flag indicating it should be cached when the Spark job is run. So let's first
force a job run:
scala>
tuples.reduceByKey((a, b) => Math.max(a,
b)).foreach(println(_))
INFO BlockManagerInfo: Added rdd_4_0 in memory on 192.168.1.90:64640
INFO BlockManagerInfo: Added rdd_4_1 in memory on 192.168.1.90:64640
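The mark-then-materialize behavior can be pictured with a small sketch in plain Scala (this is not Spark's BlockManager): a hypothetical CachedDataset whose cache() only sets a flag, with the result computed and stored the first time an action forces evaluation.

```scala
object CacheSketch {
  // Hypothetical lazily cached dataset: cache() marks intent only;
  // the value is computed once, on the first forced evaluation,
  // and reused by every later one
  class CachedDataset[T](compute: () => T) {
    private var shouldCache = false
    private var cached: Option[T] = None
    var computeCount = 0 // exposed so callers can observe laziness

    def cache(): this.type = { shouldCache = true; this } // no work done here

    def force(): T = cached.getOrElse {
      computeCount += 1
      val result = compute()
      if (shouldCache) cached = Some(result)
      result
    }
  }

  def main(args: Array[String]): Unit = {
    val ds = new CachedDataset(() => (1 to 5).map(_ * 2).sum)
    ds.cache()             // nothing is computed yet
    ds.force(); ds.force() // second call reuses the stored value
    println(ds.computeCount) // prints 1: computed only once
  }
}
```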