@Override
public void emitBatch(TransactionAttempt tx,
        TransactionMetadata coordinatorMeta, BatchOutputCollector collector) {
    rq.setNextRead(coordinatorMeta.from + coordinatorMeta.quantity);
    List<String> messages = rq.getMessages(coordinatorMeta.from,
            coordinatorMeta.quantity);
    long tweetId = coordinatorMeta.from;
    for (String message : messages) {
        collector.emit(new Values(tx, "" + tweetId, message));
        tweetId++;
    }
}

@Override
public void cleanupBefore(BigInteger txid) {
}

@Override
public void close() {
    rq.close();
}
}
Emitters read the source and send tuples to a stream. It is very important that an emitter always be able to emit the same batch of tuples for the same transaction id and transaction metadata. That way, if something goes wrong while a batch is being processed, Storm can replay the same transaction id and transaction metadata through the emitter and guarantee that the batch of tuples is repeated. Storm increases the attempt id in the TransactionAttempt, so you can tell that the batch is a retry.
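To make the replay guarantee concrete, here is a minimal sketch of that determinism requirement, using a hypothetical in-memory list instead of the chapter's Redis-backed rq: because the batch is derived purely from the metadata (from and quantity), replaying the same metadata always yields an identical batch.

```java
import java.util.List;

// Hypothetical stand-in for the Redis-backed queue: the batch is a pure
// function of (from, quantity), so a replayed transaction gets the same tuples.
public class DeterministicBatchDemo {
    static List<String> getMessages(List<String> source, long from, int quantity) {
        int start = (int) from;
        int end = Math.min(start + quantity, source.size());
        return source.subList(start, end);
    }

    public static void main(String[] args) {
        List<String> source = List.of("t0", "t1", "t2", "t3", "t4");
        List<String> first = getMessages(source, 1, 3);
        List<String> replay = getMessages(source, 1, 3);
        // Same metadata, same batch: this is what lets Storm retry safely.
        System.out.println(first.equals(replay)); // prints "true"
    }
}
```

If the emitter instead consumed destructively from the source (for example, popping elements), a retry would see a different batch and exactly-once processing would break; that is why the read position is kept in the metadata rather than in the emitter.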
The key method here is emitBatch. It uses the metadata passed as a parameter to read tweets from Redis, advances the sequence in Redis that keeps track of how many tweets you've read so far, and, of course, emits the tweets to the topology.
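The emitter above reads coordinatorMeta.from and coordinatorMeta.quantity. As a rough sketch, the metadata class could look like the following; the field names match the snippet, but the constructor and serialization details are assumptions:

```java
import java.io.Serializable;

// Sketch of the per-batch metadata the coordinator hands to the emitter.
// It must be serializable so Storm can persist and replay it.
public class TransactionMetadata implements Serializable {
    private static final long serialVersionUID = 1L;

    public final long from;    // id of the first tweet in this batch
    public final int quantity; // number of tweets in this batch

    public TransactionMetadata(long from, int quantity) {
        this.from = from;
        this.quantity = quantity;
    }
}
```

Note how the emitter computes from + quantity to set the next read position: the next batch's metadata starts exactly where this one ends, so consecutive batches never overlap.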
The Bolts
First let's see the standard bolts of this topology:
public class UserSplitterBolt implements IBasicBolt {
    private static final long serialVersionUID = 1L;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("users", new Fields("txid", "tweet_id", "user"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
 