                    if (count == null)
                        count = 0;
                    count++;
                    hashtagsCounter.put(hashtag, count);
                }
            }
        }
        for (String hashtag : hashtagsCounter.keySet()) {
            int count = hashtagsCounter.get(hashtag);
            collector.emit(new Values(id, user, hashtag, count));
        }
    }
}
This method generates and emits one tuple for each user-hashtag pair, together with the
number of times that pair occurred.
You can see the complete implementation in the downloadable code available on GitHub.
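The counting step can be tried outside of Storm. The following is a minimal sketch in plain Java, not the book's bolt: the class name UserHashtagCount and the method countFor are hypothetical, and each row is a plain list standing in for the (user, hashtag, count) part of the emitted Values.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone illustration; no Storm classes are involved.
class UserHashtagCount {

    // Counts each hashtag for one user and returns one row per distinct hashtag.
    // Each row is [user, hashtag, count], in order of first appearance.
    public static List<List<Object>> countFor(String user, List<String> hashtags) {
        Map<String, Integer> hashtagsCounter = new LinkedHashMap<>();
        for (String hashtag : hashtags) {
            // Same effect as the null check, increment, and put shown above.
            hashtagsCounter.merge(hashtag, 1, Integer::sum);
        }
        List<List<Object>> rows = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : hashtagsCounter.entrySet()) {
            rows.add(List.<Object>of(user, entry.getKey(), entry.getValue()));
        }
        return rows;
    }

    public static void main(String[] args) {
        for (List<Object> row : countFor("alice", List.of("storm", "java", "storm"))) {
            System.out.println(row);
        }
    }
}
```

Map.merge collapses the null check, the increment, and the put into a single call. The explicit version in the listing behaves the same way.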
The Committer Bolts
As you've learned, batches of tuples are sent by the coordinator and emitters across the
topology. Those batches of tuples are processed in parallel without any specific order.
Committer bolts are special batch bolts that implement ICommitter or have been set with
setCommitterBolt in the TransactionalTopologyBuilder. The main difference from regular
batch bolts is that the finishBatch method of a committer bolt executes only when the
batch is ready to be committed, which happens once all previous transactions have been
committed successfully. Additionally, finishBatch calls are executed sequentially. So if
the batch with transaction ID 1 and the batch with transaction ID 2 are being processed
in parallel in the topology, the finishBatch method of the committer bolt that is
processing the batch with transaction ID 2 is executed only after the finishBatch of the
batch with transaction ID 1 has finished without any errors.
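The ordering guarantee described above can be pictured with a toy model. The sketch below is plain Java and is not how Storm implements commits internally: the class CommitSequencer and its markReady method are hypothetical, and it assumes transaction IDs start at 1 and increase by one. Batches may become ready in any order, but each one is committed only after every lower transaction ID has been committed.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy model of sequential commits; not part of the Storm API.
class CommitSequencer {
    // Batches that have finished processing but are still waiting for their turn.
    private final Set<Long> ready = new HashSet<>();
    // Lowest transaction ID that has not been committed yet (assumed to start at 1).
    private long nextToCommit = 1;

    // Marks a batch as fully processed and returns the transaction IDs that
    // could be committed as a result, in strictly increasing order.
    public List<Long> markReady(long txId) {
        ready.add(txId);
        List<Long> justCommitted = new ArrayList<>();
        // Commit as long as the next expected transaction is waiting.
        while (ready.remove(nextToCommit)) {
            justCommitted.add(nextToCommit);
            nextToCommit++;
        }
        return justCommitted;
    }

    public static void main(String[] args) {
        CommitSequencer sequencer = new CommitSequencer();
        System.out.println(sequencer.markReady(2)); // prints [] because transaction 1 is still pending
        System.out.println(sequencer.markReady(1)); // prints [1, 2]
    }
}
```

In this model, the batch with transaction ID 2 finishes processing first but waits until transaction 1 has been committed, which mirrors the finishBatch ordering in the example above.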
The implementation of the committer bolt class follows:
public class RedisCommiterCommiterBolt extends BaseTransactionalBolt implements ICommitter {
    public static final String LAST_COMMITED_TRANSACTION_FIELD = "LAST_COMMIT";
    TransactionAttempt id;
    BatchOutputCollector collector;
    Jedis jedis;

    @Override
    public void prepare(Map conf, TopologyContext context,
            BatchOutputCollector collector, TransactionAttempt id) {
        this.id = id;
        this.collector = collector;