            multi.hincrBy("users", user, count);
        }
        // Queue the per-user-hashtag counter increments in the same Redis transaction.
        keys = usersHashtags.keySet();
        for (String key : keys) {
            Long count = usersHashtags.get(key);
            multi.hincrBy("users_hashtags", key, count);
        }
        // Run all queued commands atomically (Redis MULTI/EXEC).
        multi.exec();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Nothing to declare: this bolt writes to Redis and emits no tuples.
    }
}
This is all very straightforward, but there is a very important detail in the finishBatch
method.
...
multi.set(LAST_COMMITED_TRANSACTION_FIELD, currentTransaction);
...
Here you are storing in your database the ID of the last committed transaction. Why should you do that? Remember that if a transaction fails, Storm will replay it as many times as necessary. If you don't make sure that you already processed the transaction, you could overcount, and the whole idea of a transactional topology would be useless. So remember: store the last committed transaction ID and check against it before committing.
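Concretely, the check-then-commit pattern could look like the following minimal sketch. It assumes the Jedis client used in the example; the class name, the Redis key value, and passing the transaction ID as a parameter are illustrative, not the book's exact code.

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

// Sketch: commit a batch only if its transaction ID was not committed already.
public class IdempotentCommit {
    // Key name is illustrative.
    public static final String LAST_COMMITED_TRANSACTION_FIELD = "LAST_COMMITED";
    private final Jedis jedis = new Jedis("localhost"); // assumed Redis host

    public void finishBatch(String currentTransaction) {
        // Check the last committed transaction ID before committing.
        String lastCommited = jedis.get(LAST_COMMITED_TRANSACTION_FIELD);
        if (currentTransaction.equals(lastCommited)) {
            // Replay of an already-committed batch: applying the increments
            // again would overcount, so skip the whole commit.
            return;
        }
        Transaction multi = jedis.multi();
        // The ID and the counter updates go in one atomic MULTI/EXEC block,
        // so they can never get out of sync.
        multi.set(LAST_COMMITED_TRANSACTION_FIELD, currentTransaction);
        // ... queue the hincrBy calls shown above ...
        multi.exec();
    }
}

Because the ID update is queued in the same MULTI/EXEC block as the counters, a crash between the two is impossible: either both are applied or neither is.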
Partitioned Transactional Spouts
It is very common for a spout to read batches of tuples from a set of partitions. Continuing the example, you could have several Redis databases, with the tweets split across them. Through the IPartitionedTransactionalSpout interface, Storm offers facilities to manage the state of each partition and to guarantee that every batch can be replayed.
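To picture that layout, each Redis instance holding a slice of the tweets can be addressed as one numbered partition. The helper class and hosts below are assumptions for illustration, not part of the book's code.

import java.util.ArrayList;
import java.util.List;

import redis.clients.jedis.Jedis;

// Sketch: one Jedis connection per Redis database, addressed by partition index.
public class TweetPartitions {
    private final List<Jedis> partitions = new ArrayList<Jedis>();

    public TweetPartitions(String... hosts) {
        for (String host : hosts) {
            partitions.add(new Jedis(host));
        }
    }

    // Storm's partitioned spout API works with a partition count and an index.
    public int numPartitions() {
        return partitions.size();
    }

    public Jedis get(int partition) {
        return partitions.get(partition);
    }
}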
Let's see how to modify the previous TweetsTransactionalSpout so it can handle partitions.
First, extend BasePartitionedTransactionalSpout, which implements IPartitionedTransactionalSpout:

public class TweetsPartitionedTransactionalSpout extends
        BasePartitionedTransactionalSpout<TransactionMetadata> {
    ...
}
Tell Storm which class is your coordinator.
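With the backtype.storm API this chapter uses, that override could look like the sketch below. It is not the book's exact code: the partition count, the always-ready coordinator, and the null emitter stub are assumptions, and TransactionMetadata is the metadata class defined earlier in the chapter.

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BasePartitionedTransactionalSpout;
import backtype.storm.transactional.partitioned.IPartitionedTransactionalSpout;

public class TweetsPartitionedTransactionalSpout extends
        BasePartitionedTransactionalSpout<TransactionMetadata> {

    @Override
    public IPartitionedTransactionalSpout.Coordinator getCoordinator(
            Map conf, TopologyContext context) {
        // This is how you tell Storm which class is your coordinator.
        return new TweetsPartitionedTransactionalCoordinator();
    }

    @Override
    public IPartitionedTransactionalSpout.Emitter<TransactionMetadata> getEmitter(
            Map conf, TopologyContext context) {
        return null; // emitter omitted here; stubbed only to keep the sketch compilable
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    public static class TweetsPartitionedTransactionalCoordinator
            implements IPartitionedTransactionalSpout.Coordinator {
        @Override
        public int numPartitions() {
            return 4; // illustrative: e.g., one partition per Redis database
        }

        @Override
        public boolean isReady() {
            return true; // illustrative: always ready to start a new batch
        }

        @Override
        public void close() {
        }
    }
}

Note that, unlike a plain transactional spout, the coordinator here does not build the batch metadata itself; it only reports how many partitions exist, and Storm tracks per-partition state for replay.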