import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Splits each sentence tuple into one tuple per word.
// (The class name and prepare() header are assumed; the excerpt begins mid-class.)
public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }
    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            // Anchoring: pass the input tuple so Storm ties each word to its tree.
            collector.emit(tuple, new Values(word));
        }
        collector.ack(tuple); // ack the input tuple once every word has been emitted
    }
    public void cleanup() {
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
The exact line where the anchoring happens is the collector.emit() statement. As
mentioned earlier, passing along the input tuple enables Storm to keep track of the
originating spout tuple. collector.ack(tuple) and collector.fail(tuple) tell the spout
what happened to each message. Storm considers a tuple coming off a spout fully
processed once every message in its tree has been processed. A tuple is considered
failed when its tree of messages is not fully processed within a configurable timeout.
The default is 30 seconds.
You can change this timeout by changing the Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
configuration on the topology.
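For example, a minimal sketch of raising the timeout when submitting a topology (the topology name, the builder variable, and the 60-second value are placeholders):

Config conf = new Config();
// Consider a tuple failed if its tree is not completed within 60 seconds.
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);
StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());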
Of course, the spout needs to handle the case in which a message fails, retrying or
discarding the message accordingly.
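One common pattern is sketched below, assuming the spout keeps every in-flight message in a map keyed by message ID (the pending and retries maps, the MAX_RETRIES limit, and the collector field are illustrative, not part of Storm's API):

private Map<Object, Values> pending = new HashMap<Object, Values>();
private Map<Object, Integer> retries = new HashMap<Object, Integer>();
private static final int MAX_RETRIES = 3;

public void ack(Object msgId) {
    // Fully processed: forget the message.
    pending.remove(msgId);
    retries.remove(msgId);
}

public void fail(Object msgId) {
    int attempts = retries.containsKey(msgId) ? retries.get(msgId) : 0;
    if (attempts < MAX_RETRIES) {
        // Re-emit with the same message ID so Storm tracks it again.
        retries.put(msgId, attempts + 1);
        collector.emit(pending.get(msgId), msgId);
    } else {
        // Too many attempts: discard the message.
        pending.remove(msgId);
        retries.remove(msgId);
    }
}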
Every tuple you process must be acked or failed. Storm uses memory to
track each tuple, so if you don't ack/fail every tuple, the task will
eventually run out of memory.
Multiple Streams
A bolt can emit tuples to multiple streams using emit(streamId, tuple), where
streamId is a string that identifies the stream. Then, in the TopologyBuilder, you can
decide which stream to subscribe to.
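Each named stream must also be declared with declareStream() in declareOutputFields(). A minimal sketch, assuming a variant of the bolt above that routes words to one stream and errors to another (the stream names, component names, and the spout/bolt classes wired into the builder are hypothetical):

// In the bolt: declare two named streams, then emit to one or the other.
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declareStream("words", new Fields("word"));
    declarer.declareStream("errors", new Fields("error"));
}

public void execute(Tuple tuple) {
    String sentence = tuple.getString(0);
    if (sentence == null) {
        collector.emit("errors", tuple, new Values("empty sentence"));
    } else {
        for (String word : sentence.split(" ")) {
            collector.emit("words", tuple, new Values(word));
        }
    }
    collector.ack(tuple);
}

// In the topology: each downstream bolt subscribes to a stream by name.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new SentenceSpout());
builder.setBolt("splitter", new SplitSentenceBolt()).shuffleGrouping("sentences");
builder.setBolt("counter", new WordCounter()).shuffleGrouping("splitter", "words");
builder.setBolt("logger", new ErrorLogger()).shuffleGrouping("splitter", "errors");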
 