As an example, in our log processing program from earlier, we could use map() and
reduceByKey() to count log events by IP address in each time step, as shown in
Examples 10-10 and 10-11.
Example 10-10. map() and reduceByKey() on DStream in Scala
// Assumes ApacheAccessLog is a utility class for parsing entries from Apache logs
val accessLogsDStream = logData.map(line => ApacheAccessLog.parseFromLogLine(line))
val ipDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), 1))
val ipCountsDStream = ipDStream.reduceByKey((x, y) => x + y)
Example 10-11. map() and reduceByKey() on DStream in Java
// Assumes ApacheAccessLog is a utility class for parsing entries from Apache logs
// ParseFromLogLine and LongSumReducer are assumed to be function classes defined elsewhere
static final class IpTuple implements PairFunction<ApacheAccessLog, String, Long> {
  public Tuple2<String, Long> call(ApacheAccessLog log) {
    return new Tuple2<>(log.getIpAddress(), 1L);
  }
}

JavaDStream<ApacheAccessLog> accessLogsDStream =
  logData.map(new ParseFromLogLine());
JavaPairDStream<String, Long> ipDStream =
  accessLogsDStream.mapToPair(new IpTuple());
JavaPairDStream<String, Long> ipCountsDStream =
  ipDStream.reduceByKey(new LongSumReducer());
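To make the phrase "in each time step" concrete, the following is a minimal plain-Java sketch that does not use Spark at all. It models a stream as a list of batches and counts IP addresses in each batch independently, which is the behavior the stateless map() and reduceByKey() calls above have on a DStream. The class name PerBatchIpCounts, its methods, and the sample IP addresses are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Spark): a "stream" is a list of batches, and each
// batch is the list of IP addresses seen during one time step.
class PerBatchIpCounts {

    // Counts occurrences of each IP within a single batch, analogous to
    // map(ip -> (ip, 1)) followed by reduceByKey(sum) on one batch.
    static Map<String, Long> countBatch(List<String> batch) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String ip : batch) {
            counts.merge(ip, 1L, Long::sum);
        }
        return counts;
    }

    // Applies countBatch to every batch separately; nothing is carried
    // over from one batch to the next.
    static List<Map<String, Long>> countPerBatch(List<List<String>> batches) {
        List<Map<String, Long>> result = new ArrayList<>();
        for (List<String> batch : batches) {
            result.add(countBatch(batch));
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
            Arrays.asList("10.0.0.1", "10.0.0.2", "10.0.0.1"),
            Arrays.asList("10.0.0.1"));
        // prints [{10.0.0.1=2, 10.0.0.2=1}, {10.0.0.1=1}]
        System.out.println(countPerBatch(batches));
    }
}
```

Note that 10.0.0.1 is counted twice in the first batch and once in the second; the counts are not added together across batches, because a stateless transformation only sees the data of the current time step.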
Stateless transformations can also combine data from multiple DStreams, again
within each time step. For example, key/value DStreams have the same join-related
transformations as RDDs, namely cogroup(), join(), leftOuterJoin(), and so on
(see “Joins” on page 58). We can use these operations on DStreams to perform the
underlying RDD operations separately on each batch.
Let us consider a join between two DStreams. In Examples 10-12 and 10-13, we have
data keyed by IP address, and we join the request count against the bytes transferred.
Example 10-12. Joining two DStreams in Scala
val ipBytesDStream =
  accessLogsDStream.map(entry => (entry.getIpAddress(), entry.getContentSize()))
val ipBytesSumDStream =
  ipBytesDStream.reduceByKey((x, y) => x + y)
val ipBytesRequestCountDStream =
  ipCountsDStream.join(ipBytesSumDStream)
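The shape of the joined result in Example 10-12 can be sketched without Spark. Within one batch, each IP address ends up paired with a (request count, bytes transferred) tuple, and an inner join keeps only the keys present on both sides. The plain-Java model below is illustrative only: the class PerBatchJoin, its method, and the sample values are hypothetical, and it assumes each key appears at most once per side, as is the case after reduceByKey().

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model (no Spark) of join() applied to one batch of two keyed
// streams. Assumes each key appears at most once per side.
class PerBatchJoin {

    // Inner join: keeps only keys present in both maps and pairs their
    // values as (requestCount, bytesSum).
    static Map<String, Map.Entry<Long, Long>> joinBatch(
            Map<String, Long> requestCounts, Map<String, Long> bytesSums) {
        Map<String, Map.Entry<Long, Long>> joined = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : requestCounts.entrySet()) {
            Long bytes = bytesSums.get(e.getKey());
            if (bytes != null) {
                joined.put(e.getKey(),
                           new SimpleImmutableEntry<>(e.getValue(), bytes));
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, Long> requestCounts = new LinkedHashMap<>();
        requestCounts.put("10.0.0.1", 2L);
        requestCounts.put("10.0.0.2", 1L);

        Map<String, Long> bytesSums = new LinkedHashMap<>();
        bytesSums.put("10.0.0.1", 3072L);
        bytesSums.put("10.0.0.2", 512L);

        // Print each joined record as: ip -> (requestCount, bytesSum)
        for (Map.Entry<String, Map.Entry<Long, Long>> e
                : joinBatch(requestCounts, bytesSums).entrySet()) {
            System.out.println(e.getKey() + " -> (" + e.getValue().getKey()
                + ", " + e.getValue().getValue() + ")");
        }
    }
}
```

In the DStream version, this pairing is repeated for every batch, using only the counts and byte sums computed for that same time step.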
