public class TransactionMetadata implements Serializable {
    long from;
    int quantity;

    public TransactionMetadata(long from, int quantity) {
        this.from = from;
        this.quantity = quantity;
    }
}
Here you'll store from and quantity, which will tell you exactly how to generate the batch of tuples.
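As a hypothetical illustration (not code from the topology itself), a metadata object with from = 100 and quantity = 5 describes the batch of tweets with ids 100 through 104:

TransactionMetadata meta = new TransactionMetadata(100, 5);
for (long id = meta.from; id < meta.from + meta.quantity; id++) {
    // fetch the tweet stored under this id and emit it as part of the batch
}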
To finish the implementation of the spout, you need to implement the following three
methods:
@Override
public ITransactionalSpout.Coordinator<TransactionMetadata> getCoordinator(
        Map conf, TopologyContext context) {
    return new TweetsTransactionalSpoutCoordinator();
}

@Override
public ITransactionalSpout.Emitter<TransactionMetadata> getEmitter(
        Map conf, TopologyContext context) {
    return new TweetsTransactionalSpoutEmitter();
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("txid", "tweet_id", "tweet"));
}
In the getCoordinator method, you tell Storm which class will coordinate the generation
of batches of tuples. With getEmitter , you tell Storm which class will be responsible
for reading batches of tuples from the source and emitting them to a stream in the
topology. And finally, as you did before, you need to declare which fields are emitted.
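To make the coordinator's role concrete, here is a minimal sketch of what TweetsTransactionalSpoutCoordinator could look like. It is a sketch under stated assumptions, not the definitive class: MAX_TRANSACTION_SIZE is a hypothetical cap on batch size, it relies on the RQ Redis helper introduced below, and it assumes imports of java.math.BigInteger and backtype.storm.transactional.ITransactionalSpout.

public static class TweetsTransactionalSpoutCoordinator
        implements ITransactionalSpout.Coordinator<TransactionMetadata> {
    static final int MAX_TRANSACTION_SIZE = 1000; // hypothetical cap on batch size
    RQ rq = new RQ();
    long nextRead = 1; // position of the next tweet to hand out

    @Override
    public TransactionMetadata initializeTransaction(BigInteger txid,
            TransactionMetadata prevMetadata) {
        // Size the batch by what is available to read, capped at the maximum.
        long quantity = Math.min(rq.getAvailableToRead(nextRead), MAX_TRANSACTION_SIZE);
        TransactionMetadata ret = new TransactionMetadata(nextRead, (int) quantity);
        nextRead += quantity;
        return ret;
    }

    @Override
    public boolean isReady() {
        // A new transaction can start only when there are unread tweets.
        return rq.getAvailableToRead(nextRead) > 0;
    }

    @Override
    public void close() {
    }
}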
The RQ class
To keep the example simple, we've encapsulated all Redis operations in a single class.
import redis.clients.jedis.Jedis;

public class RQ {
    public static final String NEXT_READ = "NEXT_READ";
    public static final String NEXT_WRITE = "NEXT_WRITE";
    Jedis jedis;

    public RQ() {
        jedis = new Jedis("localhost");
    }

    // Number of tweets written to the queue but not yet read, counting from current.
    public long getAvailableToRead(long current) {
        return getNextWrite() - current;
    }

    // Position where the next tweet will be written, kept in Redis.
    public long getNextWrite() {
        return Long.valueOf(jedis.get(NEXT_WRITE));
    }
}
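As a quick usage illustration (hypothetical, mirroring the coordinator sketch above), getAvailableToRead is what lets the coordinator decide whether a new batch is ready:

RQ rq = new RQ();
long nextRead = 1; // position of the next unread tweet
boolean batchReady = rq.getAvailableToRead(nextRead) > 0;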