Example 4-23. Scala custom partitioner
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
                 .partitionBy(new HashPartitioner(100)) // Create 100 partitions
                 .persist()
The processNewLogs() method can remain unchanged: the events RDD is local to processNewLogs(), and is used only once within this method, so there is no advantage in specifying a partitioner for events. Because we called partitionBy() when building userData, Spark will now know that it is hash-partitioned, and calls to join() on it will take advantage of this information. In particular, when we call userData.join(events), Spark will shuffle only the events RDD, sending events with each particular UserID to the machine that contains the corresponding hash partition of userData (see Figure 4-5). The result is that a lot less data is communicated over the network, and the program runs significantly faster.
Figure 4-5. Each join of userData and events using partitionBy()
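To see why the events for a given UserID can be routed straight to the right machine, it helps to look at the rule a HashPartitioner applies. The sketch below is plain Scala (no cluster needed) that mimics that routing: a key goes to the partition given by its hashCode taken modulo the partition count, adjusted to be non-negative. The object and method names here are illustrative, not Spark's own, but the arithmetic matches what HashPartitioner.getPartition computes.

```scala
// Minimal sketch of hash routing as used by Spark's HashPartitioner:
// partition = nonNegativeMod(key.hashCode, numPartitions).
// Names (HashRouting, partitionFor) are hypothetical, for illustration only.
object HashRouting {
  // Like x % mod, but always in [0, mod), even for negative hash codes.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  def partitionFor(key: Any, numPartitions: Int): Int =
    nonNegativeMod(key.hashCode, numPartitions)

  def main(args: Array[String]): Unit = {
    // With 100 partitions, every holder of the same UserID computes the
    // same partition number, so both sides of a join agree on placement.
    for (id <- Seq(1, 2, 101, 202))
      println(s"UserID $id -> partition ${partitionFor(id, 100)}")
  }
}
```

Because the rule is deterministic and depends only on the key, Spark can hash each event's UserID on the sending side and ship it directly to the node holding that partition of userData, with no shuffle of userData itself.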
Note that partitionBy() is a transformation, so it always returns a new RDD; it does not change the original RDD in place. RDDs can never be modified once created. It is therefore important to persist the result of partitionBy() and save it as userData, rather than the original output of sequenceFile(). Also, the 100 passed to partitionBy() represents the number of partitions, which will control how many parallel tasks perform further operations on the RDD (e.g., joins); in general, make this at least as large as the number of cores in your cluster.
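The partition count also caps parallelism: each task in a stage works on one partition, so with fewer partitions than cores, some cores sit idle. The following plain-Scala sketch (no Spark required; the numbers and names are illustrative) buckets 1,000 keys with the same non-negative-modulo rule to show that the number of distinct buckets, and hence the maximum number of concurrent tasks, is bounded by numPartitions.

```scala
// Sketch: the partition count bounds how many tasks can run at once.
// numPartitions = 4 is deliberately small for illustration.
object PartitionCount {
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  def main(args: Array[String]): Unit = {
    val numPartitions = 4
    val userIds = 1 to 1000
    // Group keys by their partition number and count each bucket's size.
    val bucketSizes: Map[Int, Int] =
      userIds
        .groupBy(id => nonNegativeMod(id.hashCode, numPartitions))
        .map { case (part, ids) => part -> ids.size }
    // 4 buckets of ~250 keys each: at most 4 tasks could proceed in
    // parallel, no matter how many cores the cluster has.
    println(bucketSizes.toSeq.sortBy(_._1))
  }
}
```

This is why the text recommends at least as many partitions as cores: with 100 partitions, up to 100 tasks can run concurrently, and the hash of integer UserIDs spreads keys roughly evenly across them.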