To summarize, accumulators work as follows:
• We create them in the driver by calling the SparkContext.accumulator(initialValue) method, which produces an accumulator holding an initial value. The return type is an org.apache.spark.Accumulator[T] object, where T is the type of initialValue.
• Worker code in Spark closures can add to the accumulator with its += method (or add in Java).
• The driver program can call the value property on the accumulator to access its value (or call value() and setValue() in Java).
Note that tasks on worker nodes cannot access the accumulator's value(); from the point of view of these tasks, accumulators are write-only variables. This allows accumulators to be implemented efficiently, without having to send every individual update back to the driver.
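The write-only design can be illustrated with a toy, single-process model. This is not Spark's implementation: the class TaskLocalAccumulator and the functions run_job and count_blank_lines are hypothetical names invented for this sketch. Each simulated task gets its own zero-valued counter that supports only +=, and the "driver" folds in one partial sum per task on top of the initial value, rather than receiving every individual increment.

```python
from typing import Callable, List


class TaskLocalAccumulator:
    """Hypothetical task-side view of an accumulator: supports += only."""

    def __init__(self) -> None:
        self._partial = 0  # partial sum kept by the task; task code never reads it

    def __iadd__(self, amount: int) -> "TaskLocalAccumulator":
        self._partial += amount
        return self  # returning self keeps `counter += n` bound to this object


def run_job(partitions: List[List[str]],
            task_fn: Callable[[str, TaskLocalAccumulator], None],
            initial_value: int = 0) -> int:
    """Run one simulated task per partition and merge results in the 'driver'."""
    driver_value = initial_value            # only the driver's copy holds the initial value
    for partition in partitions:
        local = TaskLocalAccumulator()      # each task starts from zero
        for record in partition:
            task_fn(record, local)          # task code can only add
        driver_value += local._partial      # one merge per task, not one per update
    return driver_value


def count_blank_lines(line: str, counter: TaskLocalAccumulator) -> None:
    """Example task function: count empty lines."""
    if line == "":
        counter += 1


print(run_job([["a", "", "b"], ["", ""]], count_blank_lines))  # 3
```

Because tasks never need to read the running total, each one can accumulate privately and report a single partial result when it finishes.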
The type of counting shown here becomes especially handy when there are multiple values to keep track of, or when the same value needs to increase at multiple places in the parallel program (for example, you might be counting calls to a JSON parsing library throughout your program). For instance, often we expect some percentage of our data to be corrupted, or allow for the backend to fail some number of times. To prevent producing garbage output when there are too many errors, we can use a counter for valid records and a counter for invalid records. The value of our accumulators is available only in the driver program, so that is where we place our checks.
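The driver-side check itself is ordinary Python once the counter values are in hand. A minimal sketch follows; the function name output_is_trustworthy and the 10% default threshold are illustrative assumptions, not values taken from the text. In PySpark the two arguments would be the accumulators' value properties (for example validSignCount.value and invalidSignCount.value), read in the driver only after an action has forced the RDD to be computed, since transformations are lazy and do not update accumulators until then.

```python
def output_is_trustworthy(valid: int, invalid: int,
                          max_invalid_fraction: float = 0.1) -> bool:
    """Driver-side check: allow writing only if the invalid fraction is small.

    The 10% default threshold is an illustrative assumption.
    """
    total = valid + invalid
    if total == 0:
        return False  # no records at all: nothing worth writing
    return invalid / total <= max_invalid_fraction


print(output_is_trustworthy(950, 50))   # True  (5% invalid)
print(output_is_trustworthy(600, 400))  # False (40% invalid)
```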
Continuing from our last example, we can now validate the call signs and write the output only if most of the input is valid. The ham radio call sign format is specified in Article 19 by the International Telecommunication Union, from which we construct a regular expression to verify conformance, shown in Example 6-5.
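The regular expression used in Example 6-5 can be checked on its own, without Spark, before it is wired into an accumulator-based job. It reads as: an optional leading digit, one or two letters, one to four digits, then one to three letters, anchored at both ends with \A and \Z. The helper name is_valid_call_sign and the sample strings below are illustrative only; they are not claimed to be assigned call signs.

```python
import re

# Same pattern as in Example 6-5, compiled once for reuse.
CALL_SIGN_RE = re.compile(r"\A\d?[a-zA-Z]{1,2}\d{1,4}[a-zA-Z]{1,3}\Z")


def is_valid_call_sign(sign: str) -> bool:
    """Return True if the whole string conforms to the call sign pattern."""
    return CALL_SIGN_RE.match(sign) is not None


for sign in ["KK6JKQ", "W8PAL", "2E0ABC", "1234", "N0CALLSIGN"]:
    print(sign, is_valid_call_sign(sign))
# KK6JKQ True / W8PAL True / 2E0ABC True / 1234 False / N0CALLSIGN False
```

The \Z anchor matters: without it, "N0CALLSIGN" would match on its prefix "N0CAL" and be counted as valid.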
Example 6-5. Accumulator error count in Python
```python
import re

# Create Accumulators for validating call signs
# (sc is the SparkContext from the earlier examples)
validSignCount = sc.accumulator(0)
invalidSignCount = sc.accumulator(0)

def validateSign(sign):
    # The accumulators are module-level names, so declare them global to use +=
    global validSignCount, invalidSignCount
    if re.match(r"\A\d?[a-zA-Z]{1,2}\d{1,4}[a-zA-Z]{1,3}\Z", sign):
        validSignCount += 1
        return True
    else:
        invalidSignCount += 1
        return False
```