Databases Reference
In-Depth Information
Also, when a transaction is cancelled, every later transaction that is being processed in
parallel is cancelled as well. This ensures that nothing in the middle is missed.
Your spout should implement IOpaquePartitionedTransactionalSpout, and, as you can see, the coordinator and emitters are very simple.
/**
 * Coordinator for the opaque partitioned tweets spout.
 *
 * <p>It never holds back or skips a transaction: {@link #isReady()} always answers {@code true}.
 */
public static class TweetsOpaquePartitionedTransactionalSpoutCoordinator
        implements IOpaquePartitionedTransactionalSpout.Coordinator {

    /**
     * Tells Storm whether a new transaction may be started.
     *
     * @return always {@code true}; this coordinator has no condition to wait on
     */
    @Override
    public boolean isReady() {
        return true;
    }
}
public
static
class
TweetsOpaquePartitionedTransactionalSpoutEmitter
implements
IOpaquePartitionedTransactionalSpout
.
Emitter
<
TransactionMetadata
>
{
// Partitioned message source owned by this emitter instance. The emitter uses it for the
// per-partition read cursor (getNextRead / setNextRead), for the number of messages readable
// from a position (getAvailableToRead) and to fetch the messages themselves (getMessages).
PartitionedRQ rq = new PartitionedRQ();
/**
 * Emits the next batch of messages for {@code partition} and returns the metadata describing it.
 *
 * <p>If {@code lastPartitionMeta} is present, the batch starts immediately after the previous
 * one (at {@code from + quantity}). That position is also written back to {@code rq} as the
 * partition's read cursor. Otherwise the start position is taken from
 * {@code rq.getNextRead(partition)}.
 *
 * <p>The batch contains as many messages as {@code rq} reports available from the start
 * position, capped at {@code MAX_TRANSACTION_SIZE}.
 *
 * @param tx the transaction attempt; it becomes the first field of every emitted tuple
 * @param collector receives the emitted tuples
 * @param partition the partition to read from
 * @param lastPartitionMeta metadata returned for this partition's previous batch, or
 *     {@code null}, in which case the start position comes from {@code rq}
 * @return the start offset and size of the batch that was just emitted
 */
@Override
public TransactionMetadata emitPartitionBatch(TransactionAttempt tx,
        BatchOutputCollector collector, int partition, TransactionMetadata lastPartitionMeta) {
    final long start;
    if (lastPartitionMeta != null) {
        // Continue right after the previous batch and move the stored cursor there.
        start = lastPartitionMeta.from + lastPartitionMeta.quantity;
        rq.setNextRead(partition, start);
    } else {
        start = rq.getNextRead(partition);
    }

    final long available = rq.getAvailableToRead(partition, start);
    final int batchSize = (int) Math.min(available, MAX_TRANSACTION_SIZE);

    final TransactionMetadata batchMeta = new TransactionMetadata(start, batchSize);
    emitMessages(tx, collector, partition, batchMeta);
    return batchMeta;
}
/**
 * Fetches the messages described by {@code partitionMeta} from {@code rq} and emits one tuple
 * per message. Does nothing when the batch size is zero or negative.
 *
 * <p>Each tuple is {@code (tx, id, message)}. The id is the message's offset within the
 * partition, rendered as a string, starting at {@code partitionMeta.from} and increasing by one
 * per message.
 *
 * <p>NOTE(review): ids are per-partition offsets, so the same id can appear in different
 * partitions. Confirm that downstream bolts do not assume the ids are globally unique.
 *
 * @param tx the transaction attempt stamped on every tuple
 * @param collector receives the emitted tuples
 * @param partition the partition to read from
 * @param partitionMeta start offset and number of messages to emit
 */
private void emitMessages(TransactionAttempt tx, BatchOutputCollector collector,
        int partition, TransactionMetadata partitionMeta) {
    if (partitionMeta.quantity > 0) {
        final List<String> batch =
                rq.getMessages(partition, partitionMeta.from, partitionMeta.quantity);
        long id = partitionMeta.from;
        for (final String message : batch) {
            collector.emit(new Values(tx, String.valueOf(id++), message));
        }
    }
}
@Override