Let's take a look at an example. Imagine you are processing bank transactions, and you
have the following requirements:
• If a transaction fails, resend the message.
• If the transaction fails too many times, terminate the topology.
You'll create a spout that sends 100 random transaction IDs, and a bolt that fails for
80% of tuples received (you can find the complete example at ch04-spout examples).
You'll implement the spout using a Map to emit transaction message tuples so that it's
easy to resend messages.
public void nextTuple() {
    if (!toSend.isEmpty()) {
        for (Map.Entry<Integer, String> transactionEntry : toSend.entrySet()) {
            Integer transactionId = transactionEntry.getKey();
            String transactionMessage = transactionEntry.getValue();
            // Emit the message with the transaction ID as the message ID
            collector.emit(new Values(transactionMessage), transactionId);
        }
        toSend.clear();
    }
}
If there are messages waiting to be sent, get each transaction message and its associated
ID and emit them as a tuple, then clear the message queue. Note that it's safe to call
clear on the map, because nextTuple, fail, and ack are the only methods that modify
the map, and they all run in the same thread.
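The snippet above assumes that toSend has already been filled. As a rough sketch only (this is not the book's code), the maps could be populated once at startup, for example from the spout's open method. The class name TransactionStore, the seed method, the sequential IDs, and the "transaction_" payload format are all assumptions made for illustration; only the map names toSend, messages, and transactionFailureCount appear in the text.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical helper, not part of Storm: holds the spout's bookkeeping maps.
public class TransactionStore {
    // Every transaction that has not been acknowledged yet.
    public final Map<Integer, String> messages = new HashMap<>();
    // Transactions waiting to be emitted by the next call to nextTuple.
    public final Map<Integer, String> toSend = new HashMap<>();
    // Number of failures recorded so far for each transaction.
    public final Map<Integer, Integer> transactionFailureCount = new HashMap<>();

    // Create `count` transactions keyed by ID, each with a random payload
    // (assumed format), and queue all of them for sending.
    public void seed(int count, Random random) {
        for (int id = 0; id < count; id++) {
            messages.put(id, "transaction_" + random.nextInt());
            transactionFailureCount.put(id, 0);
        }
        toSend.putAll(messages);
    }

    public static void main(String[] args) {
        TransactionStore store = new TransactionStore();
        store.seed(100, new Random());
        System.out.println(store.toSend.size() + " messages queued");
    }
}
```

Keeping a separate messages map alongside toSend means a failed transaction can be queued again later, even after toSend has been cleared.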
Maintain two maps to keep track of transaction messages waiting to be sent, and the
number of times each transaction has failed. The ack method simply removes the
transaction message from both maps.
public void ack(Object msgId) {
    messages.remove(msgId);
    failCounterMessages.remove(msgId);
}
The fail method decides whether to resend a transaction message or, if the message
has already failed too many times, to terminate the topology.
If you are using an all grouping in your topology and any instance of the
bolt fails, the fail method of the spout will be called as well.
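The resend-or-terminate decision can be modeled without Storm, which makes it easy to try in isolation. The following is a minimal sketch under stated assumptions, not the book's implementation: the class RetryTracker, its register method, the MAX_FAILS value of 2, the use of a single failure-count map, and the step that puts a failed message back into toSend are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Storm-free model of the spout's ack/fail bookkeeping.
public class RetryTracker {
    public static final int MAX_FAILS = 2; // assumed limit, chosen for the demo

    public final Map<Integer, String> messages = new HashMap<>();
    public final Map<Integer, String> toSend = new HashMap<>();
    public final Map<Integer, Integer> transactionFailureCount = new HashMap<>();

    // Track a new transaction and queue it for sending.
    public void register(int id, String message) {
        messages.put(id, message);
        transactionFailureCount.put(id, 0);
        toSend.put(id, message);
    }

    // A fully processed transaction no longer needs any bookkeeping.
    public void ack(int id) {
        messages.remove(id);
        transactionFailureCount.remove(id);
    }

    // Queue the transaction again, or give up once the limit is reached.
    public void fail(int id) {
        int failures = transactionFailureCount.getOrDefault(id, 0) + 1;
        if (failures >= MAX_FAILS) {
            throw new RuntimeException("Error, transaction id [" + id
                    + "] has had too many errors [" + failures + "]");
        }
        transactionFailureCount.put(id, failures);
        toSend.put(id, messages.get(id));
    }

    public static void main(String[] args) {
        RetryTracker tracker = new RetryTracker();
        tracker.register(7, "transaction_7");
        tracker.toSend.clear(); // pretend nextTuple has emitted the message
        tracker.fail(7);        // first failure: the message is queued again
        System.out.println("queued again: " + tracker.toSend.containsKey(7));
        try {
            tracker.fail(7);    // second failure reaches MAX_FAILS
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In a real spout, the uncaught RuntimeException is what brings the topology down; the sketch catches it only so that the demo can print the message.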
public void fail(Object msgId) {
    Integer transactionId = (Integer) msgId;
    // Check the number of times the transaction has failed
    Integer failures = transactionFailureCount.get(transactionId) + 1;
    if (failures >= MAX_FAILS) {
        // If the number of failures is too high, terminate the topology
        throw new RuntimeException("Error, transaction id [" +
            transactionId + "] has had too many errors [" + failures + "]");