        long tweetId = partitionMeta.from;
        for (String msg : messages) {
            collector.emit(new Values(tx, "" + tweetId, msg));
            tweetId++;
        }
    }

    @Override
    public void close() {
    }
}
There are two important methods here, emitPartitionBatchNew and
emitPartitionBatch. In emitPartitionBatchNew, you receive from Storm the partition
parameter, which tells you which partition to retrieve the batch of tuples from. In this
method, decide which tweets to retrieve, generate the corresponding metadata, call
emitPartitionBatch, and return the metadata, which will be stored immediately in
ZooKeeper. Storm will send the same transaction ID for every partition, as the
transaction exists across all the partitions. In the emitPartitionBatch method, read the
tweets from the partition and emit the tuples of the batch to the topology. If the batch
fails, Storm will call emitPartitionBatch with the stored metadata to replay the batch.
You can check the code at ch08-transactional topologies on GitHub.
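The way the two methods cooperate can be illustrated without Storm at all. The following is only a minimal sketch under stated assumptions: the class PartitionedEmitterSketch, the BatchMeta type, the MAX_BATCH_SIZE limit, and the in-memory list standing in for a partition are all hypothetical, and the method signatures are simplified rather than being the ones Storm's interface declares. It shows only the idea that the "new" method chooses the range and records it as metadata, while the other method emits purely from that metadata, so a replay yields the same tuples.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a partitioned emitter; not Storm's actual API.
class PartitionedEmitterSketch {
    static final int MAX_BATCH_SIZE = 2; // assumed batch limit, for illustration only

    // Hypothetical metadata: where the batch starts and how many items it holds.
    static final class BatchMeta {
        final long from;
        final int quantity;

        BatchMeta(long from, int quantity) {
            this.from = from;
            this.quantity = quantity;
        }
    }

    private final List<String> partitionData; // in-memory stand-in for one partition

    PartitionedEmitterSketch(List<String> partitionData) {
        this.partitionData = partitionData;
    }

    // Decides what the new batch contains, emits it, and returns the metadata
    // that the caller is expected to store for a possible replay.
    BatchMeta emitPartitionBatchNew(long txId, List<String> collector, BatchMeta last) {
        long from = (last == null) ? 0 : last.from + last.quantity;
        int available = (int) Math.max(0, partitionData.size() - from);
        BatchMeta meta = new BatchMeta(from, Math.min(available, MAX_BATCH_SIZE));
        emitPartitionBatch(txId, collector, meta);
        return meta;
    }

    // Emits exactly the items described by the metadata, so calling it again
    // with the same metadata replays the same batch.
    void emitPartitionBatch(long txId, List<String> collector, BatchMeta meta) {
        long tweetId = meta.from;
        for (int i = 0; i < meta.quantity; i++) {
            collector.add(txId + "|" + tweetId + "|" + partitionData.get((int) tweetId));
            tweetId++;
        }
    }

    public static void main(String[] args) {
        PartitionedEmitterSketch emitter =
                new PartitionedEmitterSketch(Arrays.asList("a", "b", "c"));
        List<String> out = new ArrayList<>();
        BatchMeta meta = emitter.emitPartitionBatchNew(1L, out, null);
        System.out.println(out); // prints [1|0|a, 1|1|b]

        List<String> replayed = new ArrayList<>();
        emitter.emitPartitionBatch(1L, replayed, meta); // replay from stored metadata
        System.out.println(replayed.equals(out)); // prints true
    }
}
```

In this sketch the list passed as the collector simply accumulates strings; in a real topology the collector would emit tuples and the returned metadata would be persisted by Storm rather than held in a local variable.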
Opaque Transactional Topologies
So far, you might have assumed that it's always possible to replay a batch of tuples for
the same transaction ID. But that might not be feasible in some scenarios. What
happens then?
It turns out that you can still achieve exactly-once semantics, but it requires more
development effort, because you need to keep the previous state in case Storm replays
the transaction. Since you can get different tuples for the same transaction ID when
emitting at different moments in time, you'll need to reset to that previous state and
go from there.
For example, suppose you are counting total received tweets. You have counted five so
far, and in the last transaction, with ID 321, you count eight more. You would keep
three values: previousCount=5, currentCount=13, and lastTransactionId=321. If
transaction ID 321 is emitted again and, because you get different tuples, you count
four instead of eight, the committer will detect that it is the same transaction ID,
reset to the previousCount of five, add the new four, and update currentCount to nine.
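The bookkeeping in this example can be sketched in a few lines of plain Java. This is an illustrative sketch only, not the committer code from the book or any Storm API: the class OpaqueCounter and its apply method are hypothetical names, the state is kept in memory rather than in a database, and it assumes transaction IDs are never negative so that -1 can mean "no transaction seen yet".

```java
// Hypothetical in-memory counter that tolerates replays with different tuples.
class OpaqueCounter {
    private long previousCount = 0;      // count before the last transaction
    private long currentCount = 0;       // count including the last transaction
    private long lastTransactionId = -1; // assumes real transaction IDs are >= 0

    // Applies the count of one batch and returns the updated total.
    long apply(long txId, long batchCount) {
        if (txId == lastTransactionId) {
            // Same transaction replayed: discard its earlier contribution
            // by starting again from the previous state.
            currentCount = previousCount + batchCount;
        } else {
            // New transaction: the current total becomes the previous state.
            previousCount = currentCount;
            currentCount = currentCount + batchCount;
            lastTransactionId = txId;
        }
        return currentCount;
    }

    public static void main(String[] args) {
        OpaqueCounter counter = new OpaqueCounter();
        counter.apply(320L, 5);                      // five tweets counted so far
        System.out.println(counter.apply(321L, 8)); // prints 13
        System.out.println(counter.apply(321L, 4)); // replay of 321, prints 9
    }
}
```

A real committer would store the three values together with the result in the same database write, so that the state and the transaction ID can never disagree after a failure.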
 