        try {
            @SuppressWarnings("unchecked")
            Map<String,Object> json =
                (Map<String,Object>)envelope.getMessage();
            String raw = (String)json.get("raw");
            raw = raw.substring(2,raw.indexOf("]]"));
            for (String word : raw.split("\\s+")) {
                HashMap<String,Object> val = new HashMap<String,Object>();
                val.put("word", word);
                collector.send(
                    new OutgoingMessageEnvelope(OUTPUT_STREAM, val));
            }
        } catch (Exception e) {
            System.err.println(e);
        }
    }
}
Initializing Tasks, Windows
In addition to the StreamTask interface, a task may implement the InitableTask interface. This adds an init method that is called when the task object is created. This is used to establish connections to resources at run time that cannot be statically initialized.
Tasks that should periodically emit values, such as counting operations, can implement the WindowableTask interface. This interface adds a window method that is periodically called by the Samza framework.
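The accumulate-then-flush pattern behind these interfaces can be sketched in plain Java without any Samza dependency. The class WindowedCounterSketch and its method signatures below are hypothetical stand-ins, not the Samza API: the real init, process, and window methods take framework arguments that are omitted here. Resetting the counts after each window is likewise an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for the accumulate-and-flush pattern; not Samza code.
class WindowedCounterSketch {
    // Created in init() to mimic state that is set up at run time.
    private Map<String, Integer> counts;

    // Plays the role of InitableTask's init: runs once before any message.
    void init() {
        counts = new HashMap<>();
    }

    // Plays the role of StreamTask's process: called once per incoming word.
    void process(String word) {
        counts.merge(word, 1, Integer::sum);
    }

    // Plays the role of WindowableTask's window: return a snapshot of the
    // counts, then reset them (the reset is an assumption of this sketch).
    Map<String, Integer> window() {
        Map<String, Integer> snapshot = new HashMap<>(counts);
        counts.clear();
        return snapshot;
    }

    public static void main(String[] args) {
        WindowedCounterSketch task = new WindowedCounterSketch();
        task.init();
        for (String word : "to be or not to be".split("\\s+")) {
            task.process(word);
        }
        System.out.println(task.window()); // counts gathered in this window
        System.out.println(task.window()); // empty map: state was reset
    }
}
```

In a real Samza task the framework, not a main method, would drive these calls, and window would send its results to an output stream rather than return them.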
For example, a task that consumes the output of wikipedia-words from the previous section and maintains counts of the individual words might use all three interfaces. In this case, the InitableTask interface is not strictly necessary, but it is included for completeness:
public class WordCountTask implements StreamTask, InitableTask, WindowableTask {
    private static final SystemStream OUTPUT_STREAM =
        new SystemStream("kafka","wikipedia-counts");
    HashMap<String,Integer> counts = new