Integrating Storm with RabbitMQ
Now that we have installed Storm, the next step is to integrate RabbitMQ with it. To do
so, we will create a custom spout called the RabbitMQ spout. This spout reads messages
from the specified queue, thus playing the role of a consumer, and then pushes these
messages to the downstream components of the topology.
Here is how the spout code will look:
public class AMQPRecvSpout implements IRichSpout {

    // Constructor: initializes all connection and delivery properties.
    public AMQPRecvSpout(String host, int port, String username,
            String password, String vhost, boolean requeueOnFail,
            boolean autoAck) {
        this.amqpHost = host;
        this.amqpPort = port;
        this.amqpUsername = username;
        this.amqpPasswd = password;
        this.amqpVhost = vhost;
        this.requeueOnFail = requeueOnFail;
        this.autoAck = autoAck;
    }

    /*
     * The open method of the spout. Here we initialize the prefetch
     * count, which specifies how many messages the spout prefetches
     * from the queue to increase the efficiency of the solution.
     */
    public void open(@SuppressWarnings("rawtypes") Map conf,
            TopologyContext context, SpoutOutputCollector collector) {
        Long prefetchCount = (Long) conf.get(CONFIG_PREFETCH_COUNT);
        if (prefetchCount == null) {
            log.info("Using default prefetch-count");
            prefetchCount = DEFAULT_PREFETCH_COUNT;
        } else if (prefetchCount < 1) {
            throw new IllegalArgumentException(CONFIG_PREFETCH_COUNT + " must be
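The listing above breaks off inside the prefetch-count check of the open method. As a minimal, standalone sketch of that check (not the spout's actual code), the logic can be isolated in a small helper that runs without Storm or RabbitMQ on the classpath. The class name PrefetchConfig, the key string "amqp.prefetch.count", the default of 100, and the wording of the exception message are all assumptions made for illustration; the excerpt does not show the real values of CONFIG_PREFETCH_COUNT or DEFAULT_PREFETCH_COUNT.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that mirrors the prefetch-count handling in open().
final class PrefetchConfig {

    // Assumed key name; the excerpt does not show the real constant's value.
    static final String CONFIG_PREFETCH_COUNT = "amqp.prefetch.count";

    // Assumed default; the excerpt does not show the real default.
    static final long DEFAULT_PREFETCH_COUNT = 100L;

    private PrefetchConfig() {
    }

    // Returns the configured prefetch count, the default when the key is
    // absent, and rejects values below 1.
    static long resolvePrefetchCount(Map<String, ?> conf) {
        Object raw = conf.get(CONFIG_PREFETCH_COUNT);
        if (raw == null) {
            return DEFAULT_PREFETCH_COUNT;
        }
        // Accept any numeric type rather than casting straight to Long.
        long value = ((Number) raw).longValue();
        if (value < 1) {
            // The message text is this sketch's own wording.
            throw new IllegalArgumentException(
                    "Invalid value for " + CONFIG_PREFETCH_COUNT + ": " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        System.out.println(resolvePrefetchCount(conf)); // key absent
        conf.put(CONFIG_PREFETCH_COUNT, 25L);
        System.out.println(resolvePrefetchCount(conf)); // explicit value
    }
}
```

Keeping the validation in a pure function like this makes it easy to unit test separately from the spout lifecycle. Reading the value through Number instead of a direct Long cast is a design choice of the sketch, so an Integer in the configuration map does not cause a ClassCastException.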