Counting in Samza
Samza implements counting jobs with a mechanism very much like the
tick events used by Storm, but the implementation is somewhat cleaner.
In Samza, tasks that perform periodic work are called WindowableTasks
and implement an interface of the same name. Unlike the Storm version,
which must check each tuple to see whether it is a tick, Samza calls
the window method only when the windowing interval elapses. As a
result, the complete code for a counting job is very easy to read:
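import java.util.HashMap;
import java.util.Map;

import org.apache.samza.config.Config;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;

public class SimpleCountingJob
        implements WindowableTask, StreamTask, InitableTask {

    // Name of the message field whose values are counted
    String field = "field";
    // Stream that receives the counts at the end of each window
    SystemStream output;
    // Running counts for the current window, keyed by field value
    HashMap<String,Integer> counts = new HashMap<String,Integer>();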

    // Called once per incoming message: increment the count for its key
    public void process(IncomingMessageEnvelope msg,
            MessageCollector collector, TaskCoordinator coordinator)
            throws Exception {
        @SuppressWarnings("unchecked")
        Map<String,Object> obj = (Map<String,Object>)msg.getMessage();
        String key = obj.get(field).toString();
        Integer current = counts.get(key);
        counts.put(key, 1 + (current == null ? 0 : current));
    }

    // Called at each window interval: emit the counts, then start a new window
    public void window(MessageCollector collector,
            TaskCoordinator coordinator) throws Exception {
        collector.send(new OutgoingMessageEnvelope(output, counts));
        counts = new HashMap<String,Integer>();
    }
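
    // Note (an assumption, not stated in the text above): window() fires on
    // a timer only when the job configuration sets a window interval. A
    // minimal sketch of the relevant properties, with an illustrative
    // 60-second interval and an unpackaged class name (use the fully
    // qualified name if the class lives in a package):
    //
    //   task.class=SimpleCountingJob
    //   task.window.ms=60000
    //   count.field=field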

    public void init(Config config, TaskContext context)
            throws Exception {
        field = config.get("count.field", "field");