    public void execute(TridentTuple tuple, TridentCollector collector) {
      if (tuple.size() == 0) return;
      try {
        Object obj = parser.parse(tuple.getString(0));
        if (obj instanceof JSONObject) {
          JSONObject json = (JSONObject) obj;
          String raw = (String) json.get("raw");
          // Skip messages with no "[[title]]" prefix instead of letting
          // substring() throw StringIndexOutOfBoundsException
          int end = (raw == null) ? -1 : raw.indexOf("]]");
          if (end < 2) return;
          // Emit each whitespace-separated word of the page title
          for (String word : raw.substring(2, end).split("\\s+")) {
            collector.emit(new Values(word));
          }
        }
      } catch (ParseException e) {
        collector.reportError(e);
      }
    }
  }, new Fields("word")).groupBy(new Fields("word"))
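The title-splitting step inside execute() can be tried without a Storm cluster or a JSON parser. The sketch below is a hypothetical helper, TitleWords.words(), that assumes the "raw" field looks like "[[Title]] ...", which is what the substring(2, indexOf("]]")) call relies on. Unlike the topology code, it returns an empty list when "]]" is missing and drops the empty strings that split() can produce.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: mirrors the title extraction in execute() so it can be
// tested in isolation. Assumes raw starts with "[[" (not checked here).
class TitleWords {
    // Returns the words between index 2 and the first "]]" in raw,
    // or an empty list if raw is null or has no usable "]]".
    static List<String> words(String raw) {
        List<String> out = new ArrayList<>();
        if (raw == null) return out;
        int end = raw.indexOf("]]");
        if (end < 2) return out;                 // no "[[...]]" title to extract
        for (String word : raw.substring(2, end).split("\\s+")) {
            if (!word.isEmpty()) out.add(word);  // split can yield a leading ""
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(words("[[Main Page]] http://example.org diff")); // [Main, Page]
    }
}
```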
The output of this function is one tuple per word. Grouping on
word and then applying a persistentAggregate yields the running count
of each word over time. Each time a word appears in the output, its
stored count is incremented by the number of times it appears in the
title:
  .persistentAggregate(
      new MemoryMapState.Factory(),
      new Count(),
      new Fields("count"))
  .newValuesStream()
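To make the update rule concrete, here is a simplified plain-Java model of what a grouped persistentAggregate with Count over in-memory state does per batch. It is not the Trident API: the class WordCountState and its applyBatch() method are hypothetical, and the model ignores transaction IDs, partitioning, and fault tolerance. Each batch is counted per word, the partial counts are added to the stored totals, and the updated totals for the words touched in that batch are returned, which is roughly what newValuesStream() emits downstream.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model (hypothetical, not Trident code) of
// groupBy(word).persistentAggregate(memory state, Count).newValuesStream().
class WordCountState {
    // Stands in for the in-memory map: word -> running count.
    private final Map<String, Long> state = new HashMap<>();

    // Applies one batch and returns the new totals for words seen in it.
    Map<String, Long> applyBatch(List<String> batch) {
        // Partial count for this batch only.
        Map<String, Long> partial = new HashMap<>();
        for (String word : batch) {
            partial.merge(word, 1L, Long::sum);
        }
        // Combine each partial count with the stored total and write it back.
        Map<String, Long> newValues = new HashMap<>();
        for (Map.Entry<String, Long> e : partial.entrySet()) {
            long updated = state.merge(e.getKey(), e.getValue(), Long::sum);
            newValues.put(e.getKey(), updated);
        }
        return newValues;
    }

    public static void main(String[] args) {
        WordCountState counts = new WordCountState();
        System.out.println(counts.applyBatch(Arrays.asList("Main", "Page", "Main")).get("Main")); // 2
        System.out.println(counts.applyBatch(Arrays.asList("Main")).get("Main"));                 // 3
    }
}
```

Words absent from a batch are not re-emitted, so downstream operations see only the counts that actually changed.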
Finally, these values are passed to the Debug filter, which works
much like the LoggerBolt implemented earlier in this chapter: