Database Reference
In-Depth Information
methods. In this case, the “processing” of a queue element simply emits the
information found in the
WorkUnit
object:
public void
stateChanged(CuratorFramework arg0,
ConnectionState arg1) {
System.
out
.println(arg1);
}
public void
consumeMessage(WorkUnit message)
throws
Exception {
System.
out
.println(name
+" consumed "+message.workId+"/"+message.payload);
}
Finally, here is a simple process that can be created that starts two queue
consumerstohandlequeueitemsandaqueueproducertocreatenewitems:
DistributedQueueConsumer q1 =
new
DistributedQueueConsumer(
"localhost","queue 1"
);
DistributedQueueConsumer q2 =
new
DistributedQueueConsumer(
"localhost","queue 2"
);
new
Thread(
new
DistributedQueueProducer("localhost")).run();
When this process starts running, it first consumes any items left over in
thequeuefrompreviousexecutions.Itthenstartsproducingandconsuming
items. Note that the items are assigned to arbitrary queue consumers
depending on the watch that happens to get triggered by the ZooKeeper
server:
Starting Producer
Added item
queue 2 consumed 0/Next Work Unit 1
Added item
queue 2 consumed 1/Next Work Unit 2
Added item