Example 6-9. Country lookup with Broadcast values in Java
// Read in the call sign table
// Look up the countries for each call sign in the
// contactCounts RDD
final Broadcast<String[]> signPrefixes = sc.broadcast(loadCallSignTable());
JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(
  new PairFunction<Tuple2<String, Integer>, String, Integer>() {
    public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {
      String sign = callSignCount._1();
      String country = lookupCountry(sign, signPrefixes.value());
      return new Tuple2(country, callSignCount._2());
    }}).reduceByKey(new SumInts());
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");
As shown in these examples, the process of using broadcast variables is simple:
1. Create a Broadcast[T] by calling SparkContext.broadcast on an object of type
T. Any type works as long as it is also Serializable.
2. Access its value with the value property (or value() method in Java).
3. The variable will be sent to each node only once, and should be treated as
read-only (updates will not be propagated to other nodes).
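The three steps above can be sketched as a small, self-contained local-mode program. This is an illustrative sketch, not the book's example: the prefix strings and sample call signs below are made up, and the filter logic stands in for the chapter's lookupCountry helper.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastSketch {
  public static void main(String[] args) {
    // Local-mode context, just for illustration.
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("broadcast-sketch");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Step 1: create a Broadcast<String[]> from a serializable object.
    Broadcast<String[]> prefixes = sc.broadcast(new String[]{"W", "K", "VE"});

    // Step 2: worker-side code reads the shared value with value().
    JavaRDD<String> signs = sc.parallelize(Arrays.asList("KK6JKQ", "VE3UOW", "G0AZ"));
    List<String> matched = signs.filter(sign -> {
      // Step 3: treat the broadcast array as read-only; never assign into it here.
      for (String p : prefixes.value()) {
        if (sign.startsWith(p)) {
          return true;
        }
      }
      return false;
    }).collect();

    System.out.println(matched);  // the call signs whose prefix appears in the table
    sc.stop();
  }
}
```

Because the broadcast ships to each node only once, the same pattern scales to a lookup table far larger than what you would want to capture in a task closure.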
The easiest way to satisfy the read-only requirement is to broadcast a primitive value
or a reference to an immutable object. In such cases, you won't be able to change the
value of the broadcast variable except within the driver code. However, sometimes it
can be more convenient or more efficient to broadcast a mutable object. If you do
that, it is up to you to maintain the read-only condition. As we did with our call sign
prefix table of Array[String], we must make sure that the code we run on our
worker nodes does not try to do something like val theArray = broadcastArray.value;
theArray(0) = newValue. When run on a worker node, that line will
assign newValue to the first array element only in the copy of the array local to the
worker node running the code; it will not change the contents of
broadcastArray.value on any of the other worker nodes.
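One defensive pattern (our own sketch, not from the example above) is to wrap the lookup table in an unmodifiable view before broadcasting it, so an accidental write on a worker fails fast with an exception instead of silently diverging from the copies on other nodes:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ImmutableBroadcastValue {
  public static void main(String[] args) {
    // Build the table once on the driver, then freeze it before broadcasting,
    // e.g. Broadcast<List<String>> bc = sc.broadcast(table);
    List<String> table = Collections.unmodifiableList(
        Arrays.asList("W", "K", "VE"));

    // Worker code that tries to mutate the table now throws
    // UnsupportedOperationException rather than updating a local copy.
    try {
      table.set(0, "G");
    } catch (UnsupportedOperationException e) {
      System.out.println("caught attempted write to broadcast table");
    }
  }
}
```

Reads (table.get(i), iteration) behave exactly as before; only mutation is blocked, which is precisely the read-only contract broadcast variables expect.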
Optimizing Broadcasts
When we are broadcasting large values, it is important to choose a data serialization
format that is both fast and compact, because the time to send the value over the
network can quickly become a bottleneck if it takes a long time to either serialize a
value or to send the serialized value over the network. In particular, Java Serialization,
the default serialization library used in Spark's Scala and Java APIs, can be very
inefficient out of the box for anything except arrays of primitive types. You can
optimize serialization by selecting a different serialization library using the
spark.serializer property (Chapter 8 will describe how to use Kryo, a faster
serialization library), or by