Also, every transaction that is being processed in parallel will be cancelled when a
previous transaction is cancelled. This ensures that nothing is skipped in the
middle.
Your spout should implement IOpaquePartitionedTransactionalSpout, and as you can
see, the coordinator and emitter are very simple:
public static class TweetsOpaquePartitionedTransactionalSpoutCoordinator
        implements IOpaquePartitionedTransactionalSpout.Coordinator {
    @Override
    public boolean isReady() {
        return true;
    }
}
public static class TweetsOpaquePartitionedTransactionalSpoutEmitter
        implements IOpaquePartitionedTransactionalSpout.Emitter<TransactionMetadata> {
    PartitionedRQ rq = new PartitionedRQ();

    @Override
    public TransactionMetadata emitPartitionBatch(TransactionAttempt tx,
            BatchOutputCollector collector, int partition,
            TransactionMetadata lastPartitionMeta) {
        long nextRead;
        if (lastPartitionMeta == null)
            nextRead = rq.getNextRead(partition);
        else {
            nextRead = lastPartitionMeta.from + lastPartitionMeta.quantity;
            rq.setNextRead(partition, nextRead); // Move the cursor
        }

        // Read at most MAX_TRANSACTION_SIZE messages in this batch
        long quantity = rq.getAvailableToRead(partition, nextRead);
        quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;
        TransactionMetadata metadata = new TransactionMetadata(nextRead, (int) quantity);
        emitMessages(tx, collector, partition, metadata);
        return metadata;
    }

    private void emitMessages(TransactionAttempt tx, BatchOutputCollector collector,
            int partition, TransactionMetadata partitionMeta) {
        if (partitionMeta.quantity <= 0)
            return;

        List<String> messages =
                rq.getMessages(partition, partitionMeta.from, partitionMeta.quantity);

        // Emit one tuple per message: [transaction attempt, tweet id, tweet text]
        long tweetId = partitionMeta.from;
        for (String msg : messages) {
            collector.emit(new Values(tx, "" + tweetId, msg));
            tweetId++;
        }
    }

    @Override
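For context, the coordinator and emitter are returned by an enclosing spout class. The
following is a minimal sketch of what that wrapper might look like, assuming the
backtype.storm 0.8-era API (getCoordinator and getEmitter receiving a Map and a
TopologyContext, plus declareOutputFields and getComponentConfiguration from
IComponent) and assuming the two static inner classes above are nested inside it. The
class name TweetsOpaquePartitionedTransactionalSpout and the declared field names
"txid", "tweet_id", and "tweet" are illustrative and should match whatever your
emitter actually emits.
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.transactional.partitioned.IOpaquePartitionedTransactionalSpout;
import backtype.storm.tuple.Fields;

public class TweetsOpaquePartitionedTransactionalSpout
        implements IOpaquePartitionedTransactionalSpout<TransactionMetadata> {

    // The TweetsOpaquePartitionedTransactionalSpoutCoordinator and
    // TweetsOpaquePartitionedTransactionalSpoutEmitter classes shown above
    // would sit here as static inner classes.

    @Override
    public IOpaquePartitionedTransactionalSpout.Coordinator getCoordinator(
            Map conf, TopologyContext context) {
        // A single coordinator decides when the next transaction can start
        return new TweetsOpaquePartitionedTransactionalSpoutCoordinator();
    }

    @Override
    public IOpaquePartitionedTransactionalSpout.Emitter<TransactionMetadata> getEmitter(
            Map conf, TopologyContext context) {
        // One emitter instance is created per spout task
        return new TweetsOpaquePartitionedTransactionalSpoutEmitter();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Must line up with the Values emitted by the emitter:
        // [transaction attempt, tweet id, tweet text]
        declarer.declare(new Fields("txid", "tweet_id", "tweet"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
The TransactionMetadata type parameter is the same from/quantity holder used by the
emitter, which is what lets Storm hand the previous batch's metadata back as
lastPartitionMeta when a partition is replayed.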
 