As an example, in our log processing program from earlier, we could use map() and
reduceByKey() to count log events by IP address in each time step, as shown in
Examples 10-10 and 10-11.
Example 10-10. map() and reduceByKey() on DStream in Scala
// Assumes ApacheAccessLog is a utility class for parsing entries from Apache logs
val accessLogsDStream = logData.map(line => ApacheAccessLog.parseFromLogLine(line))
// Pair each parsed entry with a count of 1, keyed by IP address
val ipDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), 1))
// Sum the counts per IP address within each batch
val ipCountsDStream = ipDStream.reduceByKey((x, y) => x + y)
Example 10-11. map() and reduceByKey() on DStream in Java
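// Assumes ApacheAccessLog is a utility class for parsing entries from Apache logs.
// ParseFromLogLine and LongSumReducer are likewise assumed to be defined elsewhere.
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

// Maps a parsed log entry to an (IP address, 1) pair
static final class IpTuple implements PairFunction<ApacheAccessLog, String, Long> {
  public Tuple2<String, Long> call(ApacheAccessLog log) {
    return new Tuple2<>(log.getIpAddress(), 1L);
  }
}

JavaDStream<ApacheAccessLog> accessLogsDStream =
  logData.map(new ParseFromLogLine());
JavaPairDStream<String, Long> ipDStream =
  accessLogsDStream.mapToPair(new IpTuple());
// Sum the counts per IP address within each batch
JavaPairDStream<String, Long> ipCountsDStream =
  ipDStream.reduceByKey(new LongSumReducer());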
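To make "in each time step" concrete, the following is a minimal plain-Java sketch of what the map() and reduceByKey() pair computes for a single batch. It does not use Spark at all: the class BatchIpCounts, the method countByIp, and the sample IP addresses are hypothetical names and data chosen only for illustration, and each inner list stands in for the records that arrive in one batch.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration (no Spark dependency) of stateless, per-batch counting.
// BatchIpCounts and countByIp are hypothetical names used only for this sketch.
final class BatchIpCounts {

    // Mimics map(ip -> (ip, 1L)) followed by reduceByKey((x, y) -> x + y)
    // applied to the records of ONE batch.
    static Map<String, Long> countByIp(List<String> batchOfIps) {
        Map<String, Long> counts = new TreeMap<>();
        for (String ip : batchOfIps) {
            // Insert 1 for a new key, otherwise add 1 to the running sum.
            counts.merge(ip, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two consecutive batches (time steps) of made-up IP addresses.
        List<List<String>> batches = List.of(
            List.of("10.0.0.1", "10.0.0.2", "10.0.0.1"),
            List.of("10.0.0.1"));

        for (List<String> batch : batches) {
            // Each batch is counted on its own; no totals carry over.
            System.out.println(countByIp(batch));
        }
    }
}
```

Running main prints {10.0.0.1=2, 10.0.0.2=1} for the first batch and {10.0.0.1=1} for the second. The count for 10.0.0.1 starts again from zero in the second batch, which is what makes the transformation stateless.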
Stateless transformations can also combine data from multiple DStreams, again
within each time step. For example, key/value DStreams have the same join-related
transformations as RDDs, namely cogroup(), join(), leftOuterJoin(), and so on
(see “Joins” on page 58). We can use these operations on DStreams to perform the
underlying RDD operations separately on each batch.
Let us consider a join between two DStreams. In Examples 10-12 and 10-13, we have
data keyed by IP address, and we join the request count against the bytes transferred.
Example 10-12. Joining two DStreams in Scala
val ipBytesDStream =
  accessLogsDStream.map(entry => (entry.getIpAddress(), entry.getContentSize()))
val ipBytesSumDStream =
  ipBytesDStream.reduceByKey((x, y) => x + y)
val ipBytesRequestCountDStream =
  ipCountsDStream.join(ipBytesSumDStream)
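The per-batch behavior of such a join can be sketched in plain Java, without Spark. The sketch below is an illustration under stated assumptions, not Spark's implementation: the class BatchJoin, its join method, and the sample data are hypothetical, and each input map stands in for one batch of an already reduced stream, so every key appears at most once per side.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration (no Spark dependency) of an inner join on one batch.
// BatchJoin and join are hypothetical names used only for this sketch.
final class BatchJoin {

    // Keeps only the keys present in BOTH inputs and pairs their values as
    // (requestCount, bytesSum), mirroring the shape of join() on pair data.
    static Map<String, Map.Entry<Long, Long>> join(
            Map<String, Long> requestCounts, Map<String, Long> bytesSums) {
        Map<String, Map.Entry<Long, Long>> joined = new TreeMap<>();
        for (Map.Entry<String, Long> e : requestCounts.entrySet()) {
            Long bytes = bytesSums.get(e.getKey());
            if (bytes != null) {
                // Key exists on both sides, so it survives the inner join.
                joined.put(e.getKey(), new SimpleEntry<>(e.getValue(), bytes));
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // One batch from each stream, with made-up values.
        Map<String, Long> requestCounts = Map.of("10.0.0.1", 2L, "10.0.0.2", 1L);
        Map<String, Long> bytesSums = Map.of("10.0.0.1", 700L);

        join(requestCounts, bytesSums).forEach((ip, pair) ->
            System.out.println(ip + " -> requests=" + pair.getKey()
                + ", bytes=" + pair.getValue()));
    }
}
```

In this made-up batch, 10.0.0.2 has a request count but no byte total, so an inner join drops it. A leftOuterJoin() would instead keep the key and mark the missing side as absent. In the Scala example both streams derive from the same accessLogsDStream, so every IP address in a batch appears on both sides.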