    ) && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
}
This is used to implement the execute method. If the isTick method returns true, then the bolt should emit all of the values currently stored in the counts variable. Otherwise, it should increment or insert the value into the counts map. Finally, it acknowledges the tuple:
public void execute(Tuple input) {
    if (isTick(input)) {
        for (Entry<String, Integer> e : counts.entrySet()) {
            collector.emit(new Values(
                System.currentTimeMillis(),
                e.getKey(),
                e.getValue()
            ));
        }
        counts.clear();
    } else {
        String key = input.getString(0);
        Integer value = counts.get(key);
        counts.put(key, 1 + (value == null ? 0 : value));
    }
    collector.ack(input);
}
This bolt can now be used in any topology that outputs the item to count as
the first element of its stream. To record the events somewhere like Redis,
attach another bolt to the output of this bolt.
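
The count-or-flush behavior of the execute method can also be tried out without a running Storm cluster. The following is a minimal sketch in plain Java, not the bolt itself: the class name TickCounter, the Emission holder, and the record and flush methods are hypothetical names introduced only for illustration. Here record plays the role of the non-tick branch, and flush plays the role of the tick branch, returning what the bolt would have emitted instead of calling the collector.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the counting bolt's state; it has no Storm dependency.
final class TickCounter {

    // Hypothetical holder for one (timestamp, key, count) triple,
    // mirroring the three fields the bolt passes to Values.
    static final class Emission {
        final long timestamp;
        final String key;
        final int count;

        Emission(long timestamp, String key, int count) {
            this.timestamp = timestamp;
            this.key = key;
            this.count = count;
        }
    }

    private final Map<String, Integer> counts = new HashMap<>();

    // Non-tick branch: insert the key with a count of 1, or increment it.
    void record(String key) {
        counts.merge(key, 1, Integer::sum);
    }

    // Tick branch: return every stored count, then reset the map.
    List<Emission> flush(long timestamp) {
        List<Emission> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            out.add(new Emission(timestamp, e.getKey(), e.getValue()));
        }
        counts.clear();
        return out;
    }
}
```

Calling record three times with the keys "a", "b", and "a" and then calling flush yields two entries, with "a" counted twice. A second flush immediately afterwards returns an empty list, because the map is cleared on every tick. The order of the entries is not guaranteed, since the sketch uses a HashMap.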
Counting with Trident
Counting events in Trident is simultaneously easier and harder than
counting events using simple topologies. It is simpler in the sense that
it is no longer necessary to implement bolts to perform simple counting