// we assume that this is a SequenceFile containing (UserID, LinkInfo) pairs.
def processNewLogs(logFileName: String) {
  val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
  val joined = userData.join(events) // RDD of (UserID, (UserInfo, LinkInfo)) pairs
  val offTopicVisits = joined.filter {
    case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components
      !userInfo.topics.contains(linkInfo.topic)
  }.count()
  println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
This code will run fine as is, but it will be inefficient. This is because the join() operation, called each time processNewLogs() is invoked, does not know anything about how the keys are partitioned in the datasets. By default, this operation will hash all the keys of both datasets, sending elements with the same key hash across the network to the same machine, and then join together the elements with the same key on that machine (see Figure 4-4). Because we expect the userData table to be much larger than the small log of events seen every five minutes, this wastes a lot of work: the userData table is hashed and shuffled across the network on every call, even though it doesn't change.
Figure 4-4. Each join of userData and events without using partitionBy()
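One way to see why both sides get shuffled is to inspect each RDD's partitioner field, which is empty by default. A minimal sketch, assuming the userData and events RDDs from the code above are in scope:

  // Sketch only: userData and events are assumed to be the RDDs defined above.
  // Neither carries partitioning information, so join() must shuffle both of them.
  println(userData.partitioner)  // None -- Spark has no layout information for userData
  println(events.partitioner)    // None -- likewise for the freshly loaded events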
Fixing this is simple: just use the partitionBy() transformation on userData to hash-partition it at the start of the program. We do this by passing a spark.HashPartitioner object to partitionBy, as shown in Example 4-23.
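Example 4-23 is not reproduced in this section, but a minimal sketch of that change, assuming userData is loaded from a SequenceFile of (UserID, UserInfo) pairs as in the surrounding examples, could look like the following (the HDFS path and the partition count of 100 are placeholders):

  import org.apache.spark.{HashPartitioner, SparkContext}

  val sc = new SparkContext(/* cluster configuration goes here */)
  val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")  // placeholder path
                   .partitionBy(new HashPartitioner(100))         // hash-partition into 100 partitions
                   .persist()                                     // keep the partitioned copy in memory

The persist() call matters here: without it, the partitioned RDD would be recomputed, and reshuffled, each time a later join() reused it, which would defeat the purpose of partitioning it up front.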