public static class TweetsPartitionedTransactionalCoordinator implements Coordinator {
    @Override
    public int numPartitions() {
        return 4;
    }

    @Override
    public boolean isReady() {
        return true;
    }

    @Override
    public void close() {
    }
}
In this case, the coordinator is very simple. In the numPartitions method, tell Storm how many partitions you have. Also notice that you don't return any metadata. In an IPartitionedTransactionalSpout, the metadata is managed by the emitter directly.
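To make the idea of emitter-managed metadata concrete, here is a minimal stand-alone sketch of how a per-partition cursor could advance from one batch to the next. It does not use any Storm classes. The class name PartitionCursorSketch, the nextBatch helper, the partitionLength parameter, and the small MAX_TRANSACTION_SIZE value are assumptions made for illustration; only the from and quantity fields mirror the metadata used in this chapter. For simplicity, the sketch starts each partition at offset 0 instead of asking the queue for a stored cursor.

```java
// Hypothetical stand-alone sketch; no Storm classes are used.
class PartitionCursorSketch {

    // Assumed shape of the per-partition metadata: where the batch starts
    // and how many messages it holds.
    static final class TransactionMetadata {
        final long from;
        final int quantity;

        TransactionMetadata(long from, int quantity) {
            this.from = from;
            this.quantity = quantity;
        }
    }

    // Assumed small limit so the demo produces more than one batch.
    static final int MAX_TRANSACTION_SIZE = 3;

    // Computes the next batch for one partition from the previous batch's
    // metadata (null for the first batch) and the partition's current length.
    static TransactionMetadata nextBatch(TransactionMetadata last, long partitionLength) {
        // The new batch starts right after the previous one ended.
        long nextRead = (last == null) ? 0 : last.from + last.quantity;
        long available = Math.max(0, partitionLength - nextRead);
        int quantity = (int) Math.min(available, MAX_TRANSACTION_SIZE);
        return new TransactionMetadata(nextRead, quantity);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        long[] lengths = {5, 0, 3, 7}; // messages waiting in each partition
        TransactionMetadata[] last = new TransactionMetadata[numPartitions];
        for (int round = 0; round < 2; round++) {
            for (int p = 0; p < numPartitions; p++) {
                last[p] = nextBatch(last[p], lengths[p]);
                System.out.println("partition " + p + ": from=" + last[p].from
                        + " quantity=" + last[p].quantity);
            }
        }
    }
}
```

Each partition keeps its own metadata, so a batch can be replayed for one partition without touching the cursors of the others.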
Let's see how the emitter is implemented.
public static class TweetsPartitionedTransactionalEmitter implements Emitter<TransactionMetadata> {
    PartitionedRQ rq = new PartitionedRQ();

    @Override
    public TransactionMetadata emitPartitionBatchNew(TransactionAttempt tx,
            BatchOutputCollector collector, int partition,
            TransactionMetadata lastPartitionMeta) {
        long nextRead;
        if (lastPartitionMeta == null)
            nextRead = rq.getNextRead(partition);
        else {
            nextRead = lastPartitionMeta.from + lastPartitionMeta.quantity;
            rq.setNextRead(partition, nextRead); // Move the cursor
        }
        long quantity = rq.getAvailableToRead(partition, nextRead);
        quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;
        TransactionMetadata metadata = new TransactionMetadata(nextRead, (int) quantity);
        emitPartitionBatch(tx, collector, partition, metadata);
        return metadata;
    }

    @Override
    public void emitPartitionBatch(TransactionAttempt tx,
            BatchOutputCollector collector, int partition,
            TransactionMetadata partitionMeta) {
        if (partitionMeta.quantity <= 0)
            return;
        List<String> messages = rq.getMessages(partition, partitionMeta.from, partitionMeta.quantity);