Let's take a look at an example. Imagine you are processing bank transactions, and you
have the following requirements:
• If a transaction fails, resend the message.
• If the transaction fails too many times, terminate the topology.
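The two requirements above amount to a per-transaction failure counter with a cutoff. The following is a minimal sketch in plain Java, independent of Storm. The class name `RetryPolicySketch`, the method `recordFailure`, and the limit of 3 are assumptions made for illustration and do not come from the example project.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Storm: counts failures per transaction ID
// and decides between "resend" and "give up".
final class RetryPolicySketch {
    private final int maxFails;
    private final Map<Integer, Integer> failureCount = new HashMap<>();

    RetryPolicySketch(int maxFails) {
        this.maxFails = maxFails;
    }

    // Records one failure. Returns the updated count if the message may be
    // resent; throws once the count reaches maxFails.
    int recordFailure(int transactionId) {
        // merge() starts the count at 1 when the ID has not failed before
        int failures = failureCount.merge(transactionId, 1, Integer::sum);
        if (failures >= maxFails) {
            throw new RuntimeException("Error, transaction id [" + transactionId
                    + "] has had too many errors [" + failures + "]");
        }
        return failures;
    }

    public static void main(String[] args) {
        RetryPolicySketch policy = new RetryPolicySketch(3);
        System.out.println(policy.recordFailure(42)); // 1
        System.out.println(policy.recordFailure(42)); // 2
        try {
            policy.recordFailure(42); // third failure reaches the limit
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In a real spout, the uncaught exception is what would bring the worker down. Here it is caught only so that the demonstration can print the message.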
You'll create a spout that sends 100 random transaction IDs, and a bolt that fails for
80% of tuples received (you can find the complete example at ch04-spout examples).
You'll implement the spout using a Map to emit transaction message tuples so that it's
easy to resend messages.
public void nextTuple() {
    if (!toSend.isEmpty()) {
        for (Map.Entry<Integer, String> transactionEntry : toSend.entrySet()) {
            Integer transactionId = transactionEntry.getKey();
            String transactionMessage = transactionEntry.getValue();
            collector.emit(new Values(transactionMessage), transactionId);
        }
        toSend.clear();
    }
}
If there are messages waiting to be sent, get each transaction message and its associated
ID and emit them as a tuple, then clear the message queue. Note that it's safe to call
clear on the map, because nextTuple, fail, and ack are the only methods that modify
the map, and they all run in the same thread.
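To try the emit-then-clear pattern without a running topology, you can simulate it in plain Java. This is only a sketch: the class `PendingDrainSketch`, its `emitted` list (standing in for the collector), and the `requeue` method (standing in for a resend) are illustrative assumptions, not Storm API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for a spout's pending-message bookkeeping. The emitted list
// plays the role of the collector. Names are illustrative, not Storm API.
final class PendingDrainSketch {
    final Map<Integer, String> toSend = new HashMap<>();
    final List<String> emitted = new ArrayList<>();

    // Mirrors the shape of nextTuple(): emit every pending entry, then clear.
    void drain() {
        if (toSend.isEmpty()) {
            return;
        }
        for (Map.Entry<Integer, String> entry : toSend.entrySet()) {
            emitted.add(entry.getKey() + ":" + entry.getValue());
        }
        // Safe only because no other thread touches the map during the loop.
        toSend.clear();
    }

    // Assumed resend path: a failed message is queued for the next drain.
    void requeue(int transactionId, String message) {
        toSend.put(transactionId, message);
    }

    public static void main(String[] args) {
        PendingDrainSketch sketch = new PendingDrainSketch();
        sketch.requeue(1, "transaction_1");
        sketch.requeue(2, "transaction_2");
        sketch.drain();
        System.out.println(sketch.emitted.size() + " emitted, "
                + sketch.toSend.size() + " pending"); // 2 emitted, 0 pending
        sketch.requeue(2, "transaction_2"); // simulate a failed tuple coming back
        sketch.drain();
        System.out.println(sketch.emitted.size() + " emitted, "
                + sketch.toSend.size() + " pending"); // 3 emitted, 0 pending
    }
}
```

Because the map is keyed by transaction ID, queuing the same ID twice before a drain sends it only once.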
Maintain two maps to keep track of transaction messages waiting to be sent, and the
number of times each transaction has failed. The ack method simply removes the
transaction's entry from each map.
public void ack(Object msgId) {
    messages.remove(msgId);
    transactionFailureCount.remove(msgId);
}
The fail method decides whether to resend a transaction message or fail if it has failed
too many times.
If you are using an all grouping in your topology and any instance of the
bolt fails, the fail method of the spout will be called as well.
public void fail(Object msgId) {
    Integer transactionId = (Integer) msgId;
    // Check the number of times the transaction has failed
    Integer failures = transactionFailureCount.get(transactionId) + 1;
    if (failures >= MAX_FAILS) {
        // If the number of failures is too high, terminate the topology
        throw new RuntimeException("Error, transaction id [" + transactionId
                + "] has had too many errors [" + failures + "]");