        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            // Anchor the emitted word to the input tuple
            collector.emit(tuple, new Values(word));
        }
        collector.ack(tuple);
    }

    public void cleanup() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
The exact line where the anchoring happens is at the
collector.emit()
statement. As
mentioned earlier, passing along the tuple enables Storm to keep track of the originating
spouts.
collector.ack(tuple)
and
collector.fail(tuple)
tell a spout what happened
to each message. Storm considers a tuple coming off a spout fully processed when every
message in the tree has been processed. A tuple is considered failed when its tree of
messages fails to be fully processed within a configurable timeout. The default is 30
seconds.
You can change this timeout by changing the
Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
configuration on the topology.
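For example, the timeout can be raised when building the topology configuration; a minimal sketch using the standard Config helper (the value of 60 seconds is just an illustration):

```java
Config conf = new Config();
// Raise the tuple-tree timeout from the 30-second default to 60 seconds
conf.setMessageTimeoutSecs(60);
```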
Of course, the spout needs to take care of the case when a message fails and retry or
discard the message accordingly.
Every tuple you process must be acked or failed. Storm uses memory to
track each tuple, so if you don't ack/fail every tuple, the task will eventually run out of memory.
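In practice, the retry bookkeeping in a reliable spout amounts to a pending map keyed by message id: the message stays in the map until Storm calls back with an ack or a fail. The sketch below is a plain-Java illustration of that pattern; the class and method names are ours, not part of the Storm API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper a spout could use to track in-flight messages.
public class PendingTracker {
    private final Map<Long, String> pending = new ConcurrentHashMap<>();

    // Remember the message when it is emitted, until acked or failed.
    public void emitted(long msgId, String message) {
        pending.put(msgId, message);
    }

    // The tuple tree completed: the message is fully processed, forget it.
    public void ack(long msgId) {
        pending.remove(msgId);
    }

    // The tuple tree failed or timed out: hand the message back so the
    // caller can re-emit it or discard it, per its retry policy.
    public String fail(long msgId) {
        return pending.remove(msgId);
    }
}
```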
Multiple Streams
A bolt can emit tuples to multiple streams using
emit(streamId, tuple)
, where
streamId
is a string that identifies the stream. Then, in the
TopologyBuilder
, you can
decide which stream to subscribe to.
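Putting those pieces together, a bolt that emits to named streams must also declare each stream. A minimal sketch, where the stream names ("words", "errors") and component ids ("splitter", "counter") are hypothetical:

```java
// In declareOutputFields: declare each named stream and its fields.
declarer.declareStream("words", new Fields("word"));
declarer.declareStream("errors", new Fields("message"));

// In execute: pick the stream per tuple (anchored form shown).
collector.emit("words", tuple, new Values(word));

// In the topology definition: subscribe a bolt to one specific stream.
builder.setBolt("counter", new WordCounter())
       .shuffleGrouping("splitter", "words");
```

A downstream bolt that subscribes only to "words" never sees tuples emitted on "errors", which is what lets you route different kinds of output to different consumers.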