To retrieve messages from Redis, you'll use a thread created in the spout's open
method (a separate thread avoids blocking the main loop, where the nextTuple
method runs):
    new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                try {
                    Jedis client = new Jedis(redisHost, redisPort);
                    // Block until a message arrives on one of the queues.
                    // blpop returns [queueName, value], so the payload is at index 1.
                    List<String> res = client.blpop(Integer.MAX_VALUE, queues);
                    messages.offer(res.get(1));
                } catch (Exception e) {
                    LOG.error("Error reading queues from redis", e);
                    try {
                        // Back off briefly before retrying.
                        Thread.sleep(100);
                    } catch (InterruptedException e1) {}
                }
            }
        }
    }).start();
The only purpose of this thread is to create the connection and execute the blpop
command. When a message is received, it is added to an internal queue of messages
that will be consumed by the nextTuple method. Here the source is the Redis queue,
and you know neither who the message emitters are nor how many of them there are.
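The handoff between the reader thread and nextTuple can be tried out without Redis or Storm. The following is a minimal sketch under stated assumptions, not the spout itself: the class name QueueHandoffSketch is hypothetical, a BlockingQueue named source stands in for the Redis list that blpop blocks on, and the emitted list stands in for collector.emit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Stdlib-only sketch of the reader-thread/queue handoff (no Redis, no Storm). */
class QueueHandoffSketch {
    // Assumption: stands in for the Redis list that blpop would block on.
    private final BlockingQueue<String> source;
    // Internal buffer shared by the reader thread and nextTuple().
    private final BlockingQueue<String> messages = new LinkedBlockingQueue<>();
    // Assumption: stands in for collector.emit(...); values are just recorded.
    private final List<String> emitted = new ArrayList<>();

    QueueHandoffSketch(BlockingQueue<String> source) {
        this.source = source;
    }

    /** Plays the role of open(): starts the blocking reader thread. */
    Thread open() {
        Thread reader = new Thread(() -> {
            try {
                while (true) {
                    // take() blocks until a message is available, like blpop.
                    messages.offer(source.take());
                }
            } catch (InterruptedException e) {
                // Restore the flag and let the thread end.
                Thread.currentThread().interrupt();
            }
        });
        reader.setDaemon(true);
        reader.start();
        return reader;
    }

    /** Plays the role of nextTuple(): never blocks, drains what is buffered. */
    void nextTuple() {
        String message;
        while ((message = messages.poll()) != null) {
            emitted.add(message);
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> source = new LinkedBlockingQueue<>();
        QueueHandoffSketch spout = new QueueHandoffSketch(source);
        Thread reader = spout.open();

        source.put("a");
        source.put("b");

        // Call nextTuple() repeatedly, as the main loop would.
        while (spout.emitted().size() < 2) {
            spout.nextTuple();
            Thread.sleep(10);
        }
        reader.interrupt();
        System.out.println(spout.emitted()); // prints [a, b]
    }
}
```

Only the reader thread ever blocks. nextTuple uses poll, which returns immediately when the buffer is empty, so the main loop is never held up waiting for a message.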
We recommend that you not create many threads inside a spout, because
each spout instance runs in a different thread. Instead of creating many
threads, it is better to increase the spout's parallelism. This will create
more threads, distributed across the Storm cluster.
In your nextTuple method, the only thing that you'll do is receive the messages and
emit them again.
    public void nextTuple() {
        while (!messages.isEmpty()) {
            collector.emit(new Values(messages.poll()));
        }
    }
You could also extend this spout so that it can replay messages from Redis,
which would turn this topology into a reliable topology.
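One way to approach replay is to remember each emitted message under an ID until it is acknowledged, and to put it back in the queue if it fails. The following is a minimal sketch of that bookkeeping only, using the standard library; the class ReplayBufferSketch and its method names are hypothetical. In a real spout, the ID would be passed as the message ID in collector.emit, and Storm would later call the spout's ack or fail method with that same ID.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical bookkeeping for replaying failed messages (no Storm, no Redis). */
class ReplayBufferSketch {
    // Messages waiting to be emitted.
    private final Queue<String> ready = new ArrayDeque<>();
    // Messages emitted but not yet acknowledged, keyed by message ID.
    private final Map<Long, String> pending = new HashMap<>();
    private long nextId = 0;

    /** Called when a message arrives from the source. */
    void offer(String message) {
        ready.add(message);
    }

    /** Emits the next message; returns its ID, or -1 if nothing is ready. */
    long emitNext() {
        String message = ready.poll();
        if (message == null) {
            return -1;
        }
        long id = nextId++;
        pending.put(id, message);
        // A real spout would call: collector.emit(new Values(message), id);
        return id;
    }

    /** The message was fully processed: forget it. */
    void ack(long id) {
        pending.remove(id);
    }

    /** The message failed: queue it again so it is emitted under a new ID. */
    void fail(long id) {
        String message = pending.remove(id);
        if (message != null) {
            ready.add(message);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    int readyCount() {
        return ready.size();
    }

    public static void main(String[] args) {
        ReplayBufferSketch buffer = new ReplayBufferSketch();
        buffer.offer("m1");

        long first = buffer.emitNext();  // m1 is now pending
        buffer.fail(first);              // m1 goes back to the ready queue
        long second = buffer.emitNext(); // m1 is emitted again with a new ID
        buffer.ack(second);              // m1 is done

        System.out.println(buffer.pendingCount() + " pending, "
                + buffer.readyCount() + " ready"); // prints 0 pending, 0 ready
    }
}
```

This sketch keeps pending messages in memory only. Because blpop removes a message from Redis as soon as it is read, anything still pending would be lost if the worker process died, so a sturdier design would also need to keep unacknowledged messages somewhere durable.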
DRPC
DRPCSpout is a spout implementation that receives a function invocation stream from
the DRPC server and processes it (see the example in Chapter 3). In the most common