val model2 = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.dense(zeroVector.data))
  .setNumIterations(1)
  .setStepSize(1.0)
// create a stream of labeled points
val labeledStream = stream.map { event =>
  val split = event.split("\t")
  val y = split(0).toDouble
  val features = split(1).split(",").map(_.toDouble)
  LabeledPoint(label = y, features = Vectors.dense(features))
}
Note that most of the preceding setup code is the same as in our simple streaming model example. However, we created two instances of StreamingLinearRegressionWithSGD: one with a learning rate (step size) of 0.01 and one with a learning rate of 1.0.
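To get a feel for why the step size matters, the following is a minimal plain-Scala sketch. It does not use Spark, and it is not meant to reproduce how StreamingLinearRegressionWithSGD works internally. It runs a single pass of per-example gradient descent on synthetic data generated from y = 2x, once with each of the two step sizes. The names StepSizeSketch and sgdPass, and the data itself, are hypothetical and chosen only for illustration.

```scala
// Plain-Scala illustration (no Spark). All names and data are hypothetical.
object StepSizeSketch {

  // One gradient step per (x, y) pair for the model y = w * x,
  // minimising the squared error 0.5 * (w * x - y)^2.
  def sgdPass(data: Seq[(Double, Double)], stepSize: Double, w0: Double = 0.0): Double =
    data.foldLeft(w0) { case (w, (x, y)) =>
      val gradient = (w * x - y) * x
      w - stepSize * gradient
    }

  def main(args: Array[String]): Unit = {
    // Synthetic data from y = 2x, with feature values between 0.0 and 0.9.
    val data = Seq.tabulate(100) { i =>
      val x = (i % 10) / 10.0
      (x, 2.0 * x)
    }

    val slow = sgdPass(data, stepSize = 0.01)
    val fast = sgdPass(data, stepSize = 1.0)

    println(s"stepSize = 0.01 -> learned weight = $slow")
    println(s"stepSize = 1.0  -> learned weight = $fast")
  }
}
```

On this particular data, the larger step size ends the pass much closer to the true weight of 2.0. With larger feature values, a step size of 1.0 could just as easily overshoot and diverge, which is why it is worth comparing both models on the same stream.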
Next, we will train each model on our input stream and, using Spark Streaming's transform function, create a new DStream that contains the prediction error of each model for every data point:
// train both models on the same stream
model1.trainOn(labeledStream)
model2.trainOn(labeledStream)
// use transform to create a stream of (model1 error, model2 error) pairs
val predsAndTrue = labeledStream.transform { rdd =>
  // fetch the most recent version of each model once per batch
  val latest1 = model1.latestModel()
  val latest2 = model2.latestModel()
  rdd.map { point =>
    val pred1 = latest1.predict(point.features)
    val pred2 = latest2.predict(point.features)
    // signed error: prediction minus true label
    (pred1 - point.label, pred2 - point.label)
  }
}
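Each element of predsAndTrue is a pair of signed errors, one per model. One common way to compare the two models is to reduce each batch to a mean squared error (MSE) and a root mean squared error (RMSE). The sketch below shows only that arithmetic, in plain Scala on a small made-up batch. In a streaming job, a similar reduction could be applied to each RDD of the DStream, for example inside foreachRDD. ErrorSummarySketch, Summary, and summarize are hypothetical names, and the numbers are invented for illustration.

```scala
// Plain-Scala illustration (no Spark). All names and data are hypothetical.
object ErrorSummarySketch {

  final case class Summary(mse: Double, rmse: Double)

  // Reduce a non-empty batch of signed errors to MSE and RMSE.
  def summarize(errors: Seq[Double]): Summary = {
    require(errors.nonEmpty, "cannot summarize an empty batch")
    val mse = errors.map(e => e * e).sum / errors.size
    Summary(mse, math.sqrt(mse))
  }

  def main(args: Array[String]): Unit = {
    // A made-up batch of (model1 error, model2 error) pairs.
    val batch = Seq((0.5, -1.0), (-0.5, 2.0), (1.0, 0.0))
    val (errors1, errors2) = batch.unzip

    println(s"model 1: ${summarize(errors1)}")
    println(s"model 2: ${summarize(errors2)}")
  }
}
```

Squaring the errors before averaging keeps positive and negative errors from cancelling out, so a model that overshoots and undershoots equally does not appear to be perfect.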