The bolt maintains a transient map from each element to its count, which
serves as the local store. As in all bolts, the collector is also held in a
transient variable. Both are initialized in the prepare method. Because this
bolt emits its counts every few seconds, as configured in the preceding code,
it must declare an output stream. Finally, the tick events for this bolt are
configured in getComponentConfiguration:
    transient HashMap<String,Integer> counts;
    transient OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.counts = new HashMap<String,Integer>();
        this.collector = collector;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("timestamp","key","count"));
    }

    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, updates);
        return conf;
    }
Note that this is not a persistent key-value store, so if the bolt task dies for
some reason, any interim data is lost.
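To make the role of the local store concrete, here is a minimal sketch of the count-then-flush pattern, written without Storm so it runs on its own. The class `LocalCounter` and its methods are hypothetical names introduced for illustration. Clearing the map after each flush is also an assumption, since the text does not say whether counts are reset between ticks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the bolt's local store; not part of Storm.
class LocalCounter {
    // Same shape as the bolt's transient counts map.
    private final Map<String, Integer> counts = new HashMap<>();

    // Would be called for every data tuple: bump the count for its key.
    void increment(String key) {
        counts.merge(key, 1, Integer::sum);
    }

    // Would be called on a tick: turn each entry into a
    // (timestamp, key, count) row, matching the declared output fields.
    // Assumption: the map is cleared so the next interval starts from zero.
    List<Object[]> flush(long timestamp) {
        List<Object[]> rows = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            rows.add(new Object[] { timestamp, e.getKey(), e.getValue() });
        }
        counts.clear();
        return rows;
    }

    // Number of distinct keys currently held in memory.
    int size() {
        return counts.size();
    }
}
```

Because the map lives only in the task's memory, everything accumulated since the last flush disappears if the task restarts, which is the loss described above.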
The execute method receives two types of tuples. The first type is a tick
event, which Storm itself sends according to the earlier configuration.
To detect this type of event, both the source component and the stream
identifier of each tuple must be checked:
    public static boolean isTick(Tuple tuple) {
        return Constants.SYSTEM_COMPONENT_ID.equals(
                tuple.getSourceComponent())
            && Constants.SYSTEM_TICK_STREAM_ID.equals(
                tuple.getSourceStreamId());
    }
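The two-part check described above can be exercised without Storm on the classpath. The sketch below is only an illustration: `TickCheck` is a hypothetical class, the method takes the two source fields as plain strings instead of a Storm `Tuple`, and the literals `"__system"` and `"__tick"` are assumed to mirror Storm's `Constants.SYSTEM_COMPONENT_ID` and `Constants.SYSTEM_TICK_STREAM_ID`.

```java
// Hypothetical, Storm-free version of the tick check, for illustration only.
class TickCheck {
    // Assumed to mirror the values of Storm's Constants fields.
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    // A tuple counts as a tick only if BOTH its source component and its
    // source stream match the system values. Calling equals on the constant
    // keeps the check safe when either argument is null.
    static boolean isTick(String sourceComponent, String sourceStreamId) {
        return SYSTEM_COMPONENT_ID.equals(sourceComponent)
            && SYSTEM_TICK_STREAM_ID.equals(sourceStreamId);
    }
}
```

Checking only one of the two fields is not enough: the system component can emit on streams other than the tick stream, and a user component could name one of its own streams `"__tick"`.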