        long from;
        int quantity;

        public TransactionMetadata(long from, int quantity) {
            this.from = from;
            this.quantity = quantity;
        }
    }
Here you'll store from and quantity, which will tell you exactly how to generate the batch of tuples.
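To see why these two values are enough, the following is a minimal sketch, independent of Storm, of how they could be turned into the list of tuple IDs that make up a batch. BatchMetadata and idsInBatch are hypothetical names used only for illustration, and the sketch assumes that IDs are consecutive and that from is the ID of the first tuple in the batch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for TransactionMetadata so the sketch compiles on its own.
class BatchMetadata {
    final long from;     // assumption: ID of the first tuple in the batch
    final int quantity;  // number of tuples in the batch

    BatchMetadata(long from, int quantity) {
        this.from = from;
        this.quantity = quantity;
    }

    // Assumes consecutive IDs: from, from + 1, ..., from + quantity - 1.
    List<Long> idsInBatch() {
        List<Long> ids = new ArrayList<>();
        for (int i = 0; i < quantity; i++) {
            ids.add(from + i);
        }
        return ids;
    }
}

class BatchRangeDemo {
    public static void main(String[] args) {
        BatchMetadata meta = new BatchMetadata(10L, 3);
        System.out.println(meta.idsInBatch()); // prints [10, 11, 12]
    }
}
```

Because the list depends only on the two stored values, the same metadata always describes the same batch.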
To finish the implementation of the spout, you need to implement the following three
methods:
    @Override
    public ITransactionalSpout.Coordinator<TransactionMetadata> getCoordinator(
            Map conf, TopologyContext context) {
        return new TweetsTransactionalSpoutCoordinator();
    }

    @Override
    public backtype.storm.transactional.ITransactionalSpout.Emitter<TransactionMetadata> getEmitter(
            Map conf, TopologyContext context) {
        return new TweetsTransactionalSpoutEmitter();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("txid", "tweet_id", "tweet"));
    }
In the getCoordinator method, you tell Storm which class will coordinate the generation of batches of tuples. With getEmitter, you tell Storm which class will be responsible for reading batches of tuples from the source and emitting them to a stream in the topology. And finally, as you did before, you need to declare which fields are emitted.
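The division of work between the two classes can be illustrated with a small sketch that does not use Storm at all. SketchMetadata, SketchCoordinator, and SketchEmitter are hypothetical, simplified stand-ins (they are not Storm's interfaces), and the in-memory list stands in for the real source. The sketch assumes the coordinator only decides what each batch contains, while the emitter reads exactly what the metadata describes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified roles for illustration; these are NOT Storm's interfaces.
class SketchMetadata {
    final long from;
    final int quantity;

    SketchMetadata(long from, int quantity) {
        this.from = from;
        this.quantity = quantity;
    }
}

class SketchCoordinator {
    private long nextRead = 0;

    // Decides what the next batch contains without reading any data.
    SketchMetadata nextBatch(long available, int maxBatchSize) {
        int quantity = (int) Math.min(available, maxBatchSize);
        SketchMetadata meta = new SketchMetadata(nextRead, quantity);
        nextRead += quantity;
        return meta;
    }
}

class SketchEmitter {
    private final List<String> source;

    SketchEmitter(List<String> source) {
        this.source = source;
    }

    // Reads exactly the tuples described by the metadata.
    List<String> emitBatch(SketchMetadata meta) {
        int start = (int) meta.from;
        return new ArrayList<>(source.subList(start, start + meta.quantity));
    }
}

class CoordinatorEmitterDemo {
    public static void main(String[] args) {
        List<String> tweets = Arrays.asList("t0", "t1", "t2", "t3", "t4");
        SketchCoordinator coordinator = new SketchCoordinator();
        SketchEmitter emitter = new SketchEmitter(tweets);

        SketchMetadata first = coordinator.nextBatch(5, 2);
        SketchMetadata second = coordinator.nextBatch(3, 2);

        System.out.println(emitter.emitBatch(first));  // prints [t0, t1]
        System.out.println(emitter.emitBatch(second)); // prints [t2, t3]
        System.out.println(emitter.emitBatch(first));  // prints [t0, t1] again
    }
}
```

Emitting the first batch a second time yields the same tuples, because the emitter works only from the metadata it is given.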
The RQ class
To make the example easier to follow, we've encapsulated all operations with Redis in a single class.
    public class RQ {
        public static final String NEXT_READ = "NEXT_READ";
        public static final String NEXT_WRITE = "NEXT_WRITE";

        Jedis jedis;

        public RQ() {
            jedis = new Jedis("localhost");
        }

        public long getAvailableToRead(long current) {
            return getNextWrite() - current;