@Override
public synchronized void start() {
    super.start();
    pool = new JedisPool(new JedisPoolConfig(), redisHost, redisPort);
    jedis = pool.getResource();
}

@Override
public synchronized void stop() {
    pool.returnResource(jedis);
    pool.destroy();
    super.stop();
}
The process method retrieves events from the channel and sends them to
Redis. For simplicity, only the body is transmitted in this example, but a
more sophisticated sink might convert the output to another form so that it
could preserve the header metadata:
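As a rough illustration of what such a conversion could look like, the sketch below packs an event's headers and body into a single JSON string. The EventEncoder class and its encode method are hypothetical names introduced here, not part of Flume or Jedis, and the JSON layout with a Base64-encoded body is only one possible choice. It assumes header keys and values are non-null.

```java
import java.util.Base64;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of Flume or Jedis.
final class EventEncoder {
    private EventEncoder() {}

    /** Encodes headers and body as one JSON string suitable for publishing. */
    static String encode(Map<String, String> headers, byte[] body) {
        StringBuilder sb = new StringBuilder("{\"headers\":{");
        boolean first = true;
        // Copying into a TreeMap gives a stable key order, so output is predictable.
        for (Map.Entry<String, String> e : new TreeMap<>(headers).entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append('"').append(escape(e.getKey())).append("\":\"")
              .append(escape(e.getValue())).append('"');
        }
        // Base64 keeps arbitrary binary bodies safe inside a JSON string.
        sb.append("},\"body\":\"")
          .append(Base64.getEncoder().encodeToString(body))
          .append("\"}");
        return sb.toString();
    }

    /** Escapes quotes, backslashes, and control characters for a JSON string. */
    private static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }
}
```

A sink using this approach would pass EventEncoder.encode(event.getHeaders(), event.getBody()) to jedis.publish in place of the plain body string, and subscribers would need to decode the same layout. The process method as written for this example, which sends only the body, is: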
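public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction txn = channel.getTransaction();
    Status status = Status.READY;
    try {
        txn.begin();
        Event event = channel.take();
        // publish returns the number of subscribers that received the message
        if (jedis.publish(redisChannel,
                new String(event.getBody(), Charset.defaultCharset())) > 0) {
            txn.commit();
        } else {
            // Assumption: with no subscribers, roll back so the event stays in
            // the channel; a transaction must be committed or rolled back
            // before it is closed
            txn.rollback();
            status = Status.BACKOFF;
        }
    } catch (Exception e) {
        // Also reached when the channel is empty and take() returns null
        txn.rollback();
        status = Status.BACKOFF;
    } finally {
        txn.close();
    }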