Database Reference
In-Depth Information
// create stream of events from raw text elements
val events = stream.map { record =>
val event = record.split(",")
(event(0), event(1), event(2).toDouble)
}
val users = events.map{ case (user, product, price) =>
(user, (product, price)) }
val revenuePerUser = users.updateStateByKey(updateState)
revenuePerUser.print()
// start the context
ssc.start()
ssc.awaitTermination()
}
}
After applying the same string split transformation we used in our previous example, we
called updateStateByKey on our DStream, passing in our defined updateState
function. We then printed the results to the console.
Start the streaming example using sbt run and by selecting [4] Streamin-
gStateApp (also restart the producer program if necessary).
After around 10 seconds, you will start to see the first set of state output. We will wait an-
other 10 seconds to see the next set of output. You will see the overall global state being
updated:
...
-------------------------------------------
Time: 1416080440000 ms
-------------------------------------------
(Janet,(2,10.98))
(Frank,(1,5.49))
(James,(2,12.98))
(Malinda,(1,9.99))
(Elaine,(3,29.97))
(Gary,(2,12.98))
Search WWH ::




Custom Search