filesystem path for checkpointing. But in any production setting, you should use a
replicated system such as HDFS, S3, or an NFS filer.
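As a minimal sketch of what that looks like (the application name and HDFS URL below are placeholders, not values from the book's example):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: substitute your own application name and checkpoint location.
val conf = new SparkConf().setAppName("CheckpointToHDFS")
val ssc = new StreamingContext(conf, Seconds(1))
// Point checkpointing at a replicated filesystem (HDFS here) rather than a
// local path, so checkpoint data survives the loss of any single node.
ssc.checkpoint("hdfs://namenode:8020/spark/streaming/checkpoints")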
Driver Fault Tolerance
Tolerating failures of the driver node requires a special way of creating our StreamingContext, which takes in the checkpoint directory. Instead of simply calling new StreamingContext, we need to use the StreamingContext.getOrCreate() function. From our initial example we would change our code as shown in Examples 10-43 and 10-44.
Example 10-43. Setting up a driver that can recover from failure in Scala
def createStreamingContext() = {
  ...
  val sc = new SparkContext(conf)
  // Create a StreamingContext with a 1 second batch size
  val ssc = new StreamingContext(sc, Seconds(1))
  ssc.checkpoint(checkpointDir)
  ssc // return the context so getOrCreate() can use it
}
...
val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
Example 10-44. Setting up a driver that can recover from failure in Java
JavaStreamingContextFactory fact = new JavaStreamingContextFactory() {
  public JavaStreamingContext create() {
    ...
    JavaSparkContext sc = new JavaSparkContext(conf);
    // Create a StreamingContext with a 1 second batch size
    JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(1));
    jssc.checkpoint(checkpointDir);
    return jssc;
  }
};
JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir, fact);
When this code is run the first time, assuming that the checkpoint directory does not yet exist, the StreamingContext will be created when the factory function is called (createStreamingContext() in Scala, and the create() method of the JavaStreamingContextFactory in Java). In the factory, you should set the checkpoint directory. After the driver fails, if you restart it and run this code again, getOrCreate() will reinitialize a StreamingContext from the checkpoint directory and resume processing.
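Putting the pieces together, here is a self-contained sketch of the same pattern. The checkpoint path, application name, and socket source are illustrative assumptions, not part of the book's example; the point is that the DStream transformations are defined inside the factory, so a first run builds the full computation before checkpointing begins, while a restart recovers it from the checkpoint directory.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverableDriver {
  // Placeholder values; substitute your own checkpoint location and source.
  val checkpointDir = "hdfs://namenode:8020/spark/streaming/checkpoints"

  def createStreamingContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("RecoverableDriver")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    // Define the DStream graph inside the factory so it exists on a fresh start.
    val lines = ssc.socketTextStream("localhost", 7777)
    lines.count().print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // First run: the factory is invoked and the checkpoint directory is created.
    // After a driver failure: the context is rebuilt from the checkpoint.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}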
In addition to writing your initialization code using getOrCreate(), you will need to actually restart your driver program when it crashes. On most cluster managers, Spark does not automatically relaunch the driver if it crashes, so you need to monitor it using a tool like monit and restart it. The best way to do this is probably specific to your environment. One place where Spark provides more support is the Standalone cluster manager, which supports a --supervise flag when submitting your driver that lets Spark restart it.