The LoremIpsum class does not implement Serializable, so it cannot be configured before the topology starts. Instead, the field is declared transient and the class is instantiated when the spout's open method is called. As usual, the collector is also captured for later use:
transient LoremIpsum ipsum;
transient Random rng;
transient SpoutOutputCollector collector;

public void open(Map conf, TopologyContext context,
        SpoutOutputCollector collector) {
    this.collector = collector;
    ipsum = new LoremIpsum();
    rng = new Random();
}
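The reason for building the helper inside open can be illustrated with plain Java serialization, without Storm on the classpath. The following is a minimal sketch, not the book's code: TextHelper and DemoComponent are hypothetical stand-ins for LoremIpsum and the spout, and the open method here only imitates the role of the real callback. A transient field is skipped when the object is serialized, so it comes back as null and has to be rebuilt on the receiving side.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Random;

// Hypothetical stand-in for a helper class that does not implement Serializable.
class TextHelper {
    String words() {
        return "lorem ipsum";
    }
}

// Hypothetical stand-in for a spout-like component that is serialized and shipped.
class DemoComponent implements Serializable {
    private static final long serialVersionUID = 1L;

    int maxWords = 25;            // plain configuration, survives serialization
    transient TextHelper helper;  // skipped by serialization, null after a round trip
    transient Random rng;         // also rebuilt on the receiving side

    // Imitates the role of open(): create the transient helpers after deserialization.
    void open() {
        helper = new TextHelper();
        rng = new Random();
    }

    // Serialize the component to a byte array and read a copy back.
    static DemoComponent roundTrip(DemoComponent original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DemoComponent) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

If helper were a non-transient field holding an instance at serialization time, writeObject would fail with a NotSerializableException, which is why the field is marked transient and populated later.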
Because this spout is completely self-contained, the transactional methods ack and fail, as well as the lifecycle management methods activate, deactivate, and close, can be left as stub implementations.
The nextTuple implementation sleeps briefly to reduce load during test cases, and then emits a tuple with one value per output field, each containing “lorem ipsum” text of varying length:
public void nextTuple() {
    Utils.sleep(100);
    ArrayList<Object> out = new ArrayList<Object>();
    for (String s : fields)
        out.add(ipsum.getWords(rng.nextInt(maxWords)));
    collector.emit(out);
}
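The tuple-building loop can be tried in isolation. The sketch below is an illustration under stated assumptions, not the spout itself: TupleSketch, its words method, and the small vocabulary are hypothetical stand-ins for the LoremIpsum generator, and nothing is emitted to a collector. It shows that the list holds one value per field and that Random.nextInt(maxWords) yields a word count from 0 up to, but not including, maxWords, so a value with zero words is possible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class TupleSketch {
    // Hypothetical stand-in for the text generator: returns `count` space-separated words.
    static String words(int count) {
        String[] vocab = {"lorem", "ipsum", "dolor", "sit", "amet"};
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < count; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            sb.append(vocab[i % vocab.length]);
        }
        return sb.toString();
    }

    // Build one value per declared field, each with a random word count in [0, maxWords).
    static List<Object> buildTuple(String[] fields, int maxWords, Random rng) {
        List<Object> out = new ArrayList<>(fields.length);
        for (int i = 0; i < fields.length; i++) {
            out.add(words(rng.nextInt(maxWords)));
        }
        return out;
    }
}
```

Passing a seeded Random, for example new Random(42), makes the output repeatable in a test, whereas the spout in the text uses an unseeded generator.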
To see this spout in action, test it with a simple LocalCluster. First, construct a topology with two LoremIpsum spouts:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout1", new LoremIpsumSpout()