Database Reference
In-Depth Information
HashMap<String,Integer>();
// Words recorded by process() since the last window() call; window() emits
// a message for each of them and then empties this set.
HashSet<String> changed = new HashSet<String>();
/**
 * Sends one message to {@code OUTPUT_STREAM} for every word currently held in
 * {@code changed}, then empties that set.
 *
 * <p>Each message is a map with two entries: {@code "word"} (the word itself)
 * and {@code "count"} (the value stored for that word in {@code counts}).
 *
 * @param arg0 collector used to send the outgoing messages
 * @param arg1 task coordinator; not used by this method
 * @throws Exception declared by the method signature; nothing is thrown explicitly here
 */
public void window(MessageCollector arg0, TaskCoordinator arg1)
        throws Exception {
    for (String changedWord : changed) {
        // Build the payload for this word: its text plus its current tally.
        HashMap<String,Object> payload = new HashMap<String,Object>();
        payload.put("word", changedWord);
        payload.put("count", counts.get(changedWord));

        OutgoingMessageEnvelope envelope =
                new OutgoingMessageEnvelope(OUTPUT_STREAM, payload);
        arg0.send(envelope);
    }
    // Everything pending has been sent; start the next window with a clean slate.
    changed.clear();
}
/**
 * Resets the task's in-memory state by emptying both the set of changed
 * words and the per-word counts.
 *
 * @param arg0 job configuration; not used by this method
 * @param arg1 task context; not used by this method
 * @throws Exception declared by the method signature; nothing is thrown explicitly here
 */
public void init(Config arg0, TaskContext arg1) throws Exception {
    changed.clear();
    counts.clear();
}
/**
 * Handles one incoming message: reads its {@code "word"} entry, increments the
 * stored count for that word, and records the word in {@code changed}.
 *
 * <p>The message is cast to {@code Map<String,Object>} and its {@code "word"}
 * value is cast to {@code String}; a message of any other shape fails with a
 * {@link ClassCastException}.
 *
 * @param arg0 envelope whose message carries the word to count
 * @param arg1 message collector; not used by this method
 * @param arg2 task coordinator; not used by this method
 * @throws Exception declared by the method signature; nothing is thrown explicitly here
 */
public void process(IncomingMessageEnvelope arg0,
        MessageCollector arg1,
        TaskCoordinator arg2) throws Exception {
    @SuppressWarnings("unchecked")
    Map<String,Object> payload = (Map<String,Object>) arg0.getMessage();
    String word = (String) payload.get("word");

    // A word seen for the first time starts from zero.
    int seenSoFar = 0;
    if (counts.containsKey(word)) {
        seenSoFar = counts.get(word);
    }
    counts.put(word, seenSoFar + 1);

    changed.add(word);
}
}
The properties file that goes along with this Task specifies a 10-second
windowing cycle:
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
Search WWH ::




Custom Search