Table 6.2 Equivalent JobConf properties to method calls in SkipBadRecords

SkipBadRecords method            JobConf property
setAttemptsToStartSkipping()     mapred.skip.attempts.to.start.skipping
setMapperMaxSkipRecords()        mapred.skip.map.max.skip.records
setReducerMaxSkipGroups()        mapred.skip.reduce.max.skip.groups
setSkipOutputPath()              mapred.skip.out.dir
setAutoIncrMapperProcCount()     mapred.skip.map.auto.incr.proc.count
setAutoIncrReducerProcCount()    mapred.skip.reduce.auto.incr.proc.count

be configured using Streaming's -D property (-jobconf in version 0.18). Table 6.2
shows the JobConf properties being set by the SkipBadRecords method calls.
We haven't explained the last two properties yet. Their default values are fine for
most Java programs but we need to change them for Streaming ones.
In determining the record range to skip, Hadoop needs an accurate count of the
number of records a task has processed. Hadoop uses an internal counter and by default
it's incremented after each call to the map (reduce) function. For Java programs this is
a good approach to track the number of records processed. It can break down in some
cases, such as programs that process records asynchronously (say, by spawning threads)
or buffer them to process in chunks, but it usually works. In Streaming programs,
this default behavior doesn't work at all because there's no equivalent of the map
(reduce) function that gets called to process each record. In those situations you have
to disable the default behavior by setting the two Boolean properties
(mapred.skip.map.auto.incr.proc.count and mapred.skip.reduce.auto.incr.proc.count)
to false, and your task has to update the record counters itself.
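As a rough illustration of the configuration step just described, the sketch below assembles the -D options a Streaming job might pass to turn off automatic record counting. The property names come from Table 6.2; the helper name skip_mode_options and the default limits of 1 are hypothetical placeholders, not recommended settings.

```python
def skip_mode_options(max_map_skip_records=1, max_reduce_skip_groups=1):
    """Build -D options for a Streaming job that counts records itself.

    The function name and the default limits are illustrative assumptions;
    only the property names are taken from Table 6.2.
    """
    props = {
        # Streaming has no per-record map/reduce call, so disable auto counting.
        "mapred.skip.map.auto.incr.proc.count": "false",
        "mapred.skip.reduce.auto.incr.proc.count": "false",
        # Placeholder limits on how much data around a bad record may be skipped.
        "mapred.skip.map.max.skip.records": str(max_map_skip_records),
        "mapred.skip.reduce.max.skip.groups": str(max_reduce_skip_groups),
    }
    args = []
    for name, value in props.items():
        args += ["-D", f"{name}={value}"]
    return args
```

The returned list is meant to be spliced into whatever Streaming command line the job already uses; on version 0.18 each "-D" would be "-jobconf" instead.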
In Python, the map task can update the counter with
sys.stderr.write(
"reporter:counter:SkippingTaskCounters,MapProcessedRecords,1\n")
and the reduce task can use
sys.stderr.write(
"reporter:counter:SkippingTaskCounters,ReduceProcessedGroups,1\n")
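To show where those two calls might sit in a complete task, here is a minimal sketch of a word-count style Streaming mapper and reducer. The word-count logic and the function names run_mapper and run_reducer are assumptions made for illustration; only the two counter strings come from the text above. Each counter is updated after the record (or key group) has been fully processed.

```python
import sys
from itertools import groupby

MAP_COUNTER = "reporter:counter:SkippingTaskCounters,MapProcessedRecords,1\n"
REDUCE_COUNTER = "reporter:counter:SkippingTaskCounters,ReduceProcessedGroups,1\n"


def run_mapper(lines, out=sys.stdout, err=sys.stderr):
    """Emit (word, 1) pairs and report every input record once it is done."""
    for line in lines:
        for word in line.split():
            out.write(f"{word}\t1\n")
        # One input line is one record: count it only after it was processed.
        err.write(MAP_COUNTER)


def run_reducer(lines, out=sys.stdout, err=sys.stderr):
    """Sum the values per key and report every key group once it is done."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines)
    # Streaming delivers reducer input sorted by key, so groupby sees each
    # key as one contiguous group.
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        total = sum(int(value) for _, value in group)
        out.write(f"{key}\t{total}\n")
        err.write(REDUCE_COUNTER)
```

Used as actual Streaming scripts, the mapper file would end with a call to run_mapper(sys.stdin) and the reducer file with run_reducer(sys.stdin). Passing the streams as parameters simply makes the functions easy to try out locally.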
Java programs that cannot depend on the default record counting should use
reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
and
reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS, 1);
when they have processed a key/value pair in their Mapper and Reducer, respectively.
 