val sets: RDD[(String, HashSet[Int])] =
  pairs.aggregateByKey(new HashSet[Int])(_+=_, _++=_)
assert(sets.collect().toSet === Set(("a", Set(1, 3, 5)), ("b", Set(7))))
For set addition, the zero value is the empty set, so we create a new mutable set with new HashSet[Int]. We have to supply two functions to aggregateByKey(). The first controls how an Int is combined with a HashSet[Int], and in this case we use the addition and assignment function _+=_ to add the integer to the set (_+_ would return a new set and leave the first set unchanged).
The second function controls how two HashSet[Int] values are combined (this happens after the combiner runs in the map task, while the two partitions are being aggregated in the reduce task), and here we use _++=_ to add all the elements of the second set to the first.
For key a , the sequence of operations might be:
((∅ + 3) + 1) + 5 = (1, 3, 5)
or:
((∅ + 3) + 1) ++ (∅ + 5) = (1, 3) ++ (5) = (1, 3, 5)
if Spark uses a combiner.
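Putting the pieces together, here is a minimal, self-contained sketch of the example. The pairs input (a small in-memory collection of (String, Int) tuples) and the local SparkContext setup are assumptions made for illustration, since they are not shown in this excerpt:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair RDD operations on older Spark versions
import org.apache.spark.rdd.RDD
import scala.collection.mutable.HashSet

val sc = new SparkContext(new SparkConf().setAppName("agg-example").setMaster("local"))

// Hypothetical input data for keys "a" and "b"
val pairs: RDD[(String, Int)] = sc.parallelize(Seq(("a", 3), ("a", 1), ("b", 7), ("a", 5)))

// Zero value: a fresh (empty) mutable set; _+=_ adds a value to a set,
// _++=_ merges two sets
val sets: RDD[(String, HashSet[Int])] =
  pairs.aggregateByKey(new HashSet[Int])(_ += _, _ ++= _)

// Each key now maps to the set of its values
val result = sets.collectAsMap()
assert(result("a") == Set(1, 3, 5))
assert(result("b") == Set(7))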
A transformed RDD can be persisted in memory so that subsequent operations on it are
more efficient. We look at that next.
Persistence
Going back to the introductory example in An Example , we can cache the intermediate
dataset of year-temperature pairs in memory with the following:
scala> tuples.cache()
res1: tuples.type = MappedRDD[4] at map at <console>:18
Calling cache() does not cache the RDD in memory straightaway. Instead, it marks the
RDD with a flag indicating it should be cached when the Spark job is run. So let's first
force a job run:
scala> tuples.reduceByKey((a, b) => Math.max(a, b)).foreach(println(_))
INFO BlockManagerInfo: Added rdd_4_0 in memory on 192.168.1.90:64640
INFO BlockManagerInfo: Added rdd_4_1 in memory on 192.168.1.90:64640
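The same pattern can be written as a standalone sketch. The records below are made-up tab-separated (year, temperature) lines standing in for the dataset used in An Example, and sc is assumed to be an existing SparkContext:
val lines = sc.parallelize(Seq("1950\t22", "1950\t-11", "1949\t111"))
val tuples = lines.map { line =>
  val fields = line.split("\t")
  (fields(0).toInt, fields(1).toInt)
}
tuples.cache()   // only marks the RDD for caching; nothing is stored yet

// First action: runs a job, computes tuples, and stores its partitions in memory
tuples.reduceByKey((a, b) => Math.max(a, b)).foreach(println(_))

// Second action: reuses the cached partitions instead of recomputing tuples
tuples.reduceByKey((a, b) => Math.min(a, b)).foreach(println(_))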