Databases Reference
In-Depth Information
}
public long getNextRead () {
String sNextRead = jedis . get ( NEXT_READ );
if ( sNextRead == null )
return 1 ;
return Long . valueOf ( sNextRead );
}
public long getNextWrite () {
return Long . valueOf ( jedis . get ( NEXT_WRITE ));
}
public void close () {
jedis . disconnect ();
}
public void setNextRead ( long nextRead ) {
jedis . set ( NEXT_READ , "" + nextRead );
}
public List < String > getMessages ( long from , int quantity ) {
String [] keys = new String [ quantity ];
for ( int i = 0 ; i < quantity ; i ++)
keys [ i ] = "" +( i + from );
return jedis . mget ( keys );
}
}
Read carefully the implementation of each method, and make sure you understand
what they do.
The Coordinator
Let's see the implementation of the coordinator of this example.
public static class TweetsTransactionalSpoutCoordinator implements
ITransactionalSpout . Coordinator < TransactionMetadata > {
TransactionMetadata lastTransactionMetadata ;
RQ rq = new RQ ();
long nextRead = 0 ;
public TweetsTransactionalSpoutCoordinator () {
nextRead = rq . getNextRead ();
}
@Override
public TransactionMetadata initializeTransaction ( BigInteger txid ,
TransactionMetadata prevMetadata ) {
long quantity = rq . getAvailableToRead ( nextRead );
quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity ;
TransactionMetadata ret = new TransactionMetadata ( nextRead , ( int ) quantity );
 
Search WWH ::




Custom Search