reduceByKeyAndWindow(
  new AddLongs(), // Adding elements in the new batches entering the window
  new SubtractLongs(), // Removing elements from the oldest batches exiting the window
  Durations.seconds(30), // Window duration
  Durations.seconds(10)); // Slide duration
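Passing an inverse function lets each window be computed incrementally instead of re-reducing every batch in it: the new window value is the previous value plus the batch entering the window minus the batch leaving it. The following is a minimal plain-Java sketch of that bookkeeping for a single key. It is not Spark code, and it assumes a 10-second batch interval, so that the 30-second window spans three batches and the 10-second slide produces one window per batch. The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch (not Spark code): incremental windowed sum for one key,
// mimicking the (add, subtract) pair passed to reduceByKeyAndWindow.
// Assumes slide == batch interval, so one window result is produced per batch.
class IncrementalWindowSum {
    // batchSums: the per-batch sum for the key, in arrival order.
    // batchesPerWindow: window duration / batch interval (e.g. 30 s / 10 s = 3).
    static List<Long> windowSums(long[] batchSums, int batchesPerWindow) {
        Deque<Long> window = new ArrayDeque<>();
        List<Long> results = new ArrayList<>();
        long current = 0L;
        for (long batch : batchSums) {
            current += batch;                    // like AddLongs: batch entering the window
            window.addLast(batch);
            if (window.size() > batchesPerWindow) {
                current -= window.removeFirst(); // like SubtractLongs: oldest batch exits
            }
            results.add(current);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(windowSums(new long[] {5, 3, 2, 4, 1}, 3)); // [5, 8, 10, 9, 7]
    }
}
```

Each step touches only two batches regardless of window length, which is why the inverse-function variant pays off for long windows with short slides.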
Finally, for counting data, DStreams offer countByWindow() and countByValueAndWindow()
as shorthands. countByWindow() gives us a DStream representing the number of elements
in each window. countByValueAndWindow() gives us a DStream with the count of each
distinct value in each window. See Examples 10-21 and 10-22.
Example 10-21. Windowed count operations in Scala
val ipDStream = accessLogsDStream.map{ entry => entry.getIpAddress() }
val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10))
val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))
Example 10-22. Windowed count operations in Java
JavaDStream<String> ip = accessLogsDStream.map(
  new Function<ApacheAccessLog, String>() {
    public String call(ApacheAccessLog entry) {
      return entry.getIpAddress();
    }});
JavaDStream<Long> requestCount = accessLogsDStream.countByWindow(
  Durations.seconds(30), Durations.seconds(10));
JavaPairDStream<String, Long> ipAddressRequestCount = ip.countByValueAndWindow(
  Durations.seconds(30), Durations.seconds(10));
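To make the difference between the two shorthands concrete, here is a plain-Java model of what each computes for a single window. It is a sketch, not Spark's implementation: each inner list stands for one batch of IP addresses, the three batches assume a 10-second batch interval (one 30-second window), and the sample addresses are made up. Spark would emit one such result per slide.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model (not Spark code) of the per-window results of
// countByWindow and countByValueAndWindow. Each inner list is one batch.
class WindowedCounts {
    // Total number of elements across all batches in the window.
    static long countByWindow(List<List<String>> windowBatches) {
        long total = 0L;
        for (List<String> batch : windowBatches) {
            total += batch.size();
        }
        return total;
    }

    // Count of each distinct value across all batches in the window.
    // TreeMap keeps the output sorted so printing is deterministic.
    static Map<String, Long> countByValueAndWindow(List<List<String>> windowBatches) {
        Map<String, Long> counts = new TreeMap<>();
        for (List<String> batch : windowBatches) {
            for (String value : batch) {
                counts.merge(value, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up sample: three 10-second batches forming one 30-second window.
        List<List<String>> window = Arrays.asList(
            Arrays.asList("10.0.0.1", "10.0.0.2"),
            Arrays.asList("10.0.0.1"),
            Arrays.asList("10.0.0.3", "10.0.0.1"));
        System.out.println(countByWindow(window));         // 5
        System.out.println(countByValueAndWindow(window)); // {10.0.0.1=3, 10.0.0.2=1, 10.0.0.3=1}
    }
}
```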
UpdateStateByKey transformation
Sometimes it's useful to maintain state across the batches in a DStream (e.g., to track
sessions as users visit a site). updateStateByKey() enables this by providing access to
a state variable for DStreams of key/value pairs. Given a DStream of (key, event)
pairs, it lets you construct a new DStream of (key, state) pairs by taking a function
that specifies how to update the state for each key given new events. For example, in a
web server log, our events might be visits to the site, where the key is the user ID.
Using updateStateByKey() , we could track the last 10 pages each user visited. This
list would be our “state” object, and we'd update it as each event arrives.
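The "last 10 pages per user" idea can be modeled in plain Java to show the shape of the state update. This is a sketch under stated assumptions, not Spark's API: the names LastPagesState, update, and applyBatch are hypothetical, and java.util.Optional merely stands in for "there may be no previous state." As with updateStateByKey(), the update is applied to every key that has either new events or existing state, so a user with no visits in a batch receives an empty event list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical plain-Java model (not Spark's API) of per-key state updates:
// for each user ID, keep the last MAX_PAGES pages visited.
class LastPagesState {
    static final int MAX_PAGES = 10;

    // update(events, oldState): append this batch's visits to the previous list
    // (if any) and keep only the most recent MAX_PAGES entries.
    static Optional<List<String>> update(List<String> events, Optional<List<String>> oldState) {
        List<String> pages = new ArrayList<>(oldState.orElse(new ArrayList<>()));
        pages.addAll(events);
        if (pages.size() > MAX_PAGES) {
            pages = new ArrayList<>(pages.subList(pages.size() - MAX_PAGES, pages.size()));
        }
        return Optional.of(pages);
    }

    // Applies one batch of {userId, page} events and returns the new state map.
    static Map<String, List<String>> applyBatch(Map<String, List<String>> state,
                                                List<String[]> batch) {
        // Group this batch's page visits by user ID, preserving arrival order.
        Map<String, List<String>> eventsByUser = new HashMap<>();
        for (String[] event : batch) {
            eventsByUser.computeIfAbsent(event[0], k -> new ArrayList<>()).add(event[1]);
        }
        // Visit every key with new events or existing state.
        Set<String> keys = new HashSet<>(state.keySet());
        keys.addAll(eventsByUser.keySet());
        Map<String, List<String>> newState = new HashMap<>();
        for (String user : keys) {
            List<String> events = eventsByUser.getOrDefault(user, new ArrayList<>());
            update(events, Optional.ofNullable(state.get(user)))
                .ifPresent(pages -> newState.put(user, pages));
        }
        return newState;
    }

    public static void main(String[] args) {
        Map<String, List<String>> state = new HashMap<>();
        List<String[]> batch1 = new ArrayList<>();
        batch1.add(new String[] {"alice", "/home"});
        batch1.add(new String[] {"bob", "/docs"});
        batch1.add(new String[] {"alice", "/cart"});
        state = applyBatch(state, batch1);
        // bob has no events in this batch, so update() sees an empty list for him.
        List<String[]> batch2 = new ArrayList<>();
        batch2.add(new String[] {"alice", "/checkout"});
        state = applyBatch(state, batch2);
        System.out.println(state.get("alice")); // [/home, /cart, /checkout]
        System.out.println(state.get("bob"));   // [/docs]
    }
}
```

Returning an Optional from update() leaves room for dropping a key's state by returning an empty value, which applyBatch() handles by simply not carrying that key forward.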
To use updateStateByKey() , we provide a function update(events, oldState) that
takes in the events that have arrived for a key and its previous state, and returns a
newState to store for it. This function's signature is as follows:
events is a list of events that arrived in the current batch (may be empty).