// local machine
val lines = ssc.socketTextStream("localhost", 7777)
// Filter our DStream for lines with "error"
val errorLines = lines.filter(_.contains("error"))
// Print out the lines with errors
errorLines.print()
Example 10-5. Streaming filter for printing lines containing “error” in Java
// Create a StreamingContext with a 1-second batch size from a SparkConf
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
// Create a DStream from all the input on port 7777
JavaDStream<String> lines = jssc.socketTextStream("localhost", 7777);
// Filter our DStream for lines with "error"
JavaDStream<String> errorLines = lines.filter(new Function<String, Boolean>() {
  public Boolean call(String line) {
    return line.contains("error");
  }});
// Print out the lines with errors
errorLines.print();
This sets up only the computation that will be done when the system receives data.
To start receiving data, we must explicitly call start() on the StreamingContext.
Then, Spark Streaming will start to schedule Spark jobs on the underlying SparkContext.
This will occur in a separate thread, so to keep our application from exiting, we
also need to call awaitTermination to wait for the streaming computation to finish.
(See Examples 10-6 and 10-7.)
Example 10-6. Streaming filter for printing lines containing “error” in Scala
// Start our streaming context and wait for it to "finish"
ssc.start()
// Wait for the job to finish
ssc.awaitTermination()
Example 10-7. Streaming filter for printing lines containing “error” in Java
// Start our streaming context and wait for it to "finish"
jssc.start();
// Wait for the job to finish
jssc.awaitTermination();
Note that a streaming context can be started only once, and must be started after we
set up all the DStreams and output operations we want.
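The lifecycle rules above (set everything up first, start exactly once, then block until the background work ends) can be illustrated without Spark. The following is only a sketch in plain Java: ToyStreamingContext, addOutputOp, and the exception messages are hypothetical names invented for this illustration and are not part of the Spark API, nor a description of how Spark implements its StreamingContext.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical toy class, NOT Spark's StreamingContext. It only mimics the
// lifecycle described in the text: set up, start once, wait for termination.
class ToyStreamingContext {
    private final List<Runnable> outputOps = new ArrayList<>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final CountDownLatch done = new CountDownLatch(1);

    // Register work to run after start(); only allowed during setup.
    void addOutputOp(Runnable op) {
        if (started.get()) {
            throw new IllegalStateException("cannot add operations after start()");
        }
        outputOps.add(op);
    }

    // Launch the registered work on a separate thread; allowed only once.
    void start() {
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("context already started");
        }
        Thread worker = new Thread(() -> {
            try {
                for (Runnable op : outputOps) {
                    op.run();
                }
            } finally {
                done.countDown(); // signal termination even if an operation fails
            }
        });
        // Daemon thread: in this toy, the JVM may exit if the caller returns
        // without calling awaitTermination().
        worker.setDaemon(true);
        worker.start();
    }

    // Block the calling thread until the background work has finished.
    void awaitTermination() {
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
    }
}
```

In this sketch, calling start() a second time, or addOutputOp() after start(), throws an IllegalStateException, and a caller that skips awaitTermination() may return before the registered work has run.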
Now that we have our simple streaming application, we can go ahead and run it, as
shown in Example 10-8.