Take a look at an example of a bolt that will split sentences into words:
class SplitSentence implements IRichBolt {
    private OutputCollector collector;
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }
    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }
    public void cleanup() {
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
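The splitting loop in execute does not depend on Storm, so its behavior can be checked in plain Java. In the sketch below, splitLikeBolt and splitOnWhitespace are hypothetical helpers, not part of Storm: the first mirrors the bolt's split(" ") loop, and the second is one possible alternative. Note that split(" ") returns an empty string for every extra space, so the bolt as written would emit empty "words" for input such as "the cow  jumped".

```java
import java.util.ArrayList;
import java.util.List;

public class SplitDemo {
    // Hypothetical helper mirroring the loop in SplitSentence.execute:
    // one output value per token returned by split(" ").
    static List<String> splitLikeBolt(String sentence) {
        List<String> words = new ArrayList<>();
        for (String word : sentence.split(" ")) {
            words.add(word);
        }
        return words;
    }

    // Hypothetical alternative: trim the input, then split on runs of
    // whitespace so repeated spaces do not produce empty "words".
    static List<String> splitOnWhitespace(String sentence) {
        List<String> words = new ArrayList<>();
        String trimmed = sentence.trim();
        if (trimmed.isEmpty()) {
            return words; // "".split(...) would otherwise yield [""]
        }
        for (String word : trimmed.split("\\s+")) {
            words.add(word);
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(splitLikeBolt("the cow  jumped"));
        System.out.println(splitOnWhitespace("the cow  jumped"));
    }
}
```

Running main prints [the, cow, , jumped] followed by [the, cow, jumped]. Whether empty tokens matter depends on what the downstream bolts do with them.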
As you can see, this bolt is very straightforward. Note, however, that this example
provides no message guarantee. If the bolt loses a message for some reason, either
because it goes down or because the message is deliberately discarded in code, the
spout that generated the message is never notified, and neither are any of the bolts
in between.
In many cases, you'll want to guarantee message processing through the entire topology.
Reliable versus Unreliable Bolts
As mentioned earlier, Storm can guarantee that each message sent by a spout will be
fully processed by all bolts. This is a design decision, however: you need to decide
whether your bolts guarantee messages.
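To make the two choices concrete, here is a toy, single-threaded model in plain Java. It is not Storm's API: Tracker and all of its methods are hypothetical. It assumes the splitting bolt acks its input sentence in both cases and differs only in whether the word tuples it emits are linked (anchored) to that sentence. With anchoring, a failure on any word reaches the spout; without it, the spout sees a successful ack as soon as the bolt acks the sentence, whatever happens to the words later.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy, single-threaded model of reliable vs. unreliable emission.
// Not Storm's API: Tracker and all of its methods are hypothetical.
public class AnchoringDemo {

    static class Tracker {
        // For each root (spout) message ID, the IDs of tuples in its tree not yet acked.
        private final Map<String, Set<String>> pending = new HashMap<>();
        // Final result reported back to the spout: "ack" or "fail".
        private final Map<String, String> outcome = new HashMap<>();

        // The spout emits a root tuple whose ID doubles as the message ID.
        void spoutEmit(String rootId) {
            track(rootId, rootId);
        }

        // Anchored emit: the new tuple joins the root's tree and must be acked too.
        void emitAnchored(String rootId, String tupleId) {
            track(rootId, tupleId);
        }

        // Unanchored emit: the new tuple is linked to no root, so nothing is tracked.
        void emitUnanchored(String tupleId) {
        }

        void ack(String rootId, String tupleId) {
            Set<String> ids = pending.get(rootId);
            if (ids == null) {
                return; // unknown root, or its outcome is already decided
            }
            ids.remove(tupleId);
            if (ids.isEmpty()) { // whole tree processed: the spout sees an ack
                pending.remove(rootId);
                outcome.put(rootId, "ack");
            }
        }

        // Any failure anywhere in the tree fails the root, whichever tuple failed.
        void fail(String rootId, String tupleId) {
            if (pending.remove(rootId) != null) {
                outcome.put(rootId, "fail");
            }
        }

        String outcomeOf(String rootId) {
            return outcome.getOrDefault(rootId, "pending");
        }

        private void track(String rootId, String tupleId) {
            pending.computeIfAbsent(rootId, k -> new HashSet<>()).add(tupleId);
        }
    }

    public static void main(String[] args) {
        Tracker t = new Tracker();
        String[] words = {"the", "cow", "jumped"};

        // Reliable: each word tuple is anchored to sentence "s1".
        t.spoutEmit("s1");
        for (String w : words) {
            t.emitAnchored("s1", "s1/" + w);
        }
        t.ack("s1", "s1");      // the split bolt acks its input sentence
        t.ack("s1", "s1/the");  // a downstream bolt acks "the"
        t.fail("s1", "s1/cow"); // a downstream bolt fails "cow"
        System.out.println("s1: " + t.outcomeOf("s1"));

        // Unreliable: the same words are emitted without anchoring them to "s2".
        t.spoutEmit("s2");
        for (String w : words) {
            t.emitUnanchored("s2/" + w);
        }
        t.ack("s2", "s2");      // the split bolt acks its input sentence
        // Even if "cow" is later lost downstream, the spout has already seen an ack.
        System.out.println("s2: " + t.outcomeOf("s2"));
    }
}
```

Running main prints "s1: fail" and then "s2: ack". The per-root set of pending IDs is chosen here only for readability; Storm's own acker instead keeps a single 64-bit value per spout tuple, the XOR of the random IDs of every tuple created or acked in the tree, which returns to zero once the tree is complete. In Storm itself, anchoring is expressed by passing the input tuple as the first argument to OutputCollector.emit.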
A topology is a tree of nodes in which messages (tuples) travel along one or more
branches. Each node will ack(tuple) or fail(tuple) so that Storm knows when a message
fails and can notify the spout or spouts that produced the message. Since a Storm
topology runs in a highly parallelized environment, the best way to keep track of the
original spout instance is to include a reference to the originating spout in the message
tuple. This technique is called anchoring. Change the SplitSentence bolt that you just
saw so that it guarantees message processing.
class SplitSentence implements IRichBolt {
    private OutputCollector collector;
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {