As an example, in our log processing program from earlier, we could use map() and reduceByKey() to count log events by IP address in each time step, as shown in Examples 10-10 and 10-11.
Example 10-10. map() and reduceByKey() on DStream in Scala
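// Assumes ApacheAccessLog is a utility class for parsing entries from Apache logs,
// and logData is the DStream[String] of raw log lines created earlier
val accessLogsDStream = logData.map(line => ApacheAccessLog.parseFromLogLine(line))
// Key each parsed entry by its IP address, with a count of 1
val ipDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), 1))
// Sum the counts for each IP address within each batch
val ipCountsDStream = ipDStream.reduceByKey((x, y) => x + y)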
Example 10-11. map() and reduceByKey() on DStream in Java
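// Assumes ApacheAccessLog is a utility class for parsing entries from Apache logs.
// ParseFromLogLine (String -> ApacheAccessLog) and LongSumReducer ((Long, Long) -> Long)
// are assumed to be helper functions defined elsewhere (not shown).
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

// Turns each parsed entry into an (ipAddress, 1L) pair
static final class IpTuple implements PairFunction<ApacheAccessLog, String, Long> {
  @Override
  public Tuple2<String, Long> call(ApacheAccessLog log) {
    return new Tuple2<>(log.getIpAddress(), 1L);
  }
}

JavaDStream<ApacheAccessLog> accessLogsDStream =
  logData.map(new ParseFromLogLine());
JavaPairDStream<String, Long> ipDStream =
  accessLogsDStream.mapToPair(new IpTuple());
// Sum the counts for each IP address within each batch
JavaPairDStream<String, Long> ipCountsDStream =
  ipDStream.reduceByKey(new LongSumReducer());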
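To make the "in each time step" behavior concrete, here is a minimal sketch in plain Java with no Spark dependency. It assumes a DStream can be modeled as a list of batches, where each batch is the list of IP addresses parsed during one interval; the class and method names (PerBatchCounts, countByIp, countEachBatch) are hypothetical. Running the same count-by-key reduction on every batch independently loosely mirrors what reduceByKey() does to each underlying RDD: counts are not carried over from one batch to the next.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: a "DStream" is a list of batches, one per time step,
// and each batch holds the IP addresses of the log entries in that interval.
public class PerBatchCounts {

  // Plain-Java analogue of map(ip -> (ip, 1L)) followed by reduceByKey(sum)
  // applied to ONE batch.
  static Map<String, Long> countByIp(List<String> batch) {
    Map<String, Long> counts = new TreeMap<>();
    for (String ip : batch) {
      counts.merge(ip, 1L, Long::sum);
    }
    return counts;
  }

  // Apply the per-batch reduction to every batch independently; nothing is
  // shared between batches, as with a stateless DStream transformation.
  static List<Map<String, Long>> countEachBatch(List<List<String>> batches) {
    List<Map<String, Long>> result = new ArrayList<>();
    for (List<String> batch : batches) {
      result.add(countByIp(batch));
    }
    return result;
  }

  public static void main(String[] args) {
    List<List<String>> batches = List.of(
        List.of("10.0.0.1", "10.0.0.2", "10.0.0.1"),
        List.of("10.0.0.1"));
    // 10.0.0.1 is counted separately in each batch, not accumulated
    System.out.println(countEachBatch(batches));
  }
}
```

Keeping a running total across batches would require a stateful transformation instead; this sketch deliberately resets per batch.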
Stateless transformations can also combine data from multiple DStreams, again within each time step. For example, key/value DStreams have the same join-related transformations as RDDs: namely, cogroup(), join(), leftOuterJoin(), and so on (see "Joins" on page 58). We can use these operations on DStreams to perform the underlying RDD operations separately on each batch.
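As a rough illustration of "separately on each batch," the following plain-Java sketch (no Spark; all class and method names are hypothetical) models two keyed DStreams as lists of per-batch maps and joins batch i of one stream only with batch i of the other. The inner join keeps keys present in both batches, while the left outer join keeps every key from the left batch and uses Optional.empty() where the right batch has no match, loosely mirroring the optional values returned by leftOuterJoin() in Spark's Java API.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical model: each keyed "DStream" is a list of batches, and each
// batch is a Map from key to value. Batch i is only ever joined with batch i.
public class PerBatchJoin {

  // Inner join of one pair of batches: keep keys present on both sides.
  static <K, V, W> Map<K, Map.Entry<V, W>> join(Map<K, V> left, Map<K, W> right) {
    Map<K, Map.Entry<V, W>> out = new LinkedHashMap<>();
    for (Map.Entry<K, V> e : left.entrySet()) {
      if (right.containsKey(e.getKey())) {
        out.put(e.getKey(),
            new AbstractMap.SimpleImmutableEntry<>(e.getValue(), right.get(e.getKey())));
      }
    }
    return out;
  }

  // Left outer join of one pair of batches: keep every key from the left side.
  static <K, V, W> Map<K, Map.Entry<V, Optional<W>>> leftOuterJoin(
      Map<K, V> left, Map<K, W> right) {
    Map<K, Map.Entry<V, Optional<W>>> out = new LinkedHashMap<>();
    for (Map.Entry<K, V> e : left.entrySet()) {
      out.put(e.getKey(), new AbstractMap.SimpleImmutableEntry<>(
          e.getValue(), Optional.ofNullable(right.get(e.getKey()))));
    }
    return out;
  }

  // Apply the inner join to each time step independently
  // (both streams are assumed to have the same number of batches).
  static <K, V, W> List<Map<K, Map.Entry<V, W>>> joinEachBatch(
      List<Map<K, V>> leftStream, List<Map<K, W>> rightStream) {
    List<Map<K, Map.Entry<V, W>>> result = new ArrayList<>();
    for (int i = 0; i < leftStream.size(); i++) {
      result.add(join(leftStream.get(i), rightStream.get(i)));
    }
    return result;
  }

  public static void main(String[] args) {
    List<Map<String, Long>> counts = List.of(Map.of("10.0.0.1", 2L), Map.of("10.0.0.2", 1L));
    List<Map<String, Long>> bytes = List.of(Map.of("10.0.0.1", 512L), Map.of("10.0.0.1", 128L));
    // In batch 1, 10.0.0.2 has no bytes entry, so that batch's inner join is empty;
    // 10.0.0.1's bytes in batch 1 are never matched with batch 0's counts.
    System.out.println(joinEachBatch(counts, bytes));
    System.out.println(leftOuterJoin(counts.get(1), bytes.get(1)));
  }
}
```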
Let us consider a join between two DStreams. In Examples 10-12 and 10-13, both DStreams are keyed by IP address, and we join the per-IP request counts with the total bytes transferred per IP.
Example 10-12. Joining two DStreams in Scala
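// Pair each parsed entry's IP address with its content size (bytes transferred)
val ipBytesDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), entry.getContentSize()))
// Total bytes transferred per IP address within each batch
val ipBytesSumDStream = ipBytesDStream.reduceByKey((x, y) => x + y)
// Join per-IP request counts with per-IP byte totals, batch by batch;
// each element has the form (ip, (requestCount, totalBytes))
val ipBytesRequestCountDStream = ipCountsDStream.join(ipBytesSumDStream)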