Databases Reference
In-Depth Information
}
public
long
getNextRead
()
{
String
sNextRead
=
jedis
.
get
(
NEXT_READ
);
if
(
sNextRead
==
null
)
return
1
;
return
Long
.
valueOf
(
sNextRead
);
}
public
long
getNextWrite
()
{
return
Long
.
valueOf
(
jedis
.
get
(
NEXT_WRITE
));
}
public
void
close
()
{
jedis
.
disconnect
();
}
public
void
setNextRead
(
long
nextRead
)
{
jedis
.
set
(
NEXT_READ
,
""
+
nextRead
);
}
public
List
<
String
>
getMessages
(
long
from
,
int
quantity
)
{
String
[]
keys
=
new
String
[
quantity
];
for
(
int
i
=
0
;
i
<
quantity
;
i
++)
keys
[
i
]
=
""
+(
i
+
from
);
return
jedis
.
mget
(
keys
);
}
}
Read carefully the implementation of each method, and make sure you understand
what they do.
The Coordinator
Let's see the implementation of the coordinator of this example.
public
static
class
TweetsTransactionalSpoutCoordinator
implements
ITransactionalSpout
.
Coordinator
<
TransactionMetadata
>
{
TransactionMetadata
lastTransactionMetadata
;
RQ
rq
=
new
RQ
();
long
nextRead
=
0
;
public
TweetsTransactionalSpoutCoordinator
()
{
nextRead
=
rq
.
getNextRead
();
}
@Override
public
TransactionMetadata
initializeTransaction
(
BigInteger
txid
,
TransactionMetadata
prevMetadata
)
{
long
quantity
=
rq
.
getAvailableToRead
(
nextRead
);
quantity
=
quantity
>
MAX_TRANSACTION_SIZE
?
MAX_TRANSACTION_SIZE
:
quantity
;
TransactionMetadata
ret
=
new
TransactionMetadata
(
nextRead
,
(
int
)
quantity
);