Table 6.2  Equivalent JobConf properties to method calls in SkipBadRecords

    SkipBadRecords method            JobConf property
    ---------------------            ----------------
    setAttemptsToStartSkipping()     mapred.skip.attempts.to.start.skipping
    setMapperMaxSkipRecords()        mapred.skip.map.max.skip.records
    setReducerMaxSkipGroups()        mapred.skip.reduce.max.skip.groups
    setSkipOutputPath()              mapred.skip.out.dir
    setAutoIncrMapperProcCount()     mapred.skip.map.auto.incr.proc.count
    setAutoIncrReducerProcCount()    mapred.skip.reduce.auto.incr.proc.count

Table 6.2 shows the JobConf properties being set by the SkipBadRecords method calls.
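As a sketch of how these methods are called from a job driver, skipping might be configured as follows; the specific thresholds and output path are illustrative assumptions, not values from the text:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SkipBadRecords;

public class SkipConfig {
    public static void configure(JobConf conf) {
        // Enter skipping mode after two failed attempts of a task
        SkipBadRecords.setAttemptsToStartSkipping(conf, 2);
        // Tolerate at most one skipped record per map task...
        SkipBadRecords.setMapperMaxSkipRecords(conf, 1L);
        // ...and at most one skipped key group per reduce task
        SkipBadRecords.setReducerMaxSkipGroups(conf, 1L);
        // Write skipped records here for later inspection (illustrative path)
        SkipBadRecords.setSkipOutputPath(conf, new Path("/tmp/skipped"));
    }
}
```

Each helper is a static method that writes the corresponding property from table 6.2 into the job configuration, so setting the property directly (for example, with -D on the command line) has the same effect.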
We haven't explained the last two properties yet. Their default values are fine for most Java programs, but we need to change them for Streaming programs.
In determining the record range to skip, Hadoop needs an accurate count of the number of records a task has processed. Hadoop uses an internal counter that, by default, is incremented after each call to the map (reduce) function. For Java programs this is a good way to track the number of records processed. It can break down in some cases, such as programs that process records asynchronously (say, by spawning threads) or buffer records to process them in chunks, but it usually works. For Streaming programs, this default behavior doesn't work at all, because there's no equivalent of the map (reduce) function that gets called once per record. In those situations you have to disable the default behavior by setting the two auto-increment Boolean properties to false, and your task has to update the record counters itself.
In Python, the map task can update the counter with

    sys.stderr.write("reporter:counter:SkippingTaskCounters,MapProcessedRecords,1\n")
and the reduce task can use

    sys.stderr.write("reporter:counter:SkippingTaskCounters,ReduceProcessedGroups,1\n")
Java programs that can't rely on the default record counting should call

    reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
        SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);

and

    reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
        SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS, 1);

after processing each key/value pair in the Mapper and each key group in the Reducer, respectively.
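In context, the map-side call goes at the end of the map method once the record has been fully handled. The following sketch uses the old mapred API; the word-count-style types and body are illustrative assumptions, not code from the text:

```java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SkipBadRecords;

public class CountingMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, LongWritable> {
    private static final LongWritable ONE = new LongWritable(1);

    public void map(LongWritable key, Text value,
                    OutputCollector<Text, LongWritable> output,
                    Reporter reporter) throws IOException {
        output.collect(value, ONE);
        // Tell Hadoop another record has been fully processed, since this
        // job has disabled the automatic per-call increment
        reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
            SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
    }
}
```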