Database Reference
In-Depth Information
amqpChannel.basicCancel(amqpConsumerTag);
}
amqpChannel.close();
}
} catch (IOException e) {
log.warn("Error closing AMQP channel", e);
}
try {
if (amqpConnection != null) {
amqpConnection.close();
}
} catch (IOException e) {
log.warn("Error closing AMQP connection", e);
}
}
/*
* Emit message received from queue into collector
*/
public void nextTuple() {
if (spoutActive && amqpConsumer != null) {
try {
final QueueingConsumer.Delivery delivery =
amqpConsumer.nextDelivery(WAIT_FOR_NEXT_MESSAGE);
if (delivery == null) return;
final long deliveryTag =
delivery.getEnvelope().getDeliveryTag();
String message = new String(delivery.getBody());
if (message != null && message.length() > 0) {
collector.emit(new Values(message), deliveryTag);
} else {
log.debug("Malformed deserialized message, null
or zero- length. " + deliveryTag);
if (!this.autoAck) {
ack(deliveryTag);
}
}
} catch (ShutdownSignalException e) {
Search WWH ::




Custom Search