Database Reference
In-Depth Information
HashMap<String,Integer>();
// Words whose count was updated since the last flush: process() adds to this set,
// window() iterates it and then clears it, and init() clears it as well.
HashSet<String> changed =
new
HashSet<String>();
/**
 * Sends the current count of every word held in {@code changed}, then empties that set.
 *
 * <p>For each such word one message is sent to {@code OUTPUT_STREAM}; the message is a map
 * with two entries: {@code "word"} (the word itself) and {@code "count"} (the value stored
 * for it in {@code counts}).
 *
 * @param arg0 collector through which the outgoing envelopes are sent
 * @param arg1 task coordinator; not used by this method
 * @throws Exception declared by the signature; whatever {@code arg0.send} raises propagates,
 *         and in that case {@code changed} is left uncleared
 */
public void window(MessageCollector arg0, TaskCoordinator arg1) throws Exception {
    for (String changedWord : changed) {
        Map<String,Object> payload = new HashMap<String,Object>();
        payload.put("word", changedWord);
        payload.put("count", counts.get(changedWord));

        OutgoingMessageEnvelope envelope =
            new OutgoingMessageEnvelope(OUTPUT_STREAM, payload);
        arg0.send(envelope);
    }
    // Everything pending has been sent; start the next cycle with an empty change set.
    changed.clear();
}
/**
 * Resets the task's in-memory state by emptying both {@code changed} and {@code counts}.
 *
 * @param arg0 job configuration; not used by this method
 * @param arg1 task context; not used by this method
 * @throws Exception declared by the signature; nothing in the body throws a checked exception
 */
public void init(Config arg0, TaskContext arg1) throws Exception {
    // The two collections are independent, so the order of the resets does not matter.
    changed.clear();
    counts.clear();
}
/**
 * Increments the tally for the word carried by an incoming message and marks that word
 * as changed.
 *
 * <p>The message payload is cast to a {@code Map<String,Object>} and its {@code "word"}
 * entry is cast to {@code String}. A word not yet present in {@code counts} starts from
 * zero. If the map has no {@code "word"} entry, the resulting {@code null} is used as
 * the key.
 *
 * @param arg0 incoming envelope whose message is read
 * @param arg1 message collector; not used by this method
 * @param arg2 task coordinator; not used by this method
 * @throws Exception declared by the signature; a payload of a different type surfaces as a
 *         {@code ClassCastException} and a {@code null} payload as a
 *         {@code NullPointerException}
 */
public void process(IncomingMessageEnvelope arg0,
                    MessageCollector arg1,
                    TaskCoordinator arg2) throws Exception {
    @SuppressWarnings("unchecked")
    Map<String,Object> payload = (Map<String,Object>) arg0.getMessage();
    String word = (String) payload.get("word");

    // Look up the previous tally, defaulting to zero for a word seen for the first time.
    int seenSoFar = 0;
    if (counts.containsKey(word)) {
        seenSoFar = counts.get(word);
    }
    counts.put(word, seenSoFar + 1);

    // Remember the word so that the next window() call reports its new count.
    changed.add(word);
}
}
The properties file that goes along with this Task specifies a 10-second
windowing cycle:
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory