Example 10-13. Joining two DStreams in Java
JavaPairDStream<String, Long> ipBytesDStream =
    accessLogsDStream.mapToPair(new IpContentTuple());
JavaPairDStream<String, Long> ipBytesSumDStream =
    ipBytesDStream.reduceByKey(new LongSumReducer());
JavaPairDStream<String, Tuple2<Long, Long>> ipBytesRequestCountDStream =
    ipCountsDStream.join(ipBytesSumDStream);
We can also merge the contents of two different DStreams using the union() operator
as in regular Spark, or using StreamingContext.union() for multiple streams.
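To make the batch-by-batch meaning of union() concrete, here is a minimal plain-Java model with no Spark dependency. It assumes a stream can be pictured as a list of batches, each batch being a list of records, and that both streams share the same batch interval. The class StreamUnionSketch and the method unionStreams are hypothetical names chosen for this sketch and are not part of the Spark API.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not the Spark API): a "stream" is a list of batches,
// and each batch is a list of records.
final class StreamUnionSketch {
    // Merge two streams batch by batch: the records that arrive in the same
    // batch interval end up in the same output batch. Duplicates are kept.
    static <T> List<List<T>> unionStreams(List<List<T>> a, List<List<T>> b) {
        if (a.size() != b.size()) {
            throw new IllegalArgumentException("streams must have the same number of batches");
        }
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < a.size(); i++) {
            List<T> merged = new ArrayList<>(a.get(i));
            merged.addAll(b.get(i));
            out.add(merged);
        }
        return out;
    }
}
```

In this model, merging the batches [[x], [y, z]] with [[p], []] gives [[x, p], [y, z]]: the first batches are combined with each other, and so are the second batches.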
Finally, if these stateless transformations are insufficient, DStreams provide an
advanced operator called transform() that lets you operate directly on the RDDs
inside them. The transform() operation lets you provide an arbitrary RDD-to-RDD
function to act on the DStream. This function gets called on each batch of data
in the stream to produce a new stream. A common application of transform() is to
reuse batch processing code you have already written on RDDs. For example, if you had a
function, extractOutliers(), that acted on an RDD of log lines to produce an RDD
of outliers (perhaps after running some statistics on the messages), you could reuse it
within a transform(), as shown in Examples 10-14 and 10-15.
Example 10-14. transform() on a DStream in Scala
val outlierDStream = accessLogsDStream.transform { rdd =>
  extractOutliers(rdd)
}
Example 10-15. transform() on a DStream in Java
JavaDStream<ApacheAccessLog> ipRawDStream = accessLogsDStream.transform(
  new Function<JavaRDD<ApacheAccessLog>, JavaRDD<ApacheAccessLog>>() {
    // Called once per batch with the RDD that holds that batch's data
    public JavaRDD<ApacheAccessLog> call(JavaRDD<ApacheAccessLog> rdd) {
      return extractOutliers(rdd);
    }
});
You can also combine and transform data from multiple DStreams together using
StreamingContext.transform or DStream.transformWith(otherStream, func).
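The per-batch behavior of transform() can be sketched in plain Java without Spark. The model below assumes a stream is a list of batches and applies a batch-to-batch function to each one. The class TransformSketch is a hypothetical name, and the extractOutliers() shown here is an invented stand-in (it keeps values larger than twice the mean of their batch), not the function the text refers to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model (not the Spark API): a "stream" is a list of batches.
final class TransformSketch {
    // Apply a batch-to-batch function to every batch, producing a new stream.
    // This mirrors how transform() calls an RDD-to-RDD function once per batch.
    static <T, R> List<List<R>> transform(List<List<T>> stream,
                                          Function<List<T>, List<R>> batchFn) {
        List<List<R>> out = new ArrayList<>();
        for (List<T> batch : stream) {
            out.add(batchFn.apply(batch));
        }
        return out;
    }

    // Hypothetical stand-in for extractOutliers(): keeps values larger than
    // twice the mean of their own batch. The rule is invented for illustration.
    static List<Long> extractOutliers(List<Long> batch) {
        List<Long> outliers = new ArrayList<>();
        if (batch.isEmpty()) {
            return outliers;
        }
        double sum = 0;
        for (long v : batch) {
            sum += v;
        }
        double mean = sum / batch.size();
        for (long v : batch) {
            if (v > 2 * mean) {
                outliers.add(v);
            }
        }
        return outliers;
    }
}
```

Because the function sees one batch at a time, a value is judged only against the other records in its own batch. With the batches [10, 10, 10, 100] and [5, 5], this sketch yields [100] for the first batch and an empty batch for the second.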
Stateful Transformations
Stateful transformations are operations on DStreams that track data across time; that
is, some data from previous batches is used to generate the results for a new batch.
The two main types are windowed operations, which act over a sliding window of
time periods, and updateStateByKey(), which is used to track state across events for
each key (e.g., to build up an object representing each user session).
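The difference between the two kinds of stateful operation can be sketched in plain Java, again without Spark. This model assumes a stream is a list of batches of keys (for example, IP addresses). The class StatefulSketch and its methods are hypothetical names; runningCounts() hard-codes a counting update, whereas the real updateStateByKey() takes the update logic from the caller, and windowedCounts() simplifies the window to a whole number of batches that slides by one batch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not the Spark API): a "stream" is a list of batches,
// and each batch is a list of keys (for example, IP addresses).
final class StatefulSketch {
    // Models updateStateByKey(): keep a running count per key across all
    // batches and emit a snapshot of the state after every batch.
    static List<Map<String, Long>> runningCounts(List<List<String>> stream) {
        Map<String, Long> state = new HashMap<>();
        List<Map<String, Long>> snapshots = new ArrayList<>();
        for (List<String> batch : stream) {
            for (String key : batch) {
                state.merge(key, 1L, Long::sum);
            }
            snapshots.add(new HashMap<>(state));
        }
        return snapshots;
    }

    // Models a windowed count: for each batch, count the records in the most
    // recent windowLength batches. The window slides by one batch here.
    static List<Integer> windowedCounts(List<List<String>> stream, int windowLength) {
        if (windowLength < 1) {
            throw new IllegalArgumentException("windowLength must be >= 1");
        }
        List<Integer> counts = new ArrayList<>();
        for (int i = 0; i < stream.size(); i++) {
            int total = 0;
            for (int j = Math.max(0, i - windowLength + 1); j <= i; j++) {
                total += stream.get(j).size();
            }
            counts.add(total);
        }
        return counts;
    }
}
```

For the batches [a, b], [a], [c, a, b], the running count for key a grows to 1, 2, and then 3 because state is never dropped, while a window of two batches reports 2, 3, and 4 records because older batches fall out of the window.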