Building a Multistep Pipeline
Although some large data processing tasks can be accomplished with a single MapReduce step, a common pattern is to run a job that takes multiple steps. In other cases, two separate sources are processed by two mapper functions that emit data containing the same key. A single reducer phase then combines the data from both sources, matched on that key, into one output.
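The second pattern, where two mappers feed one reducer, is often called a reduce-side join. The following pure-Python sketch simulates it locally, without Hadoop or mrjob. The two input sources (hypothetical "month,count" lines from a 2009 and a 2010 births file), the function names, the toy counts, and the dict-based shuffle are all assumptions made for illustration.

```python
from collections import defaultdict


def mapper_2009(line):
    # Hypothetical source A: "month,count" lines from a 2009 births file
    month, count = line.split(',')
    # Tag the value with its source so the reducer can tell inputs apart
    yield month, ('2009', int(count))


def mapper_2010(line):
    # Hypothetical source B: same layout, from a 2010 births file
    month, count = line.split(',')
    yield month, ('2010', int(count))


def join_reducer(month, tagged_values):
    # All values sharing a month key arrive together; merge them into one
    # record (a later value with the same tag would overwrite an earlier one)
    yield month, dict(tagged_values)


def run_join(lines_2009, lines_2010):
    # Map phase: each source has its own mapper, but both emit month keys
    mapped = [kv for line in lines_2009 for kv in mapper_2009(line)]
    mapped += [kv for line in lines_2010 for kv in mapper_2010(line)]
    # Shuffle phase: group every value under its key
    groups = defaultdict(list)
    for key, value in mapped:
        groups[key].append(value)
    # Reduce phase: one reducer call per key, in sorted key order
    return dict(kv for key in sorted(groups)
                for kv in join_reducer(key, groups[key]))


print(run_join(['01,5', '02,7'], ['01,6']))
# prints {'01': {'2009': 5, '2010': 6}, '02': {'2009': 7}}
```

Tagging each value with its source is one common way to let the reducer tell the inputs apart; it is a choice made in this sketch, not something MapReduce itself requires.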
To illustrate this type of pipeline, let's extend our original birth count example by adding another step, so that it counts only the female births per month in 2010.
Before determining the number of births per month, let's filter each record by gender. To do this, let's add a new mapper function called filter_births_by_gender. This mapper will emit a value only if the birth record is female. The key emitted will simply be the string Female, and the value will be the month and year of the birth. The output of this mapper will be fed into another mapper function called counter_mapper, which will assign a 1 to each combined gender and month-year key (just as we did in the single-step example). Finally, the output from this mapper function will be fed into our original reducer, sum_births, which will emit the total number of female births per month.
To specify the order in which we want the mappers to run, we will override the MRJob.steps method so that it returns a list containing each individual map and reduce phase in order. See Listing 8.11 for the code for each of these steps.
Listing 8.11 A two-step MapReduce mrjob script
mrjob_multistep_example.py
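from mrjob.job import MRJob
from mrjob.step import MRStep

class MRFemaleBirthCounter(MRJob):

    def filter_births_by_gender(self, key, record):
        # Step 1 mapper: keep only records whose gender field is 'F'
        if record[435] == 'F':
            year = record[14:18]
            month = record[18:20]
            birth_month = '%s-%s' % (month, year)
            yield 'Female', birth_month

    def counter_mapper(self, gender, month):
        # Step 2 mapper: emit a count of 1 for each gender/month-year key
        yield '%s %s' % (gender, month), 1

    def sum_births(self, month, births):
        # Step 2 reducer: total the counts collected for each key
        yield month, sum(births)

    def steps(self):
        # Each MRStep is one phase; steps run in list order
        return [MRStep(mapper=self.filter_births_by_gender),
                MRStep(mapper=self.counter_mapper,
                       reducer=self.sum_births)]

if __name__ == '__main__':
    MRFemaleBirthCounter.run()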
 