    @Override
    public void emitBatch(TransactionAttempt tx,
            TransactionMetadata coordinatorMeta, BatchOutputCollector collector) {
        rq.setNextRead(coordinatorMeta.from + coordinatorMeta.quantity);
        List<String> messages = rq.getMessages(coordinatorMeta.from,
                coordinatorMeta.quantity);
        long tweetId = coordinatorMeta.from;
        for (String message : messages) {
            collector.emit(new Values(tx, "" + tweetId, message));
            tweetId++;
        }
    }

    @Override
    public void cleanupBefore(BigInteger txid) {
    }

    @Override
    public void close() {
        rq.close();
    }
}
Emitters are the components that read the source and send tuples to a stream. It is very important that an emitter always be able to send the same batch of tuples for the same transaction id and transaction metadata. That way, if something goes wrong while a batch is being processed, Storm can replay the same transaction id and transaction metadata through the emitter and ensure that the same batch of tuples is emitted again. Storm will increase the attempt id in the TransactionAttempt, so you know that the batch is being repeated.
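The replay guarantee above boils down to one property: the batch must be a pure function of the transaction metadata. The following minimal sketch illustrates this (all names here are assumptions for illustration, not Storm's API): calling the emit logic twice with the same metadata yields the same tuples with the same ids.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a replayable emitter derives its batch purely from the
// transaction metadata, so the same (from, quantity) pair always yields the
// same tuples. SOURCE stands in for the backing queue (Redis in this chapter).
public class ReplayableEmitterSketch {
    static final List<String> SOURCE = List.of("t0", "t1", "t2", "t3", "t4");

    // Analogous to the TransactionMetadata used in the text.
    record Meta(int from, int quantity) {}

    static List<String> emitBatch(Meta meta) {
        List<String> batch = new ArrayList<>();
        int end = Math.min(meta.from() + meta.quantity(), SOURCE.size());
        for (int i = meta.from(); i < end; i++) {
            // The tuple id is derived from the position, so a replay
            // reuses exactly the same ids.
            batch.add(i + ":" + SOURCE.get(i));
        }
        return batch;
    }

    public static void main(String[] args) {
        Meta meta = new Meta(1, 3);
        List<String> first = emitBatch(meta);
        List<String> replay = emitBatch(meta); // same metadata => same batch
        System.out.println(first.equals(replay)); // prints "true"
        System.out.println(first);
    }
}
```

Because nothing in `emitBatch` depends on mutable state, Storm can hand the emitter an old transaction id and metadata at any time and get back an identical batch.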
The important method here is emitBatch. In this method, use the metadata given as a parameter to get tweets from Redis, increase the sequence in Redis that keeps track of how many tweets you've read so far, and, of course, emit the tweets to the topology.
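The chapter's `rq` helper (whose implementation is not shown here) is backed by Redis. As a rough sketch of the two methods emitBatch relies on, here is a hypothetical in-memory stand-in with the same method names, getMessages and setNextRead; the class name and its use of a plain list are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for the Redis-backed queue helper `rq`.
// getMessages(from, quantity) returns a slice of the stored tweets, and
// setNextRead records how far the spout has read so far.
public class InMemoryTweetQueue {
    private final List<String> tweets = new ArrayList<>();
    private long nextRead = 0;

    public void add(String tweet) {
        tweets.add(tweet);
    }

    public List<String> getMessages(long from, long quantity) {
        int start = (int) Math.min(from, tweets.size());
        int end = (int) Math.min(from + quantity, tweets.size());
        return new ArrayList<>(tweets.subList(start, end));
    }

    public void setNextRead(long position) {
        nextRead = position;
    }

    public long getNextRead() {
        return nextRead;
    }

    public static void main(String[] args) {
        InMemoryTweetQueue rq = new InMemoryTweetQueue();
        for (int i = 0; i < 5; i++) {
            rq.add("tweet-" + i);
        }
        long from = 0, quantity = 3;
        // Mirror the order of operations in emitBatch: advance the read
        // sequence first, then fetch the batch for this transaction.
        rq.setNextRead(from + quantity);
        List<String> batch = rq.getMessages(from, quantity);
        System.out.println(batch);            // [tweet-0, tweet-1, tweet-2]
        System.out.println(rq.getNextRead()); // 3
    }
}
```

In the real implementation the tweet list and the read sequence would live in Redis so that the position survives a spout restart; the in-memory version only shows the contract emitBatch expects.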
The Bolts
First, let's look at the standard bolts of this topology:
public class UserSplitterBolt implements IBasicBolt {
    private static final long serialVersionUID = 1L;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("users", new Fields("txid", "tweet_id", "user"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;