        long tweetId = partitionMeta.from;
        for (String msg : messages) {
            collector.emit(new Values(tx, "" + tweetId, msg));
            tweetId++;
        }
    }

    @Override
    public void close() {
    }
}
There are two important methods here, emitPartitionBatchNew and emitPartitionBatch. In emitPartitionBatchNew, you receive from Storm the partition parameter, which tells you which partition to retrieve the batch of tuples from. In this method, decide which tweets to retrieve, generate the corresponding metadata, call emitPartitionBatch, and return the metadata, which Storm stores immediately in ZooKeeper.
Storm sends the same transaction ID for every partition, because the transaction spans all the partitions. In the emitPartitionBatch method, read the tweets from the partition and emit the tuples of the batch to the topology. If the batch fails, Storm calls emitPartitionBatch again with the stored metadata to replay the batch.
You can check the code at ch08-transactional topologies on GitHub.
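The interplay between the two methods can be sketched in plain Java, without any Storm dependency. This is a simplified simulation, not the Storm API: the class PartitionBatchSketch, the BatchMeta type with its quantity field, the in-memory map standing in for ZooKeeper, and the startTransaction and replay driver methods are all assumptions made for illustration. Only the method names emitPartitionBatchNew and emitPartitionBatch and the from field come from the text above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation; none of these types are Storm classes.
class PartitionBatchSketch {

    // Hypothetical metadata: where the batch starts and how many tweets it holds.
    static final class BatchMeta {
        final long from;
        final int quantity;

        BatchMeta(long from, int quantity) {
            this.from = from;
            this.quantity = quantity;
        }
    }

    private final List<String> partition;                            // stand-in for one partition
    private final Map<Long, BatchMeta> storedMeta = new HashMap<>(); // stand-in for ZooKeeper
    private BatchMeta lastMeta;                                      // metadata of the latest batch
    final List<String> emitted = new ArrayList<>();                  // stand-in for the collector

    PartitionBatchSketch(List<String> partition) {
        this.partition = partition;
    }

    // First attempt: decide what to read, emit it, and return the metadata.
    BatchMeta emitPartitionBatchNew(long txId, BatchMeta last, int maxBatchSize) {
        long from = (last == null) ? 0 : last.from + last.quantity;
        int quantity = (int) Math.max(0, Math.min(maxBatchSize, partition.size() - from));
        BatchMeta meta = new BatchMeta(from, quantity);
        emitPartitionBatch(txId, meta);
        return meta;
    }

    // Emits exactly the tweets described by the metadata, so a replay is identical.
    void emitPartitionBatch(long txId, BatchMeta meta) {
        long tweetId = meta.from;
        for (int i = 0; i < meta.quantity; i++) {
            emitted.add(txId + ":" + tweetId + ":" + partition.get((int) tweetId));
            tweetId++;
        }
    }

    // Plays the framework's role: run a new transaction and store its metadata.
    BatchMeta startTransaction(long txId, int maxBatchSize) {
        BatchMeta meta = emitPartitionBatchNew(txId, lastMeta, maxBatchSize);
        storedMeta.put(txId, meta);
        lastMeta = meta;
        return meta;
    }

    // Plays the framework's role: replay a failed batch from the stored metadata.
    void replay(long txId) {
        emitPartitionBatch(txId, storedMeta.get(txId));
    }

    public static void main(String[] args) {
        PartitionBatchSketch sketch = new PartitionBatchSketch(Arrays.asList("a", "b", "c"));
        sketch.startTransaction(1L, 2);
        sketch.replay(1L); // emits the same two tuples again
        System.out.println(sketch.emitted);
    }
}
```

Because the replay reads only from the stored metadata, it emits the same tuples as the first attempt. That is the property a non-opaque transactional topology relies on.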
Opaque Transactional Topologies
So far, you might have assumed that it's always possible to replay a batch of tuples for the same transaction ID. But that might not be feasible in some scenarios. What happens then?
It turns out that you can still achieve exactly-once semantics, but it requires more development effort, because you need to keep the previous state in case Storm replays the transaction. Since the same transaction ID can yield different tuples when it is emitted at different moments in time, you'll need to reset to that previous state and go from there.
For example, suppose you are counting total received tweets. You have counted five so far, and in the last transaction, with ID 321, you count eight more. You would keep three values: previousCount=5, currentCount=13, and lastTransactionId=321. If transaction ID 321 is emitted again and, because you get different tuples, you count four instead of eight, the committer detects that it is the same transaction ID, resets to the previousCount of five, adds the new four, and updates currentCount to nine.
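The bookkeeping in this example can be sketched as a small plain-Java class. It is a minimal illustration rather than Storm's committer API: the class name OpaqueCounter, the commit method, and the use of -1 as a "no transaction yet" marker are assumptions. The three fields are the values named in the example.

```java
// Minimal sketch of opaque-transaction state; not a Storm class.
class OpaqueCounter {
    private long previousCount;          // state before the last transaction
    private long currentCount;           // state after the last transaction
    private long lastTransactionId = -1; // assumption: real transaction IDs are non-negative

    OpaqueCounter(long initialCount) {
        this.previousCount = initialCount;
        this.currentCount = initialCount;
    }

    // Applies the count of one batch and returns the updated total.
    long commit(long txId, long batchCount) {
        if (txId == lastTransactionId) {
            // Same transaction replayed, possibly with different tuples:
            // discard the earlier attempt and start again from the previous state.
            currentCount = previousCount + batchCount;
        } else {
            // New transaction: the current state becomes the previous state.
            previousCount = currentCount;
            currentCount += batchCount;
            lastTransactionId = txId;
        }
        return currentCount;
    }

    long getPreviousCount() {
        return previousCount;
    }

    long getCurrentCount() {
        return currentCount;
    }

    public static void main(String[] args) {
        OpaqueCounter counter = new OpaqueCounter(5);
        System.out.println(counter.commit(321, 8)); // 13
        System.out.println(counter.commit(321, 4)); // 9, the replay replaces the earlier 8
    }
}
```

Keeping previousCount alongside currentCount is what makes the replay safe: the second attempt overwrites the effect of the first instead of adding to it.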