This program would run, but if we had a larger table (say, with IP addresses instead
of call signs), signPrefixes could easily be several megabytes in size, making it
expensive to send that Array from the master alongside each task. In addition, if we
used the same signPrefixes object later (maybe we next ran the same code on
file2.txt), it would be sent again to each node.
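To put rough numbers on this, here is a back-of-the-envelope model of the cost just described. The workload figures (a 5 MB table, 200 tasks per job, two jobs) are hypothetical, and closure_shipping_bytes is an illustrative helper, not part of Spark.

```python
MB = 1024 * 1024


def closure_shipping_bytes(table_bytes: int, tasks_per_job: int, jobs: int) -> int:
    """Bytes sent if the table travels inside every task's closure.

    Each task gets its own copy, and a later job that uses the same
    table pays the full cost again.
    """
    return table_bytes * tasks_per_job * jobs


# Hypothetical workload: a 5 MB prefix table, 200 tasks per job, and two
# jobs (say, the original input and then file2.txt).
total = closure_shipping_bytes(5 * MB, tasks_per_job=200, jobs=2)
print(total // MB)  # 2000 (MB shipped for a 5 MB table)
```

The cost grows with the number of tasks and jobs rather than with the number of machines, which is what broadcasting changes.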
We can fix this by making signPrefixes a broadcast variable. A broadcast variable is
simply an object of type org.apache.spark.broadcast.Broadcast[T], which wraps a value of
type T. We can access this value by calling value on the Broadcast object in our
tasks. The value is sent to each node only once, using an efficient, BitTorrent-like
communication mechanism.
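The once-per-node behavior can be illustrated with a toy, single-process simulation. ToyBroadcast, value_on, and run_job are hypothetical names for this sketch only: Spark's real Broadcast exposes value directly (the node is implicit), and its peer-to-peer distribution of the payload is not modeled here. The point is only that tasks carry a small handle while each node fetches the wrapped payload once and reuses it, even across jobs.

```python
from collections import defaultdict


class ToyBroadcast:
    """Hypothetical stand-in for a broadcast handle (NOT Spark's class).

    Tasks hold only this handle. The first time a task on a given node
    reads the value, the payload is "transferred" to that node and cached;
    later tasks on the same node, even in later jobs, reuse the cached copy.
    """

    def __init__(self, payload):
        self._payload = payload
        self._node_cache = {}              # node name -> cached payload
        self.transfers = defaultdict(int)  # node name -> simulated transfers

    def value_on(self, node):
        if node not in self._node_cache:
            self.transfers[node] += 1      # simulated network transfer
            self._node_cache[node] = self._payload
        return self._node_cache[node]


def run_job(bcast, keys, nodes):
    """Run one task per key, round-robin over nodes; each task reads the value."""
    out = []
    for i, key in enumerate(keys):
        node = nodes[i % len(nodes)]
        table = bcast.value_on(node)
        out.append(table.get(key, "unknown"))
    return out


prefixes = {"KK6": "US", "VE3": "CA"}  # tiny illustrative table
nodes = ["node-a", "node-b", "node-c"]
signs = ToyBroadcast(prefixes)
run_job(signs, ["KK6", "VE3"] * 50, nodes)  # first job: 100 tasks
run_job(signs, ["VE3"] * 30, nodes)         # second job reuses the same handle
print(sum(signs.transfers.values()))        # 3: once per node, not per task
```

Although 130 tasks read the value, the payload crosses the (simulated) network only three times, once per node.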
Using broadcast variables, our previous example looks like Examples 6-7 through 6-9.
Example 6-7. Country lookup with Broadcast values in Python
# Look up the locations of the call signs on the
# RDD contactCounts. We load a list of call sign
# prefixes to country code to support this lookup.
# (sc, contactCounts, outputDir, loadCallSignTable and lookupCountry
# are assumed to be defined as in the previous example.)
signPrefixes = sc.broadcast(loadCallSignTable())

def processSignCount(sign_count):
    # map() passes a single (sign, count) pair; the Broadcast handle is
    # captured from the enclosing scope, and .value reads the node's copy.
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)

countryContactCounts = (contactCounts
                        .map(processSignCount)
                        .reduceByKey(lambda x, y: x + y))
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
Example 6-8. Country lookup with Broadcast values in Scala
// Look up the countries for each call sign for the
// contactCounts RDD. We load an array of call sign
// prefixes to country code to support this lookup.
val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map { case (sign, count) =>
  val country = lookupInArray(sign, signPrefixes.value)
  (country, count)
}.reduceByKey((x, y) => x + y)
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
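Neither lookupCountry (Python) nor lookupInArray (Scala) is defined in these examples. As a hedged sketch, assuming the broadcast table is a plain Python dict mapping call-sign prefixes to countries (the examples describe a list or array, so this representation is an assumption), a longest-prefix lookup could look like this. lookup_country is a hypothetical helper; inside processSignCount it would be called as lookup_country(sign_count[0], signPrefixes.value).

```python
from typing import Dict, Optional


def lookup_country(sign: str, prefix_table: Dict[str, str]) -> Optional[str]:
    """Return the country for the longest prefix of sign found in the table.

    Tries sign[:len(sign)], sign[:len(sign)-1], ..., sign[:1], so the cost
    is at most len(sign) dictionary lookups regardless of table size.
    """
    for end in range(len(sign), 0, -1):
        country = prefix_table.get(sign[:end])
        if country is not None:
            return country
    return None  # no known prefix matches


# Illustrative entries only, not a complete allocation table.
table = {"K": "USA", "KP4": "Puerto Rico", "VE": "Canada"}
print(lookup_country("KP4ABC", table))  # Puerto Rico (longest match wins)
print(lookup_country("KK6JKQ", table))  # USA
print(lookup_country("ZZ1ZZ", table))   # None
```

A dict keeps each lookup proportional to the length of the call sign; a sorted array searched with binary search would also work and may be more compact for very large tables.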