        { '$inc': { 'balance': amt },
          '$push': { 'txns': txn['_id'] } })
    return txn
There are two key insights applied here:
▪ The source and destination accounts store a list of pending transactions. This allows us to
track, in the account document, whether a particular transaction ID is pending.
▪ The transaction itself must complete during a certain time window. If it does not, a periodic
process will roll outstanding transactions back or commit them based on the last state of
the transaction. This handles cases where the application or database crashes in the middle
of a transaction.
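The decision made by that periodic process can be sketched as follows. This is a minimal illustration rather than the actual cleanup code: it works on plain dictionaries instead of a database, and the function name recovery_action and the action labels 'retire', 'rollback', and 'wait' are hypothetical. It assumes that a transaction still in the 'new' state after its time window can never be committed, so rolling it back is safe.

```python
from datetime import datetime, timedelta

def recovery_action(txn, now, max_txn_time):
    """Decide what a periodic sweep should do with one transaction document.

    Hypothetical helper: the action labels are illustrative only.
    """
    cutoff = now - max_txn_time
    if txn['state'] == 'commit':
        # The commit was recorded, so the transfer must be finished (retired).
        return 'retire'
    if txn['state'] == 'new' and txn['ts'] <= cutoff:
        # Never committed within its time window: undo the balance changes.
        return 'rollback'
    # Still inside its window; leave it for the application to finish.
    return 'wait'

now = datetime(2024, 1, 1, 12, 0, 0)
window = timedelta(seconds=30)
txns = [
    {'_id': 1, 'state': 'commit', 'ts': now - timedelta(minutes=5)},
    {'_id': 2, 'state': 'new', 'ts': now - timedelta(minutes=5)},
    {'_id': 3, 'state': 'new', 'ts': now - timedelta(seconds=5)},
]
actions = {t['_id']: recovery_action(t, now, window) for t in txns}
```

Here transaction 1 was committed before a simulated crash and only needs to be retired, transaction 2 expired without committing, and transaction 3 is still in progress.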
Here's our function to actually commit the transfer:
def commit_transfer(txn, max_txn_time):
    # Mark the transaction as committed, but only if it is still 'new'
    # and was created within the allowed time window
    now = datetime.utcnow()
    cutoff = now - max_txn_time
    result = db.transaction.update(
        { '_id': txn['_id'], 'state': 'new', 'ts': { '$gt': cutoff } },
        { '$set': { 'state': 'commit' } })
    if not result['updatedExisting']:
        raise TransactionError(txn['_id'])
    else:
        retire_transaction(txn['_id'])
The main purpose of this function is to perform the atomic update of transaction state from
'new' to 'commit'. If this update succeeds, the transaction will eventually be retired, even if a
crash occurs immediately after the update. To actually retire the transaction, then, we use the
following function:
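def retire_transaction(txn_id):
    # Look up the transaction to find its source and destination accounts
    txn = db.transaction.find_one({'_id': txn_id})
    if txn is None:
        return  # already retired
    # Remove the pending transaction ID from both accounts
    db.accounts.update(
        { '_id': txn['src'], 'txns': txn_id },
        { '$pull': { 'txns': txn_id } })
    db.accounts.update(
        { '_id': txn['dst'], 'txns': txn_id },
        { '$pull': { 'txns': txn_id } })
    # Finally, remove the transaction document itself
    db.transaction.remove({'_id': txn_id})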
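Because a crash can happen at any point, the retirement step may end up running more than once for the same transaction, so it helps if repeating it is harmless. The sketch below illustrates that property with plain dictionaries standing in for the accounts and transaction collections. The function name retire and the sample account names are hypothetical, and the list operations only mimic the effect of '$pull' and remove rather than calling MongoDB.

```python
def retire(accounts, transactions, txn_id):
    """In-memory stand-in for retiring a transaction; safe to call repeatedly."""
    txn = transactions.get(txn_id)
    if txn is None:
        return False  # already retired by an earlier run
    for account_id in (txn['src'], txn['dst']):
        pending = accounts[account_id]['txns']
        if txn_id in pending:
            # Mimics $pull; transaction IDs are assumed unique per account.
            pending.remove(txn_id)
    # Mimics removing the transaction document itself.
    del transactions[txn_id]
    return True

accounts = {
    'alice': {'balance': 90, 'txns': ['t1']},
    'bob': {'balance': 110, 'txns': ['t1', 't2']},
}
transactions = {'t1': {'src': 'alice', 'dst': 'bob', 'state': 'commit'}}

first = retire(accounts, transactions, 't1')
second = retire(accounts, transactions, 't1')  # replay after a simulated crash
```

The second call finds nothing left to do and leaves the accounts unchanged, which is what allows a periodic process to retry retirement without tracking how far a previous attempt got.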