maximum of the accumulated values instead of adding them). Custom accumulators
need to extend AccumulatorParam, which is covered in the Spark API documentation.
Beyond adding to a numeric value, we can use any operation for add, provided
that operation is commutative and associative. For example, instead of adding to
track the total we could keep track of the maximum value seen so far.
An operation op is commutative if a op b = b op a for all values a, b.
An operation op is associative if (a op b) op c = a op (b op c) for all
values a, b, and c.
For example, sum and max are commutative and associative operations
that are commonly used in Spark accumulators.
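These two properties can be checked concretely. The sketch below uses plain Python rather than Spark: merge_partitions and order_independent are hypothetical helpers, and the partition contents are made up. It folds each partition with an operation, merges the partial results in every possible partition order, and reports whether the answer ever changes. This only varies the order of partitions, so it is a spot check and not a proof.

```python
from functools import reduce
from itertools import permutations
import operator

def merge_partitions(op, partitions):
    """Fold each partition with op, then fold the partial results with op."""
    partials = [reduce(op, part) for part in partitions]
    return reduce(op, partials)

def order_independent(op, partitions):
    """True if every ordering of the partitions gives the same merged result."""
    results = {merge_partitions(op, list(p)) for p in permutations(partitions)}
    return len(results) == 1

# Made-up data split across three "partitions" (an assumption for illustration).
partitions = [[3, 9, 4], [7, 1], [5, 8, 2]]

total = merge_partitions(operator.add, partitions)   # sum-style accumulation
largest = merge_partitions(max, partitions)          # max-style accumulation

sum_ok = order_independent(operator.add, partitions)
max_ok = order_independent(max, partitions)
sub_ok = order_independent(operator.sub, partitions)  # subtraction is neither commutative nor associative

print(total, largest, sum_ok, max_ok, sub_ok)
```

Addition and max give the same result however the partial values are merged, while subtraction does not, which is why it would be unsafe as an accumulator operation.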
Broadcast Variables
Spark's second type of shared variable, broadcast variables, allows the program to
efficiently send a large, read-only value to all the worker nodes for use in one or more
Spark operations. They come in handy, for example, if your application needs to send
a large, read-only lookup table to all the nodes, or even a large feature vector in a
machine learning algorithm.
Recall that Spark automatically sends all variables referenced in your closures to the
worker nodes. While this is convenient, it can also be inefficient because (1) the
default task launching mechanism is optimized for small task sizes, and (2) you
might, in fact, use the same variable in multiple parallel operations, but Spark will
send it separately for each operation. As an example, say that we wanted to write a
Spark program that looks up the country for each call sign by prefix matching against
an array. This is useful for ham radio call signs since each country gets its own prefix,
although the prefixes are not uniform in length. If we wrote this naively in Spark, the
code might look like Example 6-6.
Example 6-6. Country lookup in Python
# Look up the locations of the call signs on the
# RDD contactCounts. We load a list of call sign
# prefixes to country code to support this lookup.
signPrefixes = loadCallSignTable()

def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes)
    count = sign_count[1]
    return (country, count)

# signPrefixes is captured in the closure and shipped with every task.
countryContactCounts = (contactCounts
                        .map(lambda sign_count: processSignCount(sign_count, signPrefixes))
                        .reduceByKey(lambda x, y: x + y))
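The helper lookupCountry is not shown in this excerpt. As a minimal sketch of prefix matching when prefixes vary in length, the version below tries the longest candidate prefix first. The function name lookup_country, the dictionary representation (the text describes an array), the small prefix table, and the sample call signs are all assumptions made for illustration, not the book's implementation. It runs in plain Python without Spark.

```python
# Small illustrative prefix table; a real call sign table is far larger.
SIGN_PREFIXES = {
    "K": "United States",
    "W": "United States",
    "VE": "Canada",
    "JA": "Japan",
    "9A": "Croatia",
}

def lookup_country(sign, prefixes):
    """Return the country for the longest prefix of sign found in prefixes, else None."""
    max_len = max((len(p) for p in prefixes), default=0)
    # Try the longest possible prefix first, then progressively shorter ones.
    for length in range(min(max_len, len(sign)), 0, -1):
        country = prefixes.get(sign[:length])
        if country is not None:
            return country
    return None

# Made-up (call sign, contact count) pairs standing in for contactCounts.
contacts = [("W1ABC", 3), ("VE3XYZ", 2), ("K2ABC", 1)]

# Local equivalent of the map + reduceByKey steps: sum counts per country.
by_country = {}
for sign, count in contacts:
    country = lookup_country(sign, SIGN_PREFIXES)
    by_country[country] = by_country.get(country, 0) + count

print(by_country)
```

Trying longer prefixes first means that a one-letter prefix does not shadow a more specific two-letter one if both appear in the table.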