        } catch (InterruptedException e) {
          cont = false;
        }
      }
    }
Next is the QueueConsumer implementation. In this case, to maintain some symmetry with the producer, the QueueConsumer implementation also handles the creation of the DistributedQueue implementation. It starts very much like the producer implementation:
    public class DistributedQueueConsumer implements QueueConsumer<WorkUnit> {
      DistributedQueue<WorkUnit> queue;
      String name;

      public DistributedQueueConsumer(String connectString, String name)
          throws Exception {
        this.name = name;
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            connectString, new RetryOneTime(10));
        client.start();
        queue = QueueBuilder.builder(
            client, this, new SimpleSerializer<WorkUnit>(), "/queues/work-unit")
            .buildQueue();
        queue.start();
      }
The only difference is that, rather than passing null as the QueueConsumer, the consumer passes itself. It also implements the QueueConsumer interface, which requires the implementation of two