methods. In this case, the “processing” of a queue element simply emits the
information found in the WorkUnit object:
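public void stateChanged(CuratorFramework arg0,
        ConnectionState arg1) {
    // Log every connection state change reported by Curator.
    System.out.println(arg1);
}

public void consumeMessage(WorkUnit message) throws Exception {
    // "Process" the item by printing which consumer received it.
    System.out.println(name
        + " consumed " + message.workId + "/" + message.payload);
}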
Finally, here is a simple process that starts two queue consumers to
handle queue items and a queue producer to create new items:
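DistributedQueueConsumer q1 = new DistributedQueueConsumer(
    "localhost", "queue 1"
);
DistributedQueueConsumer q2 = new DistributedQueueConsumer(
    "localhost", "queue 2"
);
// start() runs the producer on its own thread; calling run()
// directly would execute it on the current thread instead.
new Thread(new DistributedQueueProducer("localhost")).start();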
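When a Runnable such as the producer is wrapped in a Thread, it matters whether run() or start() is invoked on it. The following is a minimal, standard-library-only sketch of that difference. The class name RunVersusStart, the helper executedBy, and the thread name "producer-thread" are hypothetical and are not part of the example application.

```java
public class RunVersusStart {

    // Returns the name of the thread that actually executed the task.
    // (Hypothetical helper, used only to illustrate run() versus start().)
    public static String executedBy(boolean useStart) {
        final String[] seen = new String[1];
        Thread t = new Thread(
                () -> seen[0] = Thread.currentThread().getName(),
                "producer-thread");
        if (useStart) {
            t.start(); // the task runs on the new thread
            try {
                t.join(); // wait for it, which also makes seen[0] visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        } else {
            t.run(); // plain method call: the task runs on the calling thread
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("run():   executed by " + executedBy(false));
        System.out.println("start(): executed by " + executedBy(true));
    }
}
```

With run(), the caller blocks until the task returns, so a producer that loops indefinitely would keep the calling thread busy. With start(), the caller continues immediately.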
When this process starts running, it first consumes any items left over in
the queue from previous executions. It then starts producing and consuming
items. Note that the items are assigned to arbitrary queue consumers
depending on the watch that happens to get triggered by the ZooKeeper
server:
Starting Producer
Added item
queue 2 consumed 0/Next Work Unit 1
Added item
queue 2 consumed 1/Next Work Unit 2
Added item
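The behavior described above (leftover items drained first, then each new item going to whichever consumer gets it) can be imitated without ZooKeeper. The sketch below is only an in-memory analogy. It assumes a java.util.concurrent.BlockingQueue in place of the distributed queue and plain threads in place of DistributedQueueConsumer. The class CompetingConsumersSketch, its run method, the "Leftover Work Unit" payloads, and this WorkUnit definition are hypothetical stand-ins, not the classes from the listings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CompetingConsumersSketch {

    // Hypothetical stand-in for the WorkUnit used in the listings.
    static final class WorkUnit {
        final long workId;
        final String payload;
        WorkUnit(long workId, String payload) {
            this.workId = workId;
            this.payload = payload;
        }
    }

    // Runs two consumers against one shared queue and returns what they logged.
    public static List<String> run(int leftover, int produced) {
        BlockingQueue<WorkUnit> queue = new LinkedBlockingQueue<>();
        List<String> log = new CopyOnWriteArrayList<>();
        AtomicInteger remaining = new AtomicInteger(leftover + produced);

        // Items "left over" from an earlier run are already queued
        // before any consumer starts.
        for (int i = 0; i < leftover; i++) {
            queue.add(new WorkUnit(i, "Leftover Work Unit " + (i + 1)));
        }

        List<Thread> consumers = new ArrayList<>();
        for (String name : new String[] {"queue 1", "queue 2"}) {
            Thread t = new Thread(() -> {
                try {
                    while (remaining.get() > 0) {
                        // Whichever consumer polls first gets the item.
                        WorkUnit m = queue.poll(50, TimeUnit.MILLISECONDS);
                        if (m == null) {
                            continue;
                        }
                        log.add(name + " consumed " + m.workId + "/" + m.payload);
                        remaining.decrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            consumers.add(t);
        }

        // The "producer": add new items while the consumers are running.
        for (int i = 0; i < produced; i++) {
            queue.add(new WorkUnit(leftover + i, "Next Work Unit " + (i + 1)));
        }

        for (Thread t : consumers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return log;
    }

    public static void main(String[] args) {
        run(2, 3).forEach(System.out::println);
    }
}
```

Each item is consumed exactly once, but which consumer name appears on a given line varies from run to run, and log lines from the two threads may interleave. In the real application the assignment is decided by ZooKeeper watches rather than by thread scheduling.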