mapf_week = bson.Code(
    mapf_hierarchical % '''new Date(
        this._id.d.valueOf()
        - this._id.d.getDay()*24*60*60*1000)''')
mapf_month = bson.Code(
    mapf_hierarchical % '''new Date(
        this._id.d.getFullYear(),
        this._id.d.getMonth(),
        1, 0, 0, 0, 0)''')
mapf_year = bson.Code(
    mapf_hierarchical % '''new Date(
        this._id.d.getFullYear(),
        1, 1, 0, 0, 0, 0)''')
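Each of these definitions interpolates a JavaScript date expression into a shared map-function template defined earlier in the chapter. A minimal sketch of that pattern is shown below; the `mapf_hierarchical` template string here is an assumption, reconstructed from how the `%` interpolation is used above, not the book's exact definition:

```python
# Hypothetical sketch: a shared template whose %s slot receives the
# JavaScript expression that rounds a timestamp down to the period start.
mapf_hierarchical = '''function() {
    var key = {
        u: this._id.u,
        d: %s };
    emit(key, {
        total: this.value.total,
        count: this.value.count,
        mean: 0,
        ts: null });
}'''

# Interpolating the monthly expression yields a complete JavaScript
# map function, ready to be wrapped in bson.Code(...):
mapf_month_js = mapf_hierarchical % '''new Date(
    this._id.d.getFullYear(),
    this._id.d.getMonth(),
    1, 0, 0, 0, 0)'''
print(mapf_month_js)
```

Because the template is plain Python string formatting, each period-specific map function differs only in the single date expression substituted into the key.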
Now we'll create an h_aggregate function that wraps the map_reduce operation to reduce code
duplication:
def h_aggregate(icollection, ocollection, mapf, cutoff, last_run):
    query = {'value.ts': {'$gt': last_run, '$lt': cutoff}}
    icollection.map_reduce(
        map=mapf,
        reduce=reducef,
        finalize=finalizef,
        query=query,
        out={'reduce': ocollection.name})
With h_aggregate defined, we can perform all aggregation operations as follows:
cutoff = datetime.utcnow() - timedelta(seconds=60)
# The first step is still special: it reads raw events, whose
# timestamp field is 'ts' rather than 'value.ts'
query = {'ts': {'$gt': last_run, '$lt': cutoff}}
db.events.map_reduce(
    map=mapf_hour, reduce=reducef,
    finalize=finalizef, query=query,
    out={'reduce': 'stats.hourly'})
# The remaining steps all read from a stats collection,
# so h_aggregate handles them uniformly
h_aggregate(db.stats.hourly, db.stats.daily, mapf_day, cutoff, last_run)
h_aggregate(db.stats.daily, db.stats.weekly, mapf_week, cutoff, last_run)
h_aggregate(db.stats.daily, db.stats.monthly, mapf_month, cutoff, last_run)
h_aggregate(db.stats.monthly, db.stats.yearly, mapf_year, cutoff, last_run)
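Since the four h_aggregate calls differ only in their arguments, the cascade can also be driven from a table of (input, output, map-function) rows. The sketch below is an illustration, not from the text; it uses a stand-in FakeCollection class so the control flow can be exercised without a live MongoDB connection:

```python
from datetime import datetime, timedelta

class FakeCollection:
    """Stand-in for a pymongo collection; records map_reduce calls."""
    def __init__(self, name):
        self.name = name
        self.calls = []

    def map_reduce(self, map, reduce, finalize, query, out):
        self.calls.append({'query': query, 'out': out})

def h_aggregate(icollection, ocollection, mapf, cutoff, last_run):
    # Same shape as h_aggregate above, with the real reducef/finalizef
    # omitted since FakeCollection never executes them
    query = {'value.ts': {'$gt': last_run, '$lt': cutoff}}
    icollection.map_reduce(
        map=mapf, reduce=None, finalize=None,
        query=query, out={'reduce': ocollection.name})

hourly = FakeCollection('stats.hourly')
daily = FakeCollection('stats.daily')
weekly = FakeCollection('stats.weekly')
monthly = FakeCollection('stats.monthly')
yearly = FakeCollection('stats.yearly')

cutoff = datetime.utcnow() - timedelta(seconds=60)
last_run = cutoff - timedelta(hours=1)

# Table-driven cascade: one row per aggregation level, mirroring
# the four explicit h_aggregate calls above
cascade = [
    (hourly, daily, 'mapf_day'),
    (daily, weekly, 'mapf_week'),
    (daily, monthly, 'mapf_month'),
    (monthly, yearly, 'mapf_year'),
]
for icoll, ocoll, mapf in cascade:
    h_aggregate(icoll, ocoll, mapf, cutoff, last_run)
```

One design point the table makes visible: both the weekly and monthly rollups read from the daily collection, so adding another period (say, quarterly) is a one-row change rather than another hand-written call.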