  reduceByKeyAndWindow(
    new AddLongs(),         // Adding elements in the new batches entering the window
    new SubtractLongs(),    // Removing elements from the oldest batches exiting the window
    Durations.seconds(30),  // Window duration
    Durations.seconds(10)); // Slide duration
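To see why passing an inverse function helps, here is a small plain-Java simulation (not Spark code) of an incremental per-key window count. It assumes, for illustration only, that the window spans a fixed number of batches and slides by one batch. The class IncrementalWindowCounts and its push() method are hypothetical names. Each call adds the batch entering the window, as AddLongs does, and subtracts the batch leaving it, as SubtractLongs does, instead of re-reducing every batch in the window.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation of reduceByKeyAndWindow() with an inverse function.
// Not the Spark API: each batch is a Map from key (e.g. IP address) to count.
class IncrementalWindowCounts {
    private final int windowBatches;                        // window length, in batches
    private final Deque<Map<String, Long>> inWindow = new ArrayDeque<>();
    private final Map<String, Long> totals = new HashMap<>();

    IncrementalWindowCounts(int windowBatches) {
        this.windowBatches = windowBatches;
    }

    // Called once per slide (assumed here to be one batch); returns the window's counts.
    Map<String, Long> push(Map<String, Long> newBatch) {
        // AddLongs step: fold the batch entering the window into the running totals.
        newBatch.forEach((key, count) -> totals.merge(key, count, Long::sum));
        inWindow.addLast(newBatch);

        if (inWindow.size() > windowBatches) {
            // SubtractLongs step: remove the oldest batch, which is leaving the window.
            Map<String, Long> oldest = inWindow.removeFirst();
            oldest.forEach((key, count) -> totals.merge(key, -count, Long::sum));
            // Drop keys that fell back to zero so the result matches a full recount.
            totals.values().removeIf(c -> c == 0L);
        }
        return new HashMap<>(totals);
    }
}
```

With a 30-second window, a 10-second slide, and an assumed 10-second batch interval, windowBatches would be 3. The work done per slide then depends on the size of the entering and leaving batches rather than on the length of the window.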
Finally, for counting data, DStreams offer countByWindow() and countByValueAndWindow() as shorthands. countByWindow() gives us a DStream representing the number of elements in each window. countByValueAndWindow() gives us a DStream with the counts for each value. See Examples 10-21 and 10-22.
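The semantics of the two shorthands can be sketched without Spark. The following plain-Java simulation is an illustration only: WindowCounts and its methods are hypothetical, each batch is modeled as a list of values (for example, IP addresses), and the window and slide are assumed to be whole numbers of batches.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of countByWindow() and countByValueAndWindow().
// Each batch is a List of values (e.g. IP addresses); this is not the Spark API.
class WindowCounts {
    // Number of elements across every batch in one window (countByWindow analogue).
    static long countByWindow(List<List<String>> window) {
        long n = 0;
        for (List<String> batch : window) {
            n += batch.size();
        }
        return n;
    }

    // Occurrences of each distinct value in one window (countByValueAndWindow analogue).
    static Map<String, Long> countByValueAndWindow(List<List<String>> window) {
        Map<String, Long> counts = new HashMap<>();
        for (List<String> batch : window) {
            for (String value : batch) {
                counts.merge(value, 1L, Long::sum);
            }
        }
        return counts;
    }

    // One countByWindow result per slide; window and slide are measured in batches.
    static List<Long> slidingCounts(List<List<String>> batches, int windowBatches, int slideBatches) {
        List<Long> out = new ArrayList<>();
        for (int end = slideBatches; end <= batches.size(); end += slideBatches) {
            int start = Math.max(0, end - windowBatches);   // early windows cover fewer batches
            out.add(countByWindow(batches.subList(start, end)));
        }
        return out;
    }
}
```

Assuming a 10-second batch interval, a 30-second window with a 10-second slide corresponds to windowBatches = 3 and slideBatches = 1 in this model.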
Example 10-21. Windowed count operations in Scala
val ipDStream = accessLogsDStream.map { entry => entry.getIpAddress() }
val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10))
val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))
Example 10-22. Windowed count operations in Java
JavaDStream<String> ip = accessLogsDStream.map(
  new Function<ApacheAccessLog, String>() {
    public String call(ApacheAccessLog entry) {
      return entry.getIpAddress();
    }});
JavaDStream<Long> requestCount = accessLogsDStream.countByWindow(
  Durations.seconds(30), Durations.seconds(10));
JavaPairDStream<String, Long> ipAddressRequestCount = ip.countByValueAndWindow(
  Durations.seconds(30), Durations.seconds(10));
UpdateStateByKey transformation
Sometimes it's useful to maintain state across the batches in a DStream (e.g., to track sessions as users visit a site). updateStateByKey() enables this by providing access to a state variable for DStreams of key/value pairs. Given a DStream of (key, event) pairs, it lets you construct a new DStream of (key, state) pairs by taking a function that specifies how to update the state for each key given new events. For example, in a web server log, our events might be visits to the site, where the key is the user ID. Using updateStateByKey(), we could track the last 10 pages each user visited. This list would be our “state” object, and we'd update it as each event arrives.
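A minimal plain-Java sketch of the "last 10 pages" idea follows. It mirrors the update(events, oldState) contract, with java.util.Optional standing in for the optional state value. LastPagesState, update(), and step() are hypothetical names, and step() is only a rough stand-in for how updateStateByKey() applies the function, once per batch, to every key that has new events or existing state. In this sketch, returning an empty Optional would drop a key's state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Plain-Java sketch of update(events, oldState) for "last 10 pages per user".
// Not the Spark API: java.util.Optional stands in for the optional state value.
class LastPagesState {
    static final int MAX_PAGES = 10;

    // events: pages this user visited in the current batch (may be empty).
    // oldState: the user's previous page list, or empty if the user has no state yet.
    static Optional<List<String>> update(List<String> events, Optional<List<String>> oldState) {
        List<String> pages = new ArrayList<>(oldState.orElse(List.of()));
        pages.addAll(events);
        if (pages.size() > MAX_PAGES) {
            // Keep only the most recent MAX_PAGES visits.
            pages = new ArrayList<>(pages.subList(pages.size() - MAX_PAGES, pages.size()));
        }
        return Optional.of(pages);
    }

    // Rough stand-in for one batch of updateStateByKey(): update() runs for every key
    // that has new events or existing state; an empty result drops that key.
    static Map<String, List<String>> step(Map<String, List<String>> state,
                                          Map<String, List<String>> batch) {
        Set<String> users = new HashSet<>(state.keySet());
        users.addAll(batch.keySet());
        Map<String, List<String>> next = new HashMap<>();
        for (String user : users) {
            List<String> events = batch.getOrDefault(user, List.of());
            update(events, Optional.ofNullable(state.get(user)))
                .ifPresent(pages -> next.put(user, pages));
        }
        return next;
    }
}
```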
To use updateStateByKey(), we provide a function update(events, oldState) that takes in the events that have arrived for a key and its previous state, and returns a newState to store for it. This function's signature is as follows:
• events is a list of events that arrived in the current batch (may be empty).