Database Reference
In-Depth Information
/*
We compute and print out stats for each batch.
Since each batch is an RDD, we call foreachRDD on
the DStream, and apply the usual RDD functions
we used in Chapter 1.
*/
events.foreachRDD { (rdd, time) =>
  // Per-batch statistics. Each element of `rdd` is matched as a
  // (user, product, price) triple.
  val numPurchases = rdd.count()
  val uniqueUsers = rdd.map { case (user, _, _) => user }.distinct().count()
  val totalRevenue = rdd.map { case (_, _, price) => price.toDouble }.sum()

  // Tally purchases per product, bring the tallies back with collect(),
  // and order them from most to least purchased.
  val productsByPopularity = rdd
    .map { case (_, product, _) => (product, 1) }
    .reduceByKey(_ + _)
    .collect()
    .sortBy(-_._2)

  val formatter = new SimpleDateFormat
  val dateStr = formatter.format(new Date(time.milliseconds))

  println(s"== Batch start time: $dateStr ==")
  println("Total purchases: " + numPurchases)
  println("Unique users: " + uniqueUsers)
  println("Total revenue: " + totalRevenue)

  // A batch with no events produces an empty array; indexing it with (0)
  // would throw, so the most-popular line is printed only when a product exists.
  productsByPopularity.headOption.foreach { case (product, purchases) =>
    println("Most popular product: %s with %d purchases".format(product, purchases))
  }
}
// Start the context, then wait on it.
// NOTE(review): `ssc` is defined outside this excerpt — presumably the Spark
// StreamingContext that `events` was created from; confirm.
// NOTE(review): awaitTermination() is expected to block the calling thread
// until the context stops — confirm against the StreamingContext API.
ssc.start()
ssc.awaitTermination()
}
}
Search WWH ::




Custom Search