Counting in Samza
Samza implements counting jobs with a mechanism much like the tick events
used by Storm, but the implementation is somewhat cleaner. In Samza, a task
that performs periodic work implements the WindowableTask interface. Unlike
the Storm version, which must check each incoming tuple to see whether it
is a tick, Samza calls the window method only when a window interval has
elapsed. As a result, the complete code for a counting job is very easy to
read:
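import java.util.HashMap;
import java.util.Map;

import org.apache.samza.config.Config;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;

public class SimpleCountingJob
    implements WindowableTask, StreamTask, InitableTask {

  String field = "field";
  SystemStream output;
  HashMap<String,Integer> counts = new HashMap<String,Integer>();

  // Called once per incoming message: increment the count for its key.
  public void process(IncomingMessageEnvelope msg,
      MessageCollector collector, TaskCoordinator coordinator)
      throws Exception {
    @SuppressWarnings("unchecked")
    Map<String,Object> obj = (Map<String,Object>)msg.getMessage();
    String key = obj.get(field).toString();
    Integer current = counts.get(key);
    counts.put(key, 1 + (current == null ? 0 : current));
  }

  // Called by Samza at each window interval: emit the counts, then reset.
  public void window(MessageCollector collector,
      TaskCoordinator coordinator)
      throws Exception {
    collector.send(new OutgoingMessageEnvelope(output, counts));
    counts = new HashMap<String,Integer>();
  }

  // Called once at startup: read the job configuration.
  public void init(Config config, TaskContext context)
      throws Exception
  {
    field = config.get("count.field", "field");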
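
The count-and-flush pattern in process and window can be tried without a running Samza job. The following is a minimal sketch in plain Java, not Samza code: the class WindowedCounterSketch and its message helper are hypothetical names introduced for illustration, window() is called by hand rather than on a timer, and skipping messages that lack the counted field is an assumption, not behavior shown in the listing above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the counting task; not part of Samza.
class WindowedCounterSketch {
    private final String field;
    private Map<String, Integer> counts = new HashMap<>();

    WindowedCounterSketch(String field) {
        this.field = field;
    }

    // Mirrors process(): count one occurrence of the message's key.
    void process(Map<String, Object> message) {
        Object value = message.get(field);
        if (value == null) {
            return; // assumption: messages without the field are ignored
        }
        counts.merge(value.toString(), 1, Integer::sum);
    }

    // Mirrors window(): hand back the current counts and start a new map.
    Map<String, Integer> window() {
        Map<String, Integer> snapshot = counts;
        counts = new HashMap<>();
        return snapshot;
    }

    // Hypothetical helper that builds a one-field message.
    static Map<String, Object> message(String field, Object value) {
        Map<String, Object> m = new HashMap<>();
        m.put(field, value);
        return m;
    }

    public static void main(String[] args) {
        WindowedCounterSketch counter = new WindowedCounterSketch("field");
        counter.process(message("field", "a"));
        counter.process(message("field", "b"));
        counter.process(message("field", "a"));
        // First window: "a" was seen twice and "b" once.
        System.out.println(counter.window());
        counter.process(message("field", "b"));
        // Second window: only the message received since the last flush.
        System.out.println(counter.window());
    }
}
```

As in the listing, the sketch replaces the map with a new one instead of clearing it. A plausible reason for that choice is that the map already handed off at the end of a window is then never modified by later messages.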