  new Thread(new Runnable() {
    public void run() {
      jedis.subscribe(pubSub, redisChannel);
    }
  }).start();
}
The Jedis client for Redis blocks its thread when the subscribe command is called, and all subsequent communication is sent to a JedisPubSub object. To avoid blocking the main Flume thread, the blocking operation is done in a separate thread in the start method. The JedisPubSub object has several different methods defined, but only one is necessary here:
ChannelProcessor processor;
JedisPubSub pubSub = new JedisPubSub() {
  @Override
  public void onMessage(String arg0, String arg1) {
    // arg0 is the channel name; arg1 is the message payload
    processor.processEvent(
        EventBuilder.withBody(arg1, Charset.defaultCharset()));
  }
};
This method uses the EventBuilder class to construct an event that is then forwarded to the ChannelProcessor to be placed on a channel. The stop method unsubscribes from the channel, which unblocks the Jedis thread and lets the process complete:
@Override
public synchronized void stop() {
  pubSub.unsubscribe();
  super.stop();
}
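The start and stop lifecycle described above can be tried out without a Redis server or a Flume agent. The following is only a minimal sketch under stated assumptions: FakeSubscription and SketchSource are hypothetical stand-ins invented for illustration, not part of Jedis or Flume. The subscribe call blocks its thread, so it runs on a worker thread, and unsubscribe is what lets that thread return.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class BlockingSubscribeSketch {

    // Hypothetical stand-in for a blocking pub/sub client. This is NOT the Jedis API.
    public static class FakeSubscription {
        // Sentinel compared by identity, so a real message can never match it.
        private static final String STOP = new String("__stop__");
        private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();

        // Blocks the calling thread until unsubscribe() is called.
        public void subscribe(Consumer<String> onMessage) throws InterruptedException {
            while (true) {
                String message = inbox.take();
                if (message == STOP) {
                    return;
                }
                onMessage.accept(message);
            }
        }

        public void publish(String message) {
            inbox.add(message);
        }

        public void unsubscribe() {
            inbox.add(STOP);
        }
    }

    // Hypothetical stand-in for the source; collects event bodies instead of
    // handing them to a channel processor.
    public static class SketchSource {
        private final FakeSubscription subscription = new FakeSubscription();
        private final List<byte[]> delivered = new CopyOnWriteArrayList<>();
        private Thread worker;

        // Runs the blocking subscribe on a worker thread so the caller returns at once.
        public synchronized void start() {
            worker = new Thread(() -> {
                try {
                    subscription.subscribe(
                        msg -> delivered.add(msg.getBytes(StandardCharsets.UTF_8)));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.start();
        }

        // Unsubscribing unblocks the worker; join() waits for it to finish.
        public synchronized void stop() {
            subscription.unsubscribe();
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        public void publish(String message) {
            subscription.publish(message);
        }

        public int deliveredCount() {
            return delivered.size();
        }

        public boolean isRunning() {
            return worker != null && worker.isAlive();
        }
    }

    public static void main(String[] args) {
        SketchSource source = new SketchSource();
        source.start();
        source.publish("hello");
        source.publish("world");
        source.stop();
        System.out.println(source.deliveredCount()); // prints 2
    }
}
```

The sketch encodes message bodies with an explicit UTF-8 charset rather than the platform default; that is a design choice of the sketch, not something taken from the source's code.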
Using this source is very similar to using any other source. The type property is set to the source's fully qualified class name, and any configuration parameters are set:
agent_1.sources.source-1.type=wiley.streaming.flume.RedisSource
agent_1.sources.source-1.redis-channel=events