callSigns.saveAsTextFile(outputDir + "/callsigns")
print("Blank lines: %d" % blankLines.value)
Example 6-3. Accumulator empty line count in Scala
val sc = new SparkContext(...)
val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0) // Create an Accumulator[Int] initialized to 0
val callSigns = file.flatMap(line => {
  if (line == "") {
    blankLines += 1 // Add to the accumulator
  }
  line.split(" ")
})
callSigns.saveAsTextFile("output.txt")
println("Blank lines: " + blankLines.value)
Example 6-4. Accumulator empty line count in Java
JavaRDD<String> rdd = sc.textFile(args[1]);
final Accumulator<Integer> blankLines = sc.accumulator(0);
JavaRDD<String> callSigns = rdd.flatMap(
  new FlatMapFunction<String, String>() {
    public Iterable<String> call(String line) {
      if (line.equals("")) {
        blankLines.add(1); // Add to the accumulator
      }
      return Arrays.asList(line.split(" "));
    }});
callSigns.saveAsTextFile("output.txt");
System.out.println("Blank lines: " + blankLines.value());
In these examples, we create an Accumulator[Int] called blankLines, and then add 1 to it whenever we see a blank line in the input. After evaluating the transformation, we print the value of the counter. Note that we will see the right count only after we run the saveAsTextFile() action, because the transformation above it, flatMap(), is lazy. The side-effect incrementing of the accumulator happens only when the lazy flatMap() transformation is forced to run by the saveAsTextFile() action.
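The effect of laziness on the counter can be sketched without a cluster. The following is a plain-Python analogy, not Spark code: a generator stands in for the lazy transformation, list() stands in for the action, and the Counter class, the split_words function, and the sample input are all hypothetical names and data invented for illustration.

```python
# Plain-Python analogy (no Spark required): a generator plays the role of a
# lazy transformation, and a small Counter class stands in for an accumulator.

class Counter:
    """Hypothetical stand-in for a Spark accumulator."""

    def __init__(self):
        self.value = 0

    def add(self, n):
        self.value += n


def split_words(lines, blank_lines):
    """Lazily yield words; count blank lines as a side effect."""
    for line in lines:
        if line == "":
            blank_lines.add(1)
        for word in line.split(" "):
            yield word


# Invented sample input with two blank lines.
lines = ["W8PAL KK6JKQ", "", "W6BB", ""]
blank_lines = Counter()

words = split_words(lines, blank_lines)  # nothing has run yet
count_before = blank_lines.value         # counter is still 0 here

result = list(words)                     # forces evaluation, like an action
count_after = blank_lines.value          # counter now reflects both blank lines
```

Reading the counter before the generator is consumed gives 0. Only after list() drains it does the counter hold the real total, which mirrors why the Spark examples print blankLines.value after saveAsTextFile().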
Of course, it is possible to aggregate values from an entire RDD back to the driver program using actions like reduce(), but sometimes we need a simple way to aggregate values that, in the process of transforming an RDD, are generated at a different scale or granularity than that of the RDD itself. In the previous example, accumulators let us count errors (here, blank lines) as we load the data, without doing a separate filter() or reduce().
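To make the trade-off concrete, here is a small plain-Python sketch, again an analogy and not Spark code. CountingSource is a hypothetical helper that records how many times the input is scanned, and the sample data is invented. It compares a separate filtering pass with counting as a side effect of the main transformation.

```python
# Plain-Python analogy (no Spark required). CountingSource is a hypothetical
# helper that records how many times the input is scanned.

class CountingSource:
    def __init__(self, lines):
        self.lines = lines
        self.scans = 0

    def __iter__(self):
        self.scans += 1  # each new iteration is one full scan of the input
        return iter(self.lines)


data = ["W8PAL KK6JKQ", "", "W6BB", ""]

# Approach 1: a separate filter-style pass just to count blank lines,
# followed by a second pass that does the real work.
two_pass = CountingSource(data)
blank_count_filter = sum(1 for line in two_pass if line == "")
words_filter = [w for line in two_pass for w in line.split(" ")]

# Approach 2: count as a side effect of the transformation itself,
# which is the role the accumulator plays in the examples above.
one_pass = CountingSource(data)
blank_count_side = 0
words_side = []
for line in one_pass:
    if line == "":
        blank_count_side += 1
    words_side.extend(line.split(" "))
```

Both approaches produce the same words and the same blank-line count, but the first scans the input twice while the second scans it once. In this analogy, that saved scan is what the accumulator buys in the Spark examples.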