The first bolt, WordNormalizer, will be responsible for taking each line and normalizing it. It will split the line into words, convert all words to lowercase, and trim them.
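The normalization steps described above can be tried outside of Storm with a few lines of plain Java. The following is only a minimal sketch: the class name NormalizeSketch and the method normalize are hypothetical names chosen for illustration and are not part of the book's code or of Storm. It assumes that words are separated by single space characters and that empty pieces should be dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Storm or of the book's project.
class NormalizeSketch {

    /** Splits a line on single spaces, trims each piece, drops empty pieces, and lowercases the rest. */
    static List<String> normalize(String line) {
        List<String> words = new ArrayList<>();
        for (String word : line.split(" ")) {
            word = word.trim();
            if (!word.isEmpty()) {
                words.add(word.toLowerCase());
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // Repeated spaces produce empty pieces, which are skipped.
        System.out.println(normalize("  Storm   IS  Fast ")); // prints [storm, is, fast]
    }
}
```

Splitting on a single space leaves empty strings wherever spaces repeat, which is why the emptiness check comes before lowercasing.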
First we need to declare the bolt's output fields:
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
Here we declare that the bolt will emit one Field named word.
Next we implement the public void execute(Tuple input) method, where the input tuples are processed:
    public void execute(Tuple input) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for (String word : words) {
            word = word.trim();
            if (!word.isEmpty()) {
                word = word.toLowerCase();
                // Emit the word
                collector.emit(new Values(word));
            }
        }
        // Acknowledge the tuple
        collector.ack(input);
    }
The first line of the method reads the value from the tuple, here by position (index 0); values can also be read by name. The value is processed and then emitted using the collector object. After each tuple is processed, the collector's ack() method is called to indicate that processing has completed successfully. If the tuple could not be processed, the collector's fail() method should be called.
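The ack-or-fail decision can be illustrated without a running cluster. The sketch below is an assumption-laden stand-in, not Storm code: RecordingCollector is a hypothetical class that only records which inputs were acknowledged or failed, and handle is a hypothetical helper that wraps the work in a try/catch. In a real bolt the same shape would presumably apply, with the collector's ack() and fail() methods called on the input tuple.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustration only: none of these names come from Storm.
class AckFailSketch {

    /** Hypothetical stand-in that records calls; it is NOT Storm's OutputCollector. */
    static class RecordingCollector {
        final List<String> acked = new ArrayList<>();
        final List<String> failed = new ArrayList<>();

        void ack(String input) { acked.add(input); }
        void fail(String input) { failed.add(input); }
    }

    /** Runs the work for one input, then acks on success or fails if the work throws. */
    static void handle(String input, Consumer<String> work, RecordingCollector collector) {
        try {
            work.accept(input);
            collector.ack(input);
        } catch (RuntimeException e) {
            collector.fail(input);
        }
    }

    public static void main(String[] args) {
        RecordingCollector collector = new RecordingCollector();
        handle("a line of text", line -> line.split(" "), collector);
        // A null input makes the work throw, so it is recorded as failed.
        handle(null, line -> line.split(" "), collector);
        System.out.println("acked=" + collector.acked.size()
                + " failed=" + collector.failed.size()); // prints acked=1 failed=1
    }
}
```

Keeping the acknowledgment as the last statement inside the try block means an input is marked as processed only after all of its work has finished.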
Example 2-2 contains the complete code for the class.
Example 2-2. src/main/java/bolts/WordNormalizer.java
    package bolts;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class WordNormalizer implements IRichBolt {
        private OutputCollector collector;