may finish at different times, so the reduce task starts copying their outputs as soon as
each completes. This is known as the copy phase of the reduce task. The reduce task has a
small number of copier threads so that it can fetch map outputs in parallel. The default is
five threads, but this number can be changed by setting the
mapreduce.reduce.shuffle.parallelcopies property.
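The parallel copy can be pictured as a small, bounded thread pool. The following is a minimal Python sketch rather than Hadoop's actual implementation: fetch_map_output and the host names are hypothetical stand-ins, and the pool size of 5 simply mirrors the default described above.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

# Mirrors the default of mapreduce.reduce.shuffle.parallelcopies described in the text.
PARALLEL_COPIES = 5


def fetch_map_output(host, map_id):
    """Hypothetical stand-in for fetching one map output over the network."""
    return f"output-of-{map_id}@{host}"


def copy_phase(completed_maps, parallel_copies=PARALLEL_COPIES):
    """Fetch every completed map output using a bounded pool of copier threads."""
    fetched = {}
    with ThreadPoolExecutor(max_workers=parallel_copies) as pool:
        # One fetch per completed map; at most `parallel_copies` run at once.
        futures = {
            pool.submit(fetch_map_output, host, map_id): map_id
            for map_id, host in completed_maps.items()
        }
        for future in as_completed(futures):
            fetched[futures[future]] = future.result()
    return fetched


# Illustrative input: 12 finished map tasks spread over 3 made-up hosts.
completed = {f"map_{i}": f"host{i % 3}" for i in range(12)}
results = copy_phase(completed)
```

Raising parallel_copies in this sketch lets more fetches overlap, which is the effect the property is meant to have on a real reduce task.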
NOTE
How do reducers know which machines to fetch map output from?
As map tasks complete successfully, they notify their application master using the heartbeat mechanism.
Therefore, for a given job, the application master knows the mapping between map outputs and hosts. A
thread in the reducer periodically asks the master for map output hosts until it has retrieved them all.
Hosts do not delete map outputs from disk as soon as the first reducer has retrieved them, as the reducer
may subsequently fail. Instead, they wait until they are told to delete them by the application master,
which is after the job has completed.
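The polling behaviour described in this note can be sketched as a simple loop. This is an illustration only: FakeApplicationMaster and get_completed_map_hosts are hypothetical names invented for the example, not part of any Hadoop API, and a real reducer would wait between polls.

```python
class FakeApplicationMaster:
    """Hypothetical master that learns about map completions over time."""

    def __init__(self, completions_per_poll):
        # Each inner list holds the (map_id, host) pairs that finish before the next poll.
        self._pending = list(completions_per_poll)
        self._known = {}

    def get_completed_map_hosts(self):
        """Return every map-output-to-host mapping known so far."""
        if self._pending:
            self._known.update(self._pending.pop(0))
        return dict(self._known)


def poll_for_map_hosts(master, total_maps, max_polls=100):
    """Ask the master repeatedly until the host of every map output is known."""
    known = {}
    polls = 0
    while len(known) < total_maps and polls < max_polls:
        known.update(master.get_completed_map_hosts())
        polls += 1
    return known, polls


# Illustrative run: one map finishes before the first poll, two more before the second.
master = FakeApplicationMaster([
    [("m0", "hostA")],
    [("m1", "hostB"), ("m2", "hostA")],
])
hosts, polls = poll_for_map_hosts(master, total_maps=3)
```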
Map outputs are copied to the reduce task JVM's memory if they are small enough (the
buffer's size is controlled by mapreduce.reduce.shuffle.input.buffer.percent, which
specifies the proportion of the heap to use for this purpose); otherwise, they are copied
to disk. When the in-memory buffer reaches a threshold size (controlled by
mapreduce.reduce.shuffle.merge.percent) or reaches a threshold number of map outputs
(mapreduce.reduce.merge.inmem.threshold), it is merged and spilled to disk. If a combiner
is specified, it will be run during the merge to reduce the amount of data written to disk.
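The two merge triggers can be expressed as a small decision function. The sketch below is an assumption-laden illustration: the ShuffleSettings class and should_merge function are hypothetical, the numeric values are example settings chosen for the demonstration (check your cluster's configuration for the real ones), and the rule for deciding whether a single output is "small enough" to go into memory is not modelled.

```python
from dataclasses import dataclass


@dataclass
class ShuffleSettings:
    """Hypothetical container for the three properties named in the text."""
    heap_bytes: int
    input_buffer_percent: float = 0.70  # example value for mapreduce.reduce.shuffle.input.buffer.percent
    merge_percent: float = 0.66         # example value for mapreduce.reduce.shuffle.merge.percent
    inmem_threshold: int = 1000         # example value for mapreduce.reduce.merge.inmem.threshold

    @property
    def buffer_bytes(self):
        # Share of the heap set aside for holding map outputs in memory.
        return int(self.heap_bytes * self.input_buffer_percent)

    @property
    def merge_trigger_bytes(self):
        # Buffer usage at which an in-memory merge and spill begins.
        return int(self.buffer_bytes * self.merge_percent)


def should_merge(settings, buffered_bytes, buffered_outputs):
    """True when either the size threshold or the count threshold is reached."""
    return (buffered_bytes >= settings.merge_trigger_bytes
            or buffered_outputs >= settings.inmem_threshold)


# Illustrative 1,000-byte heap so the numbers are easy to follow.
settings = ShuffleSettings(heap_bytes=1000)
below_both = should_merge(settings, buffered_bytes=400, buffered_outputs=10)
size_reached = should_merge(settings, buffered_bytes=500, buffered_outputs=10)
count_reached = should_merge(settings, buffered_bytes=10, buffered_outputs=1000)
```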
As the copies accumulate on disk, a background thread merges them into larger, sorted
files. This saves some time merging later on. Note that any map outputs that were
compressed (by the map task) have to be decompressed in memory in order to perform a
merge on them.
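Because each map output is already sorted by key, combining several of them is a k-way merge rather than a fresh sort. A minimal sketch using Python's standard heapq.merge follows; the in-memory lists stand in for on-disk files, and merge_sorted_runs is a name invented for this example.

```python
import heapq


def merge_sorted_runs(runs):
    """Merge already-sorted (key, value) runs into one sorted list."""
    # heapq.merge consumes the runs lazily and never re-sorts them.
    return list(heapq.merge(*runs, key=lambda kv: kv[0]))


# Three made-up map outputs, each sorted by key.
run_a = [("apple", 1), ("cherry", 1)]
run_b = [("banana", 1), ("cherry", 2)]
run_c = [("apple", 3), ("date", 1)]

merged = merge_sorted_runs([run_a, run_b, run_c])
merged_keys = [key for key, _ in merged]
```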
When all the map outputs have been copied, the reduce task moves into the sort phase
(which should properly be called the merge phase, as the sorting was carried out on the
map side), which merges the map outputs, maintaining their sort ordering. This is done in
rounds. For example, if there were 50 map outputs and the merge factor was 10 (the
default, controlled by the mapreduce.task.io.sort.factor property, just as in the map's
merge), there would be five rounds. Each round would merge 10 files into 1, so at the end
there would be five intermediate files.
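The arithmetic in this example can be checked with a short sketch. It assumes the simplest grouping, in which every round takes up to merge_factor files in order; merge_round_groups is a hypothetical helper, and the grouping an actual reduce task chooses may differ.

```python
def merge_round_groups(num_files, merge_factor):
    """Split num_files into consecutive rounds of at most merge_factor files."""
    if merge_factor < 2:
        raise ValueError("merge_factor must be at least 2")
    groups = []
    remaining = num_files
    while remaining > 0:
        size = min(merge_factor, remaining)
        groups.append(size)  # one round merges `size` files into one
        remaining -= size
    return groups


# The example from the text: 50 map outputs with a merge factor of 10.
groups = merge_round_groups(50, 10)
rounds = len(groups)              # number of merge rounds
intermediate_files = len(groups)  # each round leaves one merged file
```

With these inputs the helper yields five rounds of 10 files each, matching the five intermediate files described above.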