Creating a streaming regression model
In the next step of our example, we will create a streaming regression program. The basic
layout and setup are the same as in our previous streaming analytics examples:
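import breeze.linalg.DenseVector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * A simple streaming linear regression that prints out
 * the predicted value for each batch
 */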
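object SimpleStreamingModel {
  def main(args: Array[String]): Unit = {
    // two local threads, with a batch interval of 10 seconds
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    // read text records from a socket on localhost:9999
    val stream = ssc.socketTextStream("localhost", 9999)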
Here, we will set the number of features to match the records in our input data stream.
We will then create a zero vector to use as the initial weight vector of our streaming
regression model. Finally, we will select the number of iterations and the step size:
    val NumFeatures = 100
    val zeroVector = DenseVector.zeros[Double](NumFeatures)
    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.dense(zeroVector.data))
      .setNumIterations(1)
      .setStepSize(0.01)
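To make the roles of the initial weights, the iteration count, and the step size more concrete, the following is a minimal sketch of a single averaged gradient step for least-squares regression in plain Scala. It is an illustration only, not MLlib's implementation: the names SgdSketch and step are hypothetical, and details such as MLlib's step-size schedule and mini-batch sampling are deliberately left out.

```scala
// Simplified sketch of one gradient step for least-squares regression.
// SgdSketch and step are hypothetical names; this is not MLlib's code.
object SgdSketch {
  /** One averaged gradient step over a batch of (label, features) pairs. */
  def step(weights: Array[Double],
           batch: Seq[(Double, Array[Double])],
           stepSize: Double): Array[Double] = {
    if (batch.isEmpty) return weights // nothing to learn from an empty batch
    val grad = new Array[Double](weights.length)
    for ((y, x) <- batch) {
      // prediction error for this record: (w . x) - y
      val err = weights.zip(x).map { case (w, xi) => w * xi }.sum - y
      for (j <- grad.indices) grad(j) += err * x(j)
    }
    // move each weight against the average gradient, scaled by the step size
    Array.tabulate(weights.length)(j => weights(j) - stepSize * grad(j) / batch.size)
  }
}
```

Starting from a zero weight vector, one call such as SgdSketch.step(Array(0.0, 0.0), Seq((1.0, Array(1.0, 2.0))), 0.01) nudges the weights a small distance toward values that reduce the squared error. With a single iteration per batch, as configured above, the model takes roughly one such step each time a new batch arrives.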
Next, we will again use the map function to transform the input DStream, where each
record is a string representation of our input data, into a LabeledPoint instance that
contains the target value and feature vector:
    // create a stream of labeled points
    val labeledStream = stream.map { event =>
      val split = event.split("\t")
      val y = split(0).toDouble
      val features = split(1).split(",").map(_.toDouble)
      LabeledPoint(label = y, features =