public static class TweetsPartitionedTransactionalCoordinator implements Coordinator {
    @Override
    public int numPartitions() {
        return 4;
    }

    @Override
    public boolean isReady() {
        return true;
    }

    @Override
    public void close() {
    }
}
In this case, the coordinator is very simple. The numPartitions method tells Storm
how many partitions you have. Notice also that the coordinator returns no metadata:
in an IPartitionedTransactionalSpout, the emitter manages the metadata directly.
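To make the per-partition metadata concrete, here is a minimal sketch of what it might look like and how the next read position can be derived from it. Only the from and quantity fields and the (long, int) constructor are taken from the emitter code in this section. The PartitionCursor helper, its next method, and the value of MAX_TRANSACTION_SIZE are assumptions made for illustration, not the book's implementation.

```java
import java.io.Serializable;

// Hypothetical stand-in for the metadata class used by the emitter.
// Only `from`, `quantity` and the (long, int) constructor are confirmed
// by the listing in this section; everything else is assumed.
class TransactionMetadata implements Serializable {
    private static final long serialVersionUID = 1L;

    final long from;     // index of the first message in the batch
    final int quantity;  // number of messages in the batch

    TransactionMetadata(long from, int quantity) {
        this.from = from;
        this.quantity = quantity;
    }
}

// Hypothetical helper that isolates the cursor arithmetic.
class PartitionCursor {
    // Assumed batch-size cap; the real value is not shown in this section.
    static final int MAX_TRANSACTION_SIZE = 30;

    // Computes the metadata for the next batch of one partition.
    // `last` is the previous batch's metadata (null for the first batch),
    // `storedCursor` is the position kept by the queue for that partition,
    // `available` is how many messages can be read from the new position.
    static TransactionMetadata next(TransactionMetadata last,
                                    long storedCursor, long available) {
        long from = (last == null)
                ? storedCursor
                : last.from + last.quantity; // continue right after the last batch
        int quantity = (int) Math.min(available, MAX_TRANSACTION_SIZE);
        return new TransactionMetadata(from, quantity);
    }
}
```

Because each partition's metadata records both where a batch starts and how many messages it holds, replaying a batch only requires reading the same range again, which is why the coordinator does not need to track anything itself.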
Let's see how the emitter is implemented.
public static class TweetsPartitionedTransactionalEmitter
        implements Emitter<TransactionMetadata> {
    PartitionedRQ rq = new PartitionedRQ();

    @Override
    public TransactionMetadata emitPartitionBatchNew(TransactionAttempt tx,
            BatchOutputCollector collector, int partition,
            TransactionMetadata lastPartitionMeta) {
        long nextRead;
        if (lastPartitionMeta == null)
            nextRead = rq.getNextRead(partition);
        else {
            nextRead = lastPartitionMeta.from + lastPartitionMeta.quantity;
            rq.setNextRead(partition, nextRead); // Move the cursor
        }
        long quantity = rq.getAvailableToRead(partition, nextRead);
        quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;
        TransactionMetadata metadata = new TransactionMetadata(nextRead, (int) quantity);
        emitPartitionBatch(tx, collector, partition, metadata);
        return metadata;
    }

    @Override
    public void emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector,
            int partition, TransactionMetadata partitionMeta) {
        if (partitionMeta.quantity <= 0)
            return;
        List<String> messages = rq.getMessages(partition, partitionMeta.from,
                partitionMeta.quantity);
 