Stateful streaming
As a final example, we will apply the concept of stateful streaming using the updateStateByKey function to compute a global state of revenue and number of purchases per user, which will be updated with new data from each 10-second batch. Our StreamingStateApp app is shown here:
object StreamingStateApp {
  import org.apache.spark.streaming.StreamingContext._
We will first define an updateState function that will compute the new state from the running state value and the new data in the current batch. Our state, in this case, is a (number of purchases, revenue) pair, which we will keep for each user. We will compute the new state given the set of (product, revenue) pairs for the current batch and the accumulated state at the current time.
Notice that we will deal with an Option value for the current state, as it might be empty (which will be the case for the first batch), so we need to define a default value, which we do using getOrElse, as shown here:
  def updateState(prices: Seq[(String, Double)],
      currentTotal: Option[(Int, Double)]) = {
    val currentRevenue = prices.map(_._2).sum
    val currentNumberPurchases = prices.size
    val state = currentTotal.getOrElse((0, 0.0))
    Some((currentNumberPurchases + state._1, currentRevenue + state._2))
  }
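To make the accumulation concrete, here is a minimal, self-contained sketch (outside Spark) that replicates the updateState logic and walks it through two batches; the product names and prices are invented for illustration:

```scala
object UpdateStateDemo {
  // Same logic as updateState above: fold a batch of (product, price)
  // pairs into a running (purchase count, revenue) state.
  def updateState(prices: Seq[(String, Double)],
      currentTotal: Option[(Int, Double)]): Option[(Int, Double)] = {
    val currentRevenue = prices.map(_._2).sum
    val currentNumberPurchases = prices.size
    val state = currentTotal.getOrElse((0, 0.0))
    Some((currentNumberPurchases + state._1, currentRevenue + state._2))
  }

  def main(args: Array[String]): Unit = {
    // First batch: no prior state yet, so getOrElse supplies (0, 0.0)
    val afterBatch1 =
      updateState(Seq(("iPhone Cover", 10.0), ("Headphones", 5.0)), None)
    println(afterBatch1) // Some((2,15.0))

    // Second batch: the accumulated state is threaded back in
    val afterBatch2 =
      updateState(Seq(("Samsung Galaxy Cover", 9.5)), afterBatch1)
    println(afterBatch2) // Some((3,24.5))
  }
}
```

This is exactly the shape updateStateByKey expects for its update function: the Seq holds all values for a given key in the current batch, and the Option holds that key's state from previous batches.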
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]",
      "First Streaming App", Seconds(10))
    // for stateful operations, we need to set a checkpoint location
    ssc.checkpoint("/tmp/sparkstreaming/")
    val stream = ssc.socketTextStream("localhost", 9999)
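The listing stops here. As a hedged sketch of how the remaining pieces would plausibly fit together, the text stream could be parsed into per-user events, keyed by user, and fed through updateStateByKey. Note that the "user,product,price" event format and the parseEvent helper are assumptions for illustration; the excerpt does not show the actual input schema:

```scala
object StreamingStateWiring {
  // Hypothetical event format: "user,product,price"
  // (an assumption; the excerpt does not show the input schema).
  def parseEvent(record: String): (String, (String, Double)) = {
    val Array(user, product, price) = record.split(",")
    (user, (product, price.toDouble))
  }

  // Wiring sketch; these lines would follow the socketTextStream call:
  //   val events = stream.map(parseEvent)
  //   val revenuePerUser = events.updateStateByKey(updateState)
  //   revenuePerUser.print()
  //   ssc.start()
  //   ssc.awaitTermination()

  def main(args: Array[String]): Unit = {
    println(parseEvent("alice,iPad Cover,7.5"))
  }
}
```

Because updateState returns an Option, updateStateByKey also lets us drop a key entirely by returning None, though this example always keeps the accumulated state.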