} catch (InterruptedException e) {
cont = false;
}
}
}
Next is the QueueConsumer implementation. To keep some symmetry with
the producer, this implementation also creates the DistributedQueue
itself. It starts much like the producer implementation:
public class DistributedQueueConsumer
        implements QueueConsumer<WorkUnit> {
    DistributedQueue<WorkUnit> queue;
    String name;

    public DistributedQueueConsumer(
            String connectString,
            String name) throws Exception {
        this.name = name;
        CuratorFramework client =
            CuratorFrameworkFactory.newClient(
                connectString,
                new RetryOneTime(10)
            );
        client.start();
        queue = QueueBuilder.builder(
                client,
                this,
                new SimpleSerializer<WorkUnit>(),
                "/queues/work-unit"
            )
            .buildQueue();
        queue.start();
    }
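The constructor above hands `this` to the queue builder, so the queue calls back into the same object that created it. The following is a minimal sketch of that self-registration pattern only. It does not use Curator: `ItemHandler`, `TinyQueue`, and `SelfRegisteringWorker` are hypothetical stand-ins invented for illustration, and buffering items when the handler is null is an assumption of this sketch, not a description of how DistributedQueue behaves.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a callback interface such as QueueConsumer.
interface ItemHandler<T> {
    void handle(T item);
}

// Hypothetical in-memory stand-in for a queue that delivers to a handler.
class TinyQueue<T> {
    private final ItemHandler<T> handler; // null means "producer side only"
    private final List<T> pending = new ArrayList<>();

    TinyQueue(ItemHandler<T> handler) {
        this.handler = handler;
    }

    void put(T item) {
        if (handler != null) {
            handler.handle(item); // deliver to the registered handler
        } else {
            pending.add(item);    // sketch assumption: hold items nobody consumes
        }
    }

    int pendingCount() {
        return pending.size();
    }
}

// The worker creates its own queue and registers itself as the handler.
class SelfRegisteringWorker implements ItemHandler<String> {
    final List<String> seen = new ArrayList<>();
    final TinyQueue<String> queue;

    SelfRegisteringWorker() {
        this.queue = new TinyQueue<>(this); // pass `this` rather than null
    }

    @Override
    public void handle(String item) {
        seen.add(item); // record each delivered item
    }
}
```

With this arrangement, calling `put` on a worker's queue ends up in that worker's own `handle` method, while a queue built with a null handler never invokes any callback.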
The only difference is that, rather than passing null as the
QueueConsumer, the consumer passes itself. It also implements the
QueueConsumer interface, which requires the implementation of two