Databases Reference
In-Depth Information
nextRead += quantity ;
return ret ;
}
@Override
public boolean isReady () {
return rq . getAvailableToRead ( nextRead ) > 0 ;
}
@Override
public void close () {
rq . close ();
}
}
It is important to mention that among the entire topology there will be only one coordi-
nator instance . When the coordinator is instantiated, it retrieves from Redis a sequence
that tells the coordinator which is the next tweet to read. The first time, this value will
be 1, which means that the next tweet to read is the first one.
The first method that will be called is isReady . It will always be called before initiali
zeTransaction , to make sure the source is ready to be read from. You should return
true or false accordingly. In this example, retrieve the amount of tweets and compare
them with how many tweets you read. The difference between them is the amount to
available tweets to read. If it is greater than 0, it means you have tweets to read.
Finally, the initializeTransaction is executed. As you can see, you get txid and pre
vMetadata as parameters. The first one is a unique transaction ID generated by Storm,
which identifies the batch of tuples to be generated. prevMetadata is the metadata gen-
erated by the coordinator of the previous transaction.
In this example, first make sure how many tweets are available to read. And once you
have sorted that out, create a new TransactionMetadata , indicating which is the first
tweet to read from , and which is the quantity of tweets to read.
As soon as you return the metadata, Storm stores it with the txid in zookeeper. This
guarantees that if something goes wrong, Storm will be able to replay this with the
emitter to resend the batch.
The Emitter
The final step when creating a transactional spout is implementing the emitter.
Let's start with the following implementation:
public static class TweetsTransactionalSpoutEmitter implements
ITransactionalSpout . Emitter < TransactionMetadata > {
RQ rq = new RQ ();
public TweetsTransactionalSpoutEmitter () {
}
 
Search WWH ::




Custom Search