Much like lazy evaluation in RDDs, if no output operation is applied on a DStream or any of its descendants, then those DStreams will not be evaluated. And if there are no output operations set in a StreamingContext, then the context will not start.
A common debugging output operation that we have used already is print(). This grabs the first 10 elements from each batch of the DStream and prints the results. Once we've debugged our program, we can also use output operations to save results. Spark Streaming has similar save() operations for DStreams, each of which takes a directory to save files into and an optional suffix. The results of each batch are saved as subdirectories in the given directory, with the time and the suffix in the filename. For instance, we can save our IP address counts as shown in Example 10-25.
Example 10-25. Saving DStream to text files in Scala

ipAddressRequestCount.saveAsTextFiles("outputDir", "txt")
The more general saveAsHadoopFiles() takes a Hadoop OutputFormat. For instance, Spark Streaming doesn't have a built-in saveAsSequenceFile() function, but we can save SequenceFiles as shown in Examples 10-26 and 10-27.
Example 10-26. Saving SequenceFiles from a DStream in Scala

val writableIpAddressRequestCount = ipAddressRequestCount.map {
  case (ip, count) => (new Text(ip), new LongWritable(count))
}
writableIpAddressRequestCount.saveAsHadoopFiles[SequenceFileOutputFormat[Text, LongWritable]](
  "outputDir", "txt")
Example 10-27. Saving SequenceFiles from a DStream in Java

JavaPairDStream<Text, LongWritable> writableDStream = ipDStream.mapToPair(
  new PairFunction<Tuple2<String, Long>, Text, LongWritable>() {
    public Tuple2<Text, LongWritable> call(Tuple2<String, Long> e) {
      return new Tuple2<>(new Text(e._1()), new LongWritable(e._2()));
    }
  });
class OutFormat extends SequenceFileOutputFormat<Text, LongWritable> {};
writableDStream.saveAsHadoopFiles(
  "outputDir", "txt", Text.class, LongWritable.class, OutFormat.class);
Finally, foreachRDD() is a generic output operation that lets us run arbitrary computations on the RDDs of the DStream. It is similar to transform() in that it gives you access to each RDD. Within foreachRDD(), we can reuse all the actions we have in Spark. For example, a common use case is to write data to an external database such as MySQL, where Spark may not have a saveAs() function, but we might use foreachPartition() on the RDD to write it out. For convenience, foreachRDD() can
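The connection-per-partition pattern behind that foreachPartition() advice can be sketched without a Spark cluster. In the sketch below, FakeConnection is a hypothetical stand-in for a real JDBC connection, and the two hand-built "partitions" simulate the record iterators that foreachRDD()/foreachPartition() would hand to the write function in a real streaming job; only the shape of the pattern (one connection per partition, not per record) comes from the text above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a database connection; records inserted rows
// so the example is checkable without a real database.
class FakeConnection {
    final List<String> written = new ArrayList<>();
    void insert(String ip, long count) { written.add(ip + "=" + count); }
    void close() {}
}

public class ForeachRddSketch {
    // Mirrors the body of rdd.foreachPartition(...): open one connection
    // per partition, write every record through it, then close it.
    static FakeConnection writePartition(Iterator<Map.Entry<String, Long>> partition) {
        FakeConnection conn = new FakeConnection();
        while (partition.hasNext()) {
            Map.Entry<String, Long> e = partition.next();
            conn.insert(e.getKey(), e.getValue());
        }
        conn.close();
        return conn;
    }

    public static void main(String[] args) {
        // Two simulated partitions of one batch of IP-address counts.
        Map<String, Long> p1 = new LinkedHashMap<>();
        p1.put("10.0.0.1", 3L);
        p1.put("10.0.0.2", 1L);
        Map<String, Long> p2 = new LinkedHashMap<>();
        p2.put("10.0.0.3", 7L);

        int total = 0;
        for (Map<String, Long> p : Arrays.asList(p1, p2)) {
            total += writePartition(p.entrySet().iterator()).written.size();
        }
        System.out.println(total); // prints 3: three rows over two connections
    }
}
```

The point of the pattern is cost amortization: a connection (or prepared statement) is set up once per partition rather than once per record, which is why the book steers database writes through foreachPartition() rather than a per-element foreach().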