To summarize, accumulators work as follows:
• We create them in the driver by calling the SparkContext.accumulator(initialValue)
method, which produces an accumulator holding an initial value. The return type is
an org.apache.spark.Accumulator[T] object, where T is the type of initialValue.
• Worker code in Spark closures can add to the accumulator with its += method
(or add in Java).
• The driver program can read the accumulator's value property to access its value
(or call value() and setValue() in Java).
Note that tasks on worker nodes cannot access the accumulator's value(); from the
point of view of these tasks, accumulators are write-only variables. This allows
accumulators to be implemented efficiently, without having to communicate every
update.
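To make the write-only idea concrete, here is a toy model in plain Python. It is not Spark's implementation, and the names ToyAccumulator, run_task, and run_job are hypothetical. Each "task" adds only to a local partial sum, never reads the driver's total, and hands back a single number when it finishes; the driver merges those partials and is the only place the final value is read.

```python
from typing import Callable, Iterable, List


class ToyAccumulator:
    """Hypothetical driver-side counter; a model, not Spark's Accumulator class."""

    def __init__(self, initial: int = 0) -> None:
        self._value = initial

    @property
    def value(self) -> int:
        # Only the driver reads this.
        return self._value

    def merge(self, partial: int) -> None:
        # Fold one task's partial sum into the driver's total.
        self._value += partial


def run_task(records: Iterable[str], update: Callable[[str], int]) -> int:
    # Each task starts from a local zero and only adds to it. It never reads
    # the driver's total, so nothing is sent while the task is running.
    local = 0
    for record in records:
        local += update(record)
    # A single partial result goes back to the driver when the task finishes.
    return local


def run_job(partitions: List[List[str]], acc: ToyAccumulator,
            update: Callable[[str], int]) -> None:
    # Hypothetical "job": run one task per partition, then merge its partial.
    for partition in partitions:
        acc.merge(run_task(partition, update))


blank_lines = ToyAccumulator(0)
partitions = [["a", "", "b"], ["", ""], ["c"]]
run_job(partitions, blank_lines, lambda line: 1 if line == "" else 0)
print(blank_lines.value)  # 3
```

Because integer addition is associative and commutative, the driver ends up with the same total regardless of the order in which the partial sums arrive, which is what makes merging once per task safe.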
The type of counting shown here becomes especially handy when there are multiple
values to keep track of, or when the same value needs to increase at multiple places in
the parallel program (for example, you might be counting calls to a JSON parsing
library throughout your program). For instance, we often expect a certain percentage
of our data to be corrupted, or we may allow the backend to fail a certain number of
times. To avoid producing garbage output when there are too many errors, we can use
one counter for valid records and another for invalid records. The value of our
accumulators is available only in the driver program, so that is where we place our checks.
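The driver-side check itself can be a plain function of the two counts. The sketch below is an assumption, not the book's code: the name should_write_output and the 10% invalid-record threshold are hypothetical. In a real job the two arguments would be the .value of each counter accumulator, read on the driver after an action has forced the computation to run.

```python
def should_write_output(valid: int, invalid: int,
                        max_invalid_fraction: float = 0.1) -> bool:
    """Driver-side sanity check; the 10% default is an assumed threshold."""
    total = valid + invalid
    if total == 0:
        # Nothing was processed, so there is nothing trustworthy to write.
        return False
    return invalid / total <= max_invalid_fraction


print(should_write_output(95, 5))   # True: 5% of records were invalid
print(should_write_output(80, 20))  # False: 20% were invalid
```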
Continuing from our last example, we can now validate the call signs and write the
output only if most of the input is valid. The ham radio call sign format is specified in
Article 19 by the International Telecommunication Union, from which we construct
a regular expression to verify conformance, shown in Example 6-5.
Example 6-5. Accumulator error count in Python
import re

# Create Accumulators for validating call signs
# (sc is the SparkContext, e.g. the one the pyspark shell provides)
validSignCount = sc.accumulator(0)
invalidSignCount = sc.accumulator(0)

def validateSign(sign):
    # global is required because += rebinds these module-level names
    global validSignCount, invalidSignCount
    if re.match(r"\A\d?[a-zA-Z]{1,2}\d{1,4}[a-zA-Z]{1,3}\Z", sign):
        validSignCount += 1
        return True
    else:
        invalidSignCount += 1
        return False
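The regular expression can be checked on its own, without Spark. The snippet below reuses the pattern from Example 6-5; the sample strings are made up for illustration and are not taken from any real data set, and is_valid_sign is a hypothetical helper name.

```python
import re

# Same pattern as Example 6-5, compiled once for reuse.
CALL_SIGN = re.compile(r"\A\d?[a-zA-Z]{1,2}\d{1,4}[a-zA-Z]{1,3}\Z")


def is_valid_sign(sign: str) -> bool:
    # match() anchors at the start; \Z rejects anything left at the end.
    return CALL_SIGN.match(sign) is not None


samples = ["W8PAL", "KK6JKQ", "2E0ABC", "VE3UOW", "not a sign", "W8", ""]
valid = [s for s in samples if is_valid_sign(s)]
invalid = [s for s in samples if not is_valid_sign(s)]
print(valid)    # ['W8PAL', 'KK6JKQ', '2E0ABC', 'VE3UOW']
print(invalid)  # ['not a sign', 'W8', '']
```

Using \Z rather than $ matters here: in Python, $ also matches just before a trailing newline, so a line such as "W8PAL\n" would pass with $ but is rejected with \Z. The leading \A is redundant with re.match, which already anchors at the start, but it is harmless.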