Database Reference
In-Depth Information
throws IOException {
socket.setTcpNoDelay(false);
socket.setReceiveBufferSize(20*1024);
socket.setSendBufferSize(20*1024);
}
};
connectionFactory.setHost(amqpHost);
connectionFactory.setPort(amqpPort);
connectionFactory.setUsername(amqpUsername);
connectionFactory.setPassword(amqpPasswd);
connectionFactory.setVirtualHost(amqpVhost);
this.amqpConnection = connectionFactory.newConnection();
this.amqpChannel = amqpConnection.createChannel();
log.info("Setting basic.qos prefetch-count to " +
prefetchCount);
amqpChannel.basicQos(prefetchCount);
amqpChannel.exchangeDeclare(Constants.EXCHANGE_NAME,
"direct");
amqpChannel.queueDeclare(Constants.QUEUE_NAME, true,
false, false, null);
amqpChannel.queueBind(Constants.QUEUE_NAME,
Constants.EXCHANGE_NAME, "");
this.amqpConsumer = new QueueingConsumer(amqpChannel);
assert this.amqpConsumer != null;
this.amqpConsumerTag =
amqpChannel.basicConsume(Constants.QUEUE_NAME,
this.autoAck, amqpConsumer);
}
/*
* Cancels the queue subscription and disconnects from
* the AMQP broker.
*/
public void close() {
try {
if (amqpChannel != null) {
if (amqpConsumerTag != null) {
Search WWH ::




Custom Search