Database Reference
In-Depth Information
mapf_hierarchical
%
'''new Date(
this._id.d.valueOf()
- dt.getDay()*24*60*60*1000)'''
)
# Map function for the monthly roll-up: the emit key truncates the source
# document's timestamp to midnight on the first day of its month.
# (mapf_hierarchical is a template defined earlier; the %-placeholder is
# filled with the JavaScript date-truncation expression for this level.)
_month_key_expr = '''new Date(
this._id.d.getFullYear(),
this._id.d.getMonth(),
1, 0, 0, 0, 0)'''
mapf_month = bson.Code(mapf_hierarchical % _month_key_expr)
# Map function for the yearly roll-up: the emit key truncates the source
# document's timestamp to January 1st, 00:00:00 of its year.
# BUG FIX: JavaScript Date(year, month, day) takes a 0-based month index
# (0 = January).  The original used month 1, which is February, so yearly
# buckets were keyed on Feb 1 instead of Jan 1.  The day argument (1) is
# correct — days ARE 1-based.
mapf_year = bson.Code(mapf_hierarchical % '''new Date(
this._id.d.getFullYear(),
0, 1, 0, 0, 0, 0)''')
Now, we'll create an `h_aggregate` function that wraps the `map_reduce` operation, to reduce code duplication:
def h_aggregate(icollection, ocollection, mapf, cutoff, last_run):
    """Run one level of the hierarchical map-reduce aggregation.

    Re-reduces documents from *icollection* whose ``value.ts`` falls in the
    open interval (last_run, cutoff) into *ocollection*, using the supplied
    map function together with the shared ``reducef`` and ``finalizef``.

    Parameters
    ----------
    icollection : pymongo collection holding the finer-grained statistics
    ocollection : pymongo collection receiving this level's statistics;
        the ``'reduce'`` output mode merges new results into existing
        documents instead of replacing the collection
    mapf : bson.Code map function for this aggregation level
    cutoff : datetime — upper (exclusive) bound on ``value.ts``
    last_run : datetime — lower (exclusive) bound (time of the previous run)
    """
    # Fix: the original listing had a duplicated 'def' keyword, which is a
    # syntax error.
    # Only pick up documents touched since the last aggregation run.
    query = {'value.ts': {'$gt': last_run, '$lt': cutoff}}
    # NOTE(review): reducef and finalizef are assumed to be module-level
    # bson.Code objects defined earlier in the chapter — confirm.
    icollection.map_reduce(
        map=mapf,
        reduce=reducef,
        finalize=finalizef,
        query=query,
        out={'reduce': ocollection.name})
With `h_aggregate` defined, we can perform all of the aggregation operations as follows:
# Leave a one-minute safety margin behind "now" so that in-flight inserts
# with slightly older timestamps are not missed by this aggregation run.
cutoff = datetime.utcnow() - timedelta(minutes=1)
# The first aggregation step is still special: it reads the raw events
# collection, whose documents are keyed on a top-level 'ts' field rather
# than the 'value.ts' produced by earlier map-reduce passes, and merges
# the results into the hourly statistics collection.
query = {'ts': {'$gt': last_run, '$lt': cutoff}}
db.events.map_reduce(map=mapf_hour,
                     reduce=reducef,
                     finalize=finalizef,
                     query=query,
                     out={'reduce': 'stats.hourly'})
# The remaining levels all follow the same pattern, so h_aggregate handles
# them uniformly.  Each level re-reduces a finer-grained collection; note
# that both the weekly and the monthly statistics are derived from the
# daily collection, not from each other.
stats = db.stats
h_aggregate(stats.hourly, stats.daily, mapf_day, cutoff, last_run)
h_aggregate(stats.daily, stats.weekly, mapf_week, cutoff, last_run)
h_aggregate(stats.daily, stats.monthly, mapf_month, cutoff, last_run)
h_aggregate(stats.monthly, stats.yearly, mapf_year, cutoff, last_run)