                    if (count == null)
                        count = 0;
                    count++;
                    hashtagsCounter.put(hashtag, count);
                }
            }
        }
        for (String hashtag : hashtagsCounter.keySet()) {
            int count = hashtagsCounter.get(hashtag);
            collector.emit(new Values(id, user, hashtag, count));
        }
    }
}
This method generates and emits one tuple for each user-hashtag pair, along with the
number of times that pair occurred.
You can see the complete implementation in the downloadable code available on
GitHub.
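The counting pattern used in the fragment above can be tried without a Storm cluster. The following is a minimal sketch in plain Java; the class name HashtagCountSketch and the method countHashtags are hypothetical names introduced for illustration and are not part of the book's code or of Storm.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashtagCountSketch {

    // Hypothetical helper: counts how often each hashtag appears in the list,
    // using the same null-check-then-increment pattern as the bolt fragment.
    public static Map<String, Integer> countHashtags(List<String> hashtags) {
        Map<String, Integer> hashtagsCounter = new HashMap<>();
        for (String hashtag : hashtags) {
            Integer count = hashtagsCounter.get(hashtag);
            if (count == null) {
                count = 0; // first time this hashtag is seen
            }
            count++;
            hashtagsCounter.put(hashtag, count);
        }
        return hashtagsCounter;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts =
                countHashtags(List.of("storm", "redis", "storm"));
        // In the bolt, each entry would be emitted as one tuple;
        // here the entries are only printed.
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            System.out.println(entry.getKey() + " " + entry.getValue());
        }
    }
}
```

In the bolt itself, each map entry becomes one emitted tuple together with the transaction attempt and the user, as in the listing above.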
The Committer Bolts
As you've learned, the coordinator and emitters send batches of tuples across the
topology. Those batches are processed in parallel, in no particular order.
Committer bolts are special batch bolts that implement ICommitter or have been
set with setCommitterBolt in the TransactionalTopologyBuilder. The main difference
from regular batch bolts is that the finishBatch method of a committer bolt executes
only when the batch is ready to be committed, which happens once all previous
transactions have been committed successfully. Additionally, finishBatch calls are
executed sequentially. So if the batch with transaction ID 1 and the batch with
transaction ID 2 are being processed in parallel in the topology, the finishBatch
method of the committer bolt that is processing the batch with transaction ID 2 will
be executed only after the finishBatch call for the batch with transaction ID 1 has
finished without errors.
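The ordering guarantee described above can be illustrated outside Storm. The following is a simplified sketch in plain Java, not Storm's actual implementation: CommitOrderSketch, CommitSequencer, batchProcessed, and committedOrder are hypothetical names, and the sketch assumes transaction IDs start at 1 and increase by one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class CommitOrderSketch {

    // Hypothetical helper, not part of Storm: batches may finish processing
    // in any order, but commits are released strictly by transaction ID.
    public static class CommitSequencer {
        private long nextToCommit = 1; // assumption: IDs start at 1
        private final TreeSet<Long> ready = new TreeSet<>();
        private final List<Long> committed = new ArrayList<>();

        // Called when a batch has finished processing and is waiting to commit.
        public void batchProcessed(long txId) {
            ready.add(txId);
            // Commit every waiting batch whose predecessors are all committed.
            while (!ready.isEmpty() && ready.first() == nextToCommit) {
                committed.add(ready.pollFirst()); // stands in for finishBatch
                nextToCommit++;
            }
        }

        public List<Long> committedOrder() {
            return committed;
        }
    }

    public static void main(String[] args) {
        CommitSequencer sequencer = new CommitSequencer();
        sequencer.batchProcessed(2); // batch 2 is ready first but must wait
        sequencer.batchProcessed(3);
        sequencer.batchProcessed(1); // batch 1 unblocks 2 and 3
        System.out.println(sequencer.committedOrder()); // prints [1, 2, 3]
    }
}
```

Batches 2 and 3 are held back until batch 1 commits, which mirrors how the finishBatch call for transaction ID 2 waits for transaction ID 1.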
The implementation of the committer bolt class follows:
public class RedisCommiterCommiterBolt extends BaseTransactionalBolt
        implements ICommitter {
    public static final String LAST_COMMITED_TRANSACTION_FIELD = "LAST_COMMIT";
    TransactionAttempt id;
    BatchOutputCollector collector;
    Jedis jedis;

    @Override
    public void prepare(Map conf, TopologyContext context,
            BatchOutputCollector collector, TransactionAttempt id) {
        this.id = id;
        this.collector = collector;
 