Example 6-9. Country lookup with Broadcast values in Java
// Read in the call sign table
// Look up the countries for each call sign in the
// contactCounts RDD
final Broadcast<String[]> signPrefixes = sc.broadcast(loadCallSignTable());
JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(
  new PairFunction<Tuple2<String, Integer>, String, Integer>() {
    public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {
      String sign = callSignCount._1();
      String country = lookupCountry(sign, signPrefixes.value());
      return new Tuple2<String, Integer>(country, callSignCount._2());
    }
  }).reduceByKey(new SumInts());
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");
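The example leans on helpers defined earlier in the chapter: loadCallSignTable and lookupCountry build and query the call sign prefix table, and SumInts is a small Function2 that adds two Integers for reduceByKey. A minimal sketch of SumInts (the Spark interface, org.apache.spark.api.java.function.Function2, is omitted here so the sketch compiles standalone):

```java
// Sketch of the SumInts helper referenced by reduceByKey above.
// In real Spark code it would implement
// org.apache.spark.api.java.function.Function2<Integer, Integer, Integer>.
public class SumInts {
    public Integer call(Integer x, Integer y) {
        return x + y;
    }

    public static void main(String[] args) {
        System.out.println(new SumInts().call(2, 3)); // prints 5
    }
}
```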
As shown in these examples, the process of using broadcast variables is simple:
1. Create a Broadcast[T] by calling SparkContext.broadcast on an object of type T. Any type works as long as it is also Serializable.
2. Access its value with the value property (or value() method in Java).
3. The variable will be sent to each node only once, and should be treated as read-only (updates will not be propagated to other nodes).
The easiest way to satisfy the read-only requirement is to broadcast a primitive value or a reference to an immutable object. In such cases, you won't be able to change the value of the broadcast variable except within the driver code. However, sometimes it can be more convenient or more efficient to broadcast a mutable object. If you do that, it is up to you to maintain the read-only condition. As we did with our call sign prefix table of Array[String], we must make sure that the code we run on our worker nodes does not try to do something like val theArray = broadcastArray.value; theArray(0) = newValue. When run in a worker node, that line will assign newValue to the first array element only in the copy of the array local to the worker node running the code; it will not change the contents of broadcastArray.value on any of the other worker nodes.
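This local-copy behavior can be illustrated without Spark at all. The sketch below simulates the driver-to-worker transfer with plain Java serialization (the class and method names are ours, not Spark's): each "worker" deserializes its own copy of the array, so a write through one copy is invisible to the others.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Plain-Java simulation (no Spark) of why a worker-side write to a
// broadcast array stays local: each executor deserializes its own copy
// of the value, so mutations never travel back over the network.
public class BroadcastCopyDemo {
    // Stand-in for the driver shipping the value to an executor.
    static byte[] serialize(Object o) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(o);
            oos.close();
            return bos.toByteArray();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Stand-in for an executor receiving its copy.
    static String[] deserialize(byte[] bytes) {
        try {
            ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bytes));
            return (String[]) ois.readObject();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        String[] driverValue = {"W", "VE", "G"};   // the "broadcast" value
        byte[] wire = serialize(driverValue);      // shipped to each worker

        String[] worker1 = deserialize(wire);      // worker 1's local copy
        String[] worker2 = deserialize(wire);      // worker 2's local copy

        worker1[0] = "XX";                         // local mutation only

        System.out.println(worker1[0]);            // XX
        System.out.println(worker2[0]);            // W  (unaffected)
        System.out.println(driverValue[0]);        // W  (unaffected)
    }
}
```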
Optimizing Broadcasts
When we are broadcasting large values, it is important to choose a data serialization format that is both fast and compact, because the time to send the value over the network can quickly become a bottleneck if it takes a long time to either serialize a value or to send the serialized value over the network. In particular, Java Serialization, the default serialization library used in Spark's Scala and Java APIs, can be very inefficient out of the box for anything except arrays of primitive types. You can optimize serialization by selecting a different serialization library using the spark.serializer property (Chapter 8 will describe how to use Kryo, a faster serialization library), or by