Figure 19-2. The stages and RDDs in a Spark job for calculating a histogram of word counts
If an RDD has been persisted from a previous job in the same application (SparkContext), then the DAG scheduler will save work and not create stages for recomputing it (or the RDDs it was derived from).
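For example, the following sketch (runnable in spark-shell, where sc is the predefined SparkContext; the input path is hypothetical) caches a shuffled RDD so that a second job in the same application reuses it rather than recomputing its stages:

    val counts = sc.textFile("hdfs://some/input.txt") // hypothetical path
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .cache()                // persist in memory once computed

    counts.count()            // first job: computes and caches the RDD
    counts.collect()          // second job: the cached stages are skipped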
The DAG scheduler is responsible for splitting a stage into tasks for submission to the task scheduler. In this example, one shuffle map task is run in the first stage for each partition of the input file. The level of parallelism for a reduceByKey() operation can be set explicitly by passing it as the second parameter. If it is not set, the level of parallelism is determined from the parent RDD: in this case, the number of partitions in the input data.
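As a sketch (again assuming spark-shell's sc and a hypothetical input path), the optional second argument to reduceByKey() sets the number of reduce-side partitions, and therefore the number of tasks in the second stage:

    val counts = sc.textFile("hdfs://some/input.txt")
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _, 10) // run the reduce stage with 10 tasks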
Each task is given a placement preference by the DAG scheduler to allow the task scheduler to take advantage of data locality. A task that processes a partition of an input RDD stored on HDFS, for example, will have a placement preference for the datanode hosting the partition's block (known as node local), while a task that processes a partition of an RDD that is cached in memory will prefer the executor storing the RDD partition (process local).
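Placement preferences can be observed from user code: preferredLocations() is a public method on RDD that reports the preferred hosts for a given partition. A minimal sketch (hypothetical path, run in spark-shell):

    val input = sc.textFile("hdfs://some/input.txt")
    input.partitions.foreach { p =>
      // For an HDFS-backed RDD, this prints the datanodes holding each block
      println(s"partition ${p.index}: ${input.preferredLocations(p).mkString(", ")}")
    }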
Going back to Figure 19-1, once the DAG scheduler has constructed the complete DAG of stages, it submits each stage's set of tasks to the task scheduler (step 3). Child stages are only submitted once their parents have completed successfully.
Task Scheduling
When the task scheduler is sent a set of tasks, it uses its list of executors that are running for the application and constructs a mapping of tasks to executors that takes placement preferences into account. Next, the task scheduler assigns tasks to executors that have free cores (this may not be the complete set if another job in the same application is running), and it continues to assign more tasks as executors finish running tasks, until the task set is complete. Each task is allocated one core by default, although this can be changed by setting spark.task.cpus.
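For instance, a job whose tasks are internally multithreaded might reserve two cores per task. A minimal sketch using the standard spark.task.cpus property (the application name is hypothetical):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("multi-core-tasks")  // hypothetical application name
      .set("spark.task.cpus", "2")     // each task occupies two executor cores
    val sc = new SparkContext(conf)

The same property can also be passed on the command line with --conf spark.task.cpus=2 when using spark-submit.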
Note that for a given executor the scheduler will first assign process-local tasks, then node-local tasks, then rack-local tasks, before assigning an arbitrary (nonlocal) task, or a speculative task if there are no other candidates. [133]
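How long the scheduler waits at each locality level before falling back to the next is governed by the spark.locality.wait family of properties (3 seconds per level by default). A sketch of tuning them; the values shown are illustrative only:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.locality.wait", "5s")      // wait applied at each fallback step
      .set("spark.locality.wait.node", "5s") // override for the node-local level
      .set("spark.locality.wait.rack", "2s") // override for the rack-local level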
Assigned tasks are launched through a scheduler backend (step 4 in Figure 19-1), which sends a remote launch task message (step 5) to the executor backend to tell the executor to run the task (step 6).