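public void execute(TridentTuple tuple,
                    TridentCollector collector) {
    if (tuple.size() == 0) return;
    try {
        // parser is the JSON parser field declared earlier (outside this excerpt)
        Object obj = parser.parse(tuple.getString(0));
        if (obj instanceof JSONObject) {
            JSONObject json = (JSONObject) obj;
            String raw = (String) json.get("raw");
            // The title runs from index 2 (after the opening "[[") to the
            // first "]]"; skip messages without one instead of throwing
            // StringIndexOutOfBoundsException from substring().
            int end = (raw == null) ? -1 : raw.indexOf("]]");
            if (end < 2) return;
            raw = raw.substring(2, end);
            // Emit one tuple per whitespace-separated word of the title
            for (String word : raw.split("\\s+")) {
                collector.emit(new Values(word));
            }
        }
    } catch (ParseException e) {
        collector.reportError(e);
    }
}
}, new Fields("word")).groupBy(new Fields("word"))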
The output of this function is one tuple per word. Grouping on the
word field and then applying persistentAggregate yields the count of
each word over time. Because every occurrence of a word in a title
produces its own tuple, each time the word appears in the output its
stored count is incremented by the number of times it appeared in
that title:
.persistentAggregate(
    new MemoryMapState.Factory(),
    new Count(),
    new Fields("count"))
.newValuesStream()
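The following sketch shows what happens to each batch. It is a minimal plain-Java simulation written under stated assumptions and has no Storm dependency. The class WordCountSketch and its processBatch method are hypothetical. The stand-ins are as follows:

- A HashMap plays the role of MemoryMapState.
- Counting occurrences per word within a batch plays the role of groupBy plus Count.
- The returned map represents the tuples that newValuesStream() emits.

It deliberately ignores Trident's transactional batch bookkeeping.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java simulation of
// groupBy("word") + persistentAggregate(MemoryMapState, Count) + newValuesStream().
public class WordCountSketch {
    // Stand-in for MemoryMapState: word -> running count, kept across batches.
    private final Map<String, Long> state = new HashMap<>();

    /** Processes one batch of words; returns the new total for each word the batch touched. */
    public Map<String, Long> processBatch(List<String> words) {
        // groupBy + Count: count occurrences of each word within this batch.
        Map<String, Long> batchCounts = new LinkedHashMap<>();
        for (String w : words) {
            batchCounts.merge(w, 1L, Long::sum);
        }
        // persistentAggregate: add each batch count to the stored total.
        // newValuesStream: report the updated total for each word in the batch.
        Map<String, Long> newValues = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : batchCounts.entrySet()) {
            long total = state.merge(e.getKey(), e.getValue(), Long::sum);
            newValues.put(e.getKey(), total);
        }
        return newValues;
    }

    public static void main(String[] args) {
        WordCountSketch sketch = new WordCountSketch();
        System.out.println(sketch.processBatch(Arrays.asList("storm", "trident", "storm")));
        System.out.println(sketch.processBatch(Arrays.asList("storm", "kafka")));
    }
}
```

Running main prints {storm=2, trident=1} and then {storm=3, kafka=1}. The second batch reports only the words it touched, but their totals include the earlier batch.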
Finally, the updated counts are passed to the Debug function, which
works very much like the LoggerBolt implemented earlier in this chapter: