Database Reference
In-Depth Information
Building a topology with a cache
Once we have the basic cache framework in place, it's very easy to plug it into the bolts and
reference data from the cache or update it in the cache. Here is the snippet for a bolt that reads from the cache:
public class MyCacheReaderBolt extends BaseBasicBolt {
MyCacheReadercacheReader;
/**
 * Delegates to the superclass {@code prepare} and then creates the
 * {@code MyCacheReader} held in {@code cacheReader}.
 *
 * <p>A failure while constructing the reader is logged and not rethrown, so
 * in that case {@code cacheReader} is left unassigned by this method.
 *
 * @param stormConf configuration map, passed through to the superclass
 * @param context topology context, passed through to the superclass
 */
@Override
public void prepare(Map stormConf, TopologyContext context) {
    super.prepare(stormConf, context);

    try {
        this.cacheReader = new MyCacheReader();
    } catch (Exception initFailure) {
        // Best effort: report the problem and carry on without rethrowing.
        logger.error("Error while initializing Cache", initFailure);
    }
}
/**
* Called whenever a new tuple is received by this
bolt. Responsible for
* emitting cache enriched event onto output stream
*/
public void execute(Tuple tuple, BasicOutputCollector
collector) {
logger.info("execute method :: Start ");
event = tuple.getString(0);
populateEventFromCache(event);
collector.emit(outputStream, new Values(event));
} else {
logger.warn("Event not parsed :: " + tuple.getString(0));
}
} catch (Exception e) {
logger.error("Error in execute() ", e);
}
}
logger.info("execute method :: End ");
Search WWH ::




Custom Search