filesystem path for checkpointing. But in any production setting, you should use a
replicated system such as HDFS, S3, or an NFS filer.
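As a minimal sketch, checkpointing to a replicated store only requires passing a replicated URI instead of a local path (the application name and the HDFS namenode, port, and directory below are placeholders for your own cluster's values):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("FaultTolerantApp")
val ssc = new StreamingContext(conf, Seconds(1))
// A local path is fine for development, but in production the checkpoint
// directory should live on a replicated store such as HDFS or S3.
// "hdfs://namenode:8020/spark/checkpoints" is a placeholder URI.
ssc.checkpoint("hdfs://namenode:8020/spark/checkpoints")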
Driver Fault Tolerance
Tolerating failures of the driver node requires a special way of creating our StreamingContext, which takes in the checkpoint directory. Instead of simply calling new StreamingContext, we need to use the StreamingContext.getOrCreate() function. From our initial example, we would change our code as shown in Examples 10-43 and 10-44.
Example 10-43. Setting up a driver that can recover from failure in Scala

def createStreamingContext() = {
  ...
  val sc = new SparkContext(conf)
  // Create a StreamingContext with a 1 second batch size
  val ssc = new StreamingContext(sc, Seconds(1))
  ssc.checkpoint(checkpointDir)
  ssc  // return the new context so getOrCreate() can use it
}
...
val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
Example 10-44. Setting up a driver that can recover from failure in Java

JavaStreamingContextFactory fact = new JavaStreamingContextFactory() {
  public JavaStreamingContext create() {
    ...
    JavaSparkContext sc = new JavaSparkContext(conf);
    // Create a StreamingContext with a 1 second batch size
    JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(1));
    jssc.checkpoint(checkpointDir);
    return jssc;
  }
};
JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir, fact);
When this code is run the first time, assuming that the checkpoint directory does not yet exist, the StreamingContext will be created when the factory is invoked (createStreamingContext() for Scala, and the factory's create() method for Java). In the factory, you should set the checkpoint directory. After the driver fails, if you restart it and run this code again, getOrCreate() will reinitialize a StreamingContext from the checkpoint directory and resume processing.
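Whichever language you use, the context returned by getOrCreate(), whether freshly created or recovered from a checkpoint, is then started as usual. A minimal sketch in Scala:

// ssc is the context returned by StreamingContext.getOrCreate()
ssc.start()             // begin processing, or resume from the checkpoint
ssc.awaitTermination()  // block until the computation is stopped or fails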
In addition to writing your initialization code using getOrCreate(), you will need to actually restart your driver program when it crashes. On most cluster managers, Spark does not automatically relaunch the driver if it crashes, so you need to monitor it using a tool like monit and restart it. The best way to do this is probably specific to