Example 10-13. Joining two DStreams in Java
// Map each access log entry to an (IP, bytes) pair.
JavaPairDStream<String, Long> ipBytesDStream =
    accessLogsDStream.mapToPair(new IpContentTuple());
// Sum the bytes per IP within each batch.
JavaPairDStream<String, Long> ipBytesSumDStream =
    ipBytesDStream.reduceByKey(new LongSumReducer());
// Join on IP; each value pairs the ipCountsDStream value with the summed bytes.
JavaPairDStream<String, Tuple2<Long, Long>> ipBytesRequestCountDStream =
    ipCountsDStream.join(ipBytesSumDStream);
We can also merge the contents of two different DStreams using the union() operator,
as in regular Spark, or using StreamingContext.union() for multiple streams.
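The per-batch behavior of union() can be sketched in plain Java without any Spark dependency. This is only a model, not the Spark API: it assumes a stream can be represented as a list of batches, and the class name UnionSketch and its union() helper are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java model only (not the Spark API): a "stream" is a list of batches,
// one batch per interval. UnionSketch is a hypothetical name.
class UnionSketch {
    // Merge two streams batch by batch; both must cover the same intervals.
    static <T> List<List<T>> union(List<List<T>> left, List<List<T>> right) {
        if (left.size() != right.size()) {
            throw new IllegalArgumentException("streams must have the same number of batches");
        }
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < left.size(); i++) {
            List<T> merged = new ArrayList<>(left.get(i));
            merged.addAll(right.get(i)); // duplicates are kept, as with a union of RDDs
            out.add(merged);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> a = Arrays.asList(
            Arrays.asList("10.0.0.1"),
            Arrays.asList("10.0.0.2"));
        List<List<String>> b = Arrays.asList(
            Arrays.asList("10.0.0.1", "10.0.0.3"),
            Collections.<String>emptyList());
        // prints [[10.0.0.1, 10.0.0.1, 10.0.0.3], [10.0.0.2]]
        System.out.println(union(a, b));
    }
}
```

Note that the merge happens within each batch interval; elements from different intervals are never mixed.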
Finally, if these stateless transformations are insufficient, DStreams provide an
advanced operator called transform() that lets you operate directly on the RDDs
inside them. The transform() operation lets you provide an arbitrary RDD-to-RDD
function to act on the DStream. This function gets called on each batch of data
in the stream to produce a new stream. A common application of transform() is to
reuse batch processing code you have already written on RDDs. For example, if you
had a function, extractOutliers(), that acted on an RDD of log lines to produce an
RDD of outliers (perhaps after running some statistics on the messages), you could
reuse it within a transform(), as shown in Examples 10-14 and 10-15.
Example 10-14. transform() on a DStream in Scala
val outlierDStream = accessLogsDStream.transform { rdd =>
  extractOutliers(rdd)
}
Example 10-15. transform() on a DStream in Java
// extractOutliers() maps an RDD of log entries to an RDD of log entries, so the
// result is a JavaDStream<ApacheAccessLog> and call() returns a JavaRDD.
JavaDStream<ApacheAccessLog> outlierDStream = accessLogsDStream.transform(
  new Function<JavaRDD<ApacheAccessLog>, JavaRDD<ApacheAccessLog>>() {
    public JavaRDD<ApacheAccessLog> call(JavaRDD<ApacheAccessLog> rdd) {
      return extractOutliers(rdd);
    }
});
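To make the "called on each batch" behavior concrete, here is a minimal plain-Java sketch that needs no Spark dependency. It is a model under stated assumptions, not the Spark API: a stream is represented as a list of batches, TransformSketch and its transform() helper are hypothetical names, and the extractOutliers() body shown here (keep values above twice the batch mean) is an invented stand-in, since the text does not say how outliers are computed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java model only (not the Spark API): a "stream" is a list of batches.
class TransformSketch {
    // Hypothetical stand-in for extractOutliers(): keeps the values that are
    // greater than twice the mean of the batch they arrived in.
    static List<Long> extractOutliers(List<Long> batch) {
        List<Long> outliers = new ArrayList<>();
        if (batch.isEmpty()) {
            return outliers;
        }
        double sum = 0;
        for (long value : batch) {
            sum += value;
        }
        double mean = sum / batch.size();
        for (long value : batch) {
            if (value > 2 * mean) {
                outliers.add(value);
            }
        }
        return outliers;
    }

    // Model of transform(): call the batch function once per batch, in order,
    // and collect the results as the batches of a new stream.
    static <T, R> List<List<R>> transform(List<List<T>> stream,
                                          Function<List<T>, List<R>> batchFn) {
        List<List<R>> out = new ArrayList<>();
        for (List<T> batch : stream) {
            out.add(batchFn.apply(batch));
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Long>> bytesPerRequest = Arrays.asList(
            Arrays.asList(100L, 120L, 110L, 5000L),
            Arrays.asList(90L, 95L, 100L));
        // The existing batch function is reused unchanged on every batch.
        // prints [[5000], []]
        System.out.println(transform(bytesPerRequest, TransformSketch::extractOutliers));
    }
}
```

Because the statistics are computed per batch, a value counts as an outlier only relative to the batch it arrived in, which is also how a function passed to transform() sees the data.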
You can also combine and transform data from multiple DStreams together using
StreamingContext.transform or DStream.transformWith(otherStream, func).
Stateful Transformations
Stateful transformations are operations on DStreams that track data across time; that
is, some data from previous batches is used to generate the results for a new batch.
The two main types are windowed operations, which act over a sliding window of
time periods, and updateStateByKey(), which is used to track state across events for
each key (e.g., to build up an object representing each user session).
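The difference between the two kinds of stateful transformation can be sketched in plain Java, again without any Spark dependency. This is a model, not the Spark API: a stream is a list of batches of keys, the class and method names are hypothetical, the window is assumed to slide by exactly one batch, and the per-key state is assumed to be a simple running count.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model only (not the Spark API): each batch is a list of keys.
class StatefulSketch {
    // Count how often each key occurs in one list of keys.
    static Map<String, Long> countKeys(List<String> keys) {
        Map<String, Long> counts = new TreeMap<>();
        for (String key : keys) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    // Windowed model: for each batch, count keys over the most recent
    // windowBatches batches (assumed slide interval: one batch).
    static List<Map<String, Long>> windowedCounts(List<List<String>> stream, int windowBatches) {
        List<Map<String, Long>> out = new ArrayList<>();
        for (int end = 0; end < stream.size(); end++) {
            List<String> window = new ArrayList<>();
            for (int i = Math.max(0, end - windowBatches + 1); i <= end; i++) {
                window.addAll(stream.get(i));
            }
            out.add(countKeys(window));
        }
        return out;
    }

    // Per-key state model: state is carried across all batches and updated
    // with each new batch; old data never expires.
    static List<Map<String, Long>> runningCounts(List<List<String>> stream) {
        List<Map<String, Long>> out = new ArrayList<>();
        Map<String, Long> state = new TreeMap<>();
        for (List<String> batch : stream) {
            for (Map.Entry<String, Long> entry : countKeys(batch).entrySet()) {
                state.merge(entry.getKey(), entry.getValue(), Long::sum);
            }
            out.add(new TreeMap<>(state)); // snapshot of the state after this batch
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> stream = Arrays.asList(
            Arrays.asList("a", "b"),
            Arrays.asList("a"),
            Arrays.asList("b", "b"));
        // prints [{a=1, b=1}, {a=2, b=1}, {a=1, b=2}]
        System.out.println(windowedCounts(stream, 2));
        // prints [{a=1, b=1}, {a=2, b=1}, {a=2, b=3}]
        System.out.println(runningCounts(stream));
    }
}
```

In the third batch the two results diverge: the window has already dropped the first batch, while the running state still includes it.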