  )
  && Constants.SYSTEM_TICK_STREAM_ID.equals(
    tuple.getSourceStreamId()
  );
}
The execute method is built on this check. If isTick returns true, the
bolt emits every entry currently stored in the counts map, each stamped
with the current time, and then clears the map. Otherwise, it increments
the count for the incoming key, inserting the key if it is not yet present.
In both cases it finishes by acknowledging the tuple:
public void execute(Tuple input) {
  if (isTick(input)) {
    // Tick tuple: emit (timestamp, key, count) for every key, then reset.
    for (Entry<String, Integer> e : counts.entrySet()) {
      collector.emit(new Values(
        System.currentTimeMillis(),
        e.getKey(),
        e.getValue()
      ));
    }
    counts.clear();
  } else {
    // Data tuple: the item to count is the first field.
    String key = input.getString(0);
    Integer value = counts.get(key);
    counts.put(key, 1 + (value == null ? 0 : value));
  }
  // Acknowledge both tick and data tuples.
  collector.ack(input);
}
This bolt can now be used in any topology that outputs the item to count as
the first element of its stream. To record the events somewhere like Redis,
attach another bolt to the output of this bolt.
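The accumulate-then-flush behavior of execute can be tried out without a running topology. The following is a minimal sketch in plain Java, not Storm code: the TickCounter class and its onEvent and onTick methods are hypothetical names introduced here for illustration, and the timestamp is passed in by the caller rather than read from the system clock so the result is predictable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the bolt's counting logic; not part of Storm.
class TickCounter {
    private final Map<String, Integer> counts = new HashMap<>();

    // Mirrors the non-tick branch of execute: increment or insert the key.
    void onEvent(String key) {
        counts.merge(key, 1, Integer::sum);
    }

    // Mirrors the tick branch: build one (timestamp, key, count) row per key,
    // then clear the map so the next interval starts from zero.
    List<Object[]> onTick(long timestampMillis) {
        List<Object[]> rows = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            rows.add(new Object[] { timestampMillis, e.getKey(), e.getValue() });
        }
        counts.clear();
        return rows;
    }
}
```

Calling onEvent for "a", "b", and "a" again and then calling onTick yields two rows, one per key, with a count of 2 for "a". A second onTick with no events in between yields nothing, because the map was cleared. This matches the bolt, which reports counts per tick interval rather than running totals.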
Counting with Trident
Counting events in Trident is simultaneously easier and harder than
counting events using simple topologies. It is simpler in the sense that
it is no longer necessary to implement bolts to perform simple counting