// We assume this is a SequenceFile containing (UserID, LinkInfo) pairs, and that
// userData (an RDD of (UserID, UserInfo) pairs) has been loaded earlier in the program.
def processNewLogs(logFileName: String) {
  val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
  val joined = userData.join(events)  // RDD of (UserID, (UserInfo, LinkInfo)) pairs
  val offTopicVisits = joined.filter {
    case (userId, (userInfo, linkInfo)) =>  // Expand the tuple into its components
      !userInfo.topics.contains(linkInfo.topic)
  }.count()
  println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
This code will run fine as is, but it will be inefficient. This is because the join() operation, called each time processNewLogs() is invoked, does not know anything about how the keys are partitioned in the datasets. By default, this operation will hash all the keys of both datasets, sending elements with the same key hash across the network to the same machine, and then join together the elements with the same key on that machine (see Figure 4-4). Because we expect the userData table to be much larger than the small log of events seen every five minutes, this wastes a lot of work: the userData table is hashed and shuffled across the network on every call, even though it doesn't change.
Figure 4-4. Each join of userData and events without using partitionBy()
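The root of the problem is that an RDD loaded straight from a SequenceFile carries no partitioner, so join() has no layout it can reuse. The following check is illustrative only (the loading of userData and the "hdfs://..." path are assumptions; they are not part of the original example):

// Illustrative check: a freshly loaded RDD has no partitioner, so join()
// must hash and shuffle it from scratch on every call.
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")  // placeholder path
println(userData.partitioner)  // prints None: Spark does not know where any given UserID lives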
Fixing this is simple: just use the partitionBy() transformation on userData to hash-partition it at the start of the program. We do this by passing a spark.HashPartitioner object to partitionBy(), as shown in Example 4-23.
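A minimal sketch of what this hash-partitioning step can look like, assuming userData is loaded from a SequenceFile (the "hdfs://..." path and the choice of 100 partitions are illustrative assumptions):

import org.apache.spark.HashPartitioner

val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")  // placeholder path
                 .partitionBy(new HashPartitioner(100))         // 100 partitions; tune to your cluster
                 .persist()                                     // keep the partitioned copy in memory

The call to persist() matters: without it, the partitioned RDD would be recomputed (and reshuffled) each time processNewLogs() uses it, defeating the purpose of partitionBy().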
 