Databases Reference
In-Depth Information
To retrieve messages from Redis, you'll use a thread created at the open spout (using a
thread to avoid locking the main loop where the nextTuple method is):
new Thread ( new Runnable () {
@Override
public void run () {
while ( true ){
try {
Jedis client = new Jedis ( redisHost , redisPort );
List < String > res = client . blpop ( Integer . MAX_VALUE , queues );
messages . offer ( res . get ( 1 ));
} catch ( Exception e ){
LOG . error ( "Error reading queues from redis" , e );
try {
Thread . sleep ( 100 );
} catch ( InterruptedException e1 ) {}
}
}
}
}). start ()
The only purpose of this thread is to create the connection and execute the blpop com-
mand. When a message is received, it is added to an internal queue of messages that
will be consumed by the nextTuple method. Here you can see that the source is the
Redis queue and you don't know which are the message emitters nor their quantity.
We recommend that you not create many threads with spout, because
each spout runs in a different thread. Instead of creating many threads,
it is better to increase the parallelism. This will create more threads in
a distributed fashion through the Storm cluster.
In your nextTuple method, the only thing that you'll do is receive the messages and
emit them again.
public void nextTuple () {
while (! messages . isEmpty ()){
collector . emit ( new Values ( messages . poll ()));
}
}
You could transform this spout for the possibility of replaying messages
from Redis to transform this topology into a reliable topology.
DRPC
DRPCSpout is a spout implementation that receives a function invocation stream from
the DRPC server and processes it (see the example in Chapter 3 ). In the most common
 
Search WWH ::




Custom Search