Much like lazy evaluation in RDDs, if no output operation is
applied on a DStream or any of its descendants, then those
DStreams will not be evaluated. And if there are no output
operations set in a StreamingContext, then the context will not start.
A common debugging output operation that we have used already is print(). This
grabs the first 10 elements from each batch of the DStream and prints the results.
Once we've debugged our program, we can also use output operations to save results.
Spark Streaming has similar save() operations for DStreams, each of which takes a
directory to save files into and an optional suffix. The results of each batch are saved
as subdirectories in the given directory, with the time and the suffix in the filename.
For instance, we can save our IP address counts as shown in Example 10-25.
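For instance, the IP-count DStream used in the examples below could be printed on each batch interval like so (a sketch; it assumes ipAddressRequestCount is an already-built DStream and a running StreamingContext):

```scala
// Prints the first 10 elements of each batch to the driver's stdout.
ipAddressRequestCount.print()
```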
Example 10-25. Saving DStream to text files in Scala
ipAddressRequestCount.saveAsTextFiles("outputDir", "txt")
The more general saveAsHadoopFiles() takes a Hadoop OutputFormat. For
instance, Spark Streaming doesn't have a built-in saveAsSequenceFile() function,
but we can save SequenceFiles as shown in Examples 10-26 and 10-27.
Example 10-26. Saving SequenceFiles from a DStream in Scala
val writableIpAddressRequestCount = ipAddressRequestCount.map {
  case (ip, count) => (new Text(ip), new LongWritable(count)) }
writableIpAddressRequestCount.saveAsHadoopFiles[
  SequenceFileOutputFormat[Text, LongWritable]]("outputDir", "txt")
Example 10-27. Saving SequenceFiles from a DStream in Java
JavaPairDStream<Text, LongWritable> writableDStream = ipDStream.mapToPair(
  new PairFunction<Tuple2<String, Long>, Text, LongWritable>() {
    public Tuple2<Text, LongWritable> call(Tuple2<String, Long> e) {
      return new Tuple2(new Text(e._1()), new LongWritable(e._2()));
    }});
class OutFormat extends SequenceFileOutputFormat<Text, LongWritable> {};
writableDStream.saveAsHadoopFiles(
  "outputDir", "txt", Text.class, LongWritable.class, OutFormat.class);
Finally, foreachRDD() is a generic output operation that lets us run arbitrary
computations on the RDDs in the DStream. It is similar to transform() in that it gives you
access to each RDD. Within foreachRDD(), we can reuse all the actions we have in
Spark. For example, a common use case is to write data to an external database such
as MySQL, where Spark may not have a saveAs() function, but we might use
foreachPartition() on the RDD to write it out. For convenience, foreachRDD() can
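The foreachRDD()/foreachPartition() pattern described above can be sketched as follows. This is not the book's example: the JDBC URL, table name, and column names are hypothetical placeholders, and it assumes a MySQL JDBC driver is on the classpath.

```scala
import java.sql.DriverManager

// Sketch: write each batch's (ip, count) pairs to a MySQL table.
// Connection details and schema here are illustrative assumptions.
ipAddressRequestCount.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Open one connection per partition rather than per record,
    // since connections are expensive and not serializable.
    val conn = DriverManager.getConnection("jdbc:mysql://localhost/logs")
    try {
      val stmt = conn.prepareStatement(
        "INSERT INTO ip_counts (ip, count) VALUES (?, ?)")
      partition.foreach { case (ip, count) =>
        stmt.setString(1, ip)
        stmt.setLong(2, count)
        stmt.executeUpdate()
      }
    } finally {
      conn.close()
    }
  }
}
```

Note that the code inside foreachPartition() runs on the executors, which is why the connection is created there instead of on the driver.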