This program would run, but if we had a larger table (say, with IP addresses instead of call signs), the signPrefixes could easily be several megabytes in size, making it expensive to send that Array from the master alongside each task. In addition, if we used the same signPrefixes object later (maybe we next ran the same code on file2.txt), it would be sent again to each node.
We can fix this by making signPrefixes a broadcast variable. A broadcast variable is simply an object of type spark.broadcast.Broadcast[T], which wraps a value of type T. We can access this value by calling value on the Broadcast object in our tasks. The value is sent to each node only once, using an efficient, BitTorrent-like communication mechanism.
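To make the saving concrete, here is a rough back-of-the-envelope sketch in plain Python. The table size, task count, and node count are hypothetical figures chosen only for illustration, and the helper bytes_shipped is not part of Spark. The sketch also ignores serialization overhead and the peer-to-peer sharing that the broadcast mechanism performs.

```python
def bytes_shipped(table_bytes, copies):
    """Total bytes sent over the network for `copies` copies of the table."""
    return table_bytes * copies

# Hypothetical figures, chosen only for illustration.
table_bytes = 5 * 1024 * 1024   # a 5 MB lookup table
num_tasks = 1000                # tasks in the job
num_nodes = 10                  # worker nodes in the cluster

# Without broadcast: the table travels alongside every task.
per_task = bytes_shipped(table_bytes, num_tasks)

# With broadcast: the table is sent to each node only once.
per_node = bytes_shipped(table_bytes, num_nodes)

print(per_task // per_node)  # → 100
```

Under these assumptions the broadcast version moves one hundredth of the data, and the gap widens as the number of tasks per node grows.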
Using broadcast variables, our previous example looks like Examples 6-7 through 6-9.
Example 6-7. Country lookup with Broadcast values in Python
# Look up the locations of the call signs on the
# RDD contactCounts. We load a list of call sign
# prefixes to country code to support this lookup.
signPrefixes = sc.broadcast(loadCallSignTable())

def processSignCount(sign_count):
    # signPrefixes is captured from the enclosing scope, so map() can
    # call this function with the single (sign, count) element it passes.
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)

countryContactCounts = (contactCounts
                        .map(processSignCount)
                        .reduceByKey(lambda x, y: x + y))

countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
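The helper lookupCountry is not shown in this section. As a minimal sketch of what such a lookup might do, the version below assumes the broadcast value is a pair of parallel lists (sorted prefixes and their countries) and uses a binary search to find the last prefix that sorts at or before the call sign. Both the table shape and the sample entries are hypothetical placeholders, not the actual call sign data.

```python
import bisect

def lookupCountry(sign, prefix_table):
    """Return the country for the last prefix that sorts <= sign, or None."""
    prefixes, countries = prefix_table  # assumed shape: two parallel lists
    pos = bisect.bisect_right(prefixes, sign) - 1
    if pos < 0:
        return None  # sign sorts before every known prefix
    return countries[pos]

# Placeholder table; prefixes must be sorted for the binary search to work.
table = (["A", "K", "W"], ["Country A", "Country K", "Country W"])

print(lookupCountry("KK6JKQ", table))  # → Country K
print(lookupCountry("0ABC", table))    # → None
```

Because the function only reads from the table, it is safe to call against signPrefixes.value from many tasks at once.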
Example 6-8. Country lookup with Broadcast values in Scala
// Look up the countries for each call sign for the
// contactCounts RDD. We load an array of call sign
// prefixes to country code to support this lookup.
val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map { case (sign, count) =>
  val country = lookupInArray(sign, signPrefixes.value)
  (country, count)
}.reduceByKey((x, y) => x + y)
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")