import scala.collection.mutable.HashSet
import org.apache.spark.rdd.RDD
val sets: RDD[(String, HashSet[Int])] =
  pairs.aggregateByKey(new HashSet[Int])(_ += _, _ ++= _)
assert(sets.collect().toSet === Set(("a", Set(1, 3, 5)), ("b", Set(7))))
For set addition, the zero value is the empty set, so we create a new mutable set with
new HashSet[Int]. We have to supply two functions to aggregateByKey(). The first
controls how an Int is combined with a HashSet[Int], and in this case we use the
addition and assignment function _ += _ to add the integer to the set (_ + _ would return a
new set and leave the first set unchanged).
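The sketch below isolates the first function as a plain Scala value so its in-place behavior can be checked without Spark. The name seqOp and the object SeqOpDemo are illustrative choices, not part of the original example; it assumes only scala.collection.mutable.HashSet.

```scala
import scala.collection.mutable.HashSet

object SeqOpDemo {
  // The first function passed to aggregateByKey: (_ += _) expands to (set, x) => set += x
  val seqOp: (HashSet[Int], Int) => HashSet[Int] = _ += _

  def main(args: Array[String]): Unit = {
    val acc = new HashSet[Int]
    val result = seqOp(acc, 3)
    // += adds 3 to acc in place and returns that same instance
    println(result eq acc)     // true
    println(acc.contains(3))   // true
  }
}
```

Because += mutates and returns the accumulator, no new set is allocated per element, which is why it is preferred over _ + _ here.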
The second function controls how two HashSet[Int] values are combined (this happens
after the combiner runs in the map task, while the two partitions are being aggregated
in the reduce task), and here we use _ ++= _ to add all the elements of the second set
to the first.
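A similar standalone check of the second function follows. The two input sets are hypothetical per-partition results chosen for illustration; the names combOp and CombOpDemo are likewise assumptions, and no Spark classes are involved.

```scala
import scala.collection.mutable.HashSet

object CombOpDemo {
  // The second function passed to aggregateByKey: (_ ++= _) expands to (s1, s2) => s1 ++= s2
  val combOp: (HashSet[Int], HashSet[Int]) => HashSet[Int] = _ ++= _

  def main(args: Array[String]): Unit = {
    val fromPartition0 = HashSet(1, 3) // hypothetical result from one partition
    val fromPartition1 = HashSet(5)    // hypothetical result from another partition
    val merged = combOp(fromPartition0, fromPartition1)
    // ++= copies every element of the second set into the first, in place
    println(merged eq fromPartition0)   // true
    println(merged == HashSet(1, 3, 5)) // true
  }
}
```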
For key a, the sequence of operations might be:
((∅ + 3) + 1) + 5 = (1, 3, 5)
or:
((∅ + 3) + 1) ++ (∅ + 5) = (1, 3) ++ (5) = (1, 3, 5)
if Spark uses a combiner.
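To see why both orderings give the same set, here is a simplified single-key model in plain Scala: each partition is folded from a fresh empty set with the first function, and the per-partition results are merged with the second. This is a sketch under stated assumptions (partitions modeled as Seq[Seq[Int]], names AggregateByKeyModel and aggregate are hypothetical), not Spark's implementation, which also handles shuffling and many keys.

```scala
import scala.collection.mutable.HashSet

object AggregateByKeyModel {
  val seqOp: (HashSet[Int], Int) => HashSet[Int] = _ += _
  val combOp: (HashSet[Int], HashSet[Int]) => HashSet[Int] = _ ++= _

  // Fold each partition's values into its own fresh zero value with seqOp,
  // then merge the per-partition sets with combOp.
  def aggregate(partitions: Seq[Seq[Int]]): HashSet[Int] =
    partitions
      .map(values => values.foldLeft(new HashSet[Int])(seqOp))
      .reduce(combOp)

  def main(args: Array[String]): Unit = {
    // Values for key "a" in the order 3, 1, 5
    val onePartition = aggregate(Seq(Seq(3, 1, 5)))       // ((empty + 3) + 1) + 5
    val twoPartitions = aggregate(Seq(Seq(3, 1), Seq(5))) // ((empty + 3) + 1) ++ (empty + 5)
    println(onePartition == twoPartitions) // true
    println(onePartition.toList.sorted)    // List(1, 3, 5)
  }
}
```

The zero value must be created per partition (here, new HashSet[Int] inside the lambda); sharing one mutable instance across partitions would let the in-place updates leak between them.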
A transformed RDD can be persisted in memory so that subsequent operations on it are
more efficient. We look at that next.
Going back to the introductory example in "An Example", we can cache the intermediate
dataset of year-temperature pairs in memory with the following:
scala> tuples.cache()
res1: tuples.type = MappedRDD[4] at map at <console>:18
Calling cache() does not cache the RDD in memory straightaway. Instead, it marks the
RDD with a flag indicating it should be cached when the Spark job is run. So let's first
force a job run:
scala> tuples.reduceByKey((a, b) => Math.max(a, b)).foreach(println(_))
INFO BlockManagerInfo: Added rdd_4_0 in memory on
INFO BlockManagerInfo: Added rdd_4_1 in memory on
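The "mark now, store on first job" behavior of cache() can be illustrated with a toy model. ToyDataset below is entirely hypothetical and is not Spark's RDD API: cache() only sets a flag, and the first action computes the data and keeps it, so a second action does not recompute. The sample values are made up for illustration.

```scala
object ToyCacheModel {
  // Hypothetical stand-in for an RDD; not a Spark class.
  final class ToyDataset[T](computeFn: () => Seq[T]) {
    private var cacheRequested = false
    private var cached: Option[Seq[T]] = None
    private var count = 0

    // Number of times the underlying computation has actually run
    def computeCount: Int = count

    // Like cache(): only records the request; nothing is computed yet
    def cache(): this.type = { cacheRequested = true; this }

    // Like an action: computes the data, storing it if caching was requested
    def collect(): Seq[T] = cached.getOrElse {
      count += 1
      val data = computeFn()
      if (cacheRequested) cached = Some(data)
      data
    }
  }

  def main(args: Array[String]): Unit = {
    val ds = new ToyDataset(() => Seq(1, 2, 3)) // hypothetical data
    ds.cache()
    println(ds.computeCount) // 0: cache() alone computes nothing
    ds.collect()
    ds.collect()
    println(ds.computeCount) // 1: the second action reuses the stored data
  }
}
```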