Databases Reference
In-Depth Information
The first bolt, WordNormalizer , will be responsible for taking each line and normaliz-
ing it. It will split the line into words, convert all words to lowercase, and trim them.
First we need to declare the bolt's output parameters:
public void declareOutputFields ( OutputFieldsDeclarer declarer ) {
declarer . declare ( new Fields ( "word" ));
}
Here we declare that the bolt will emit one Field named word.
Next we implement the public void execute(Tuple input) method, where the input
tuples are processed:
public void execute ( Tuple input ) {
String sentence = input . getString ( 0 );
String [] words = sentence . split ( " " );
for ( String word : words ){
word = word . trim ();
if (! word . isEmpty ()){
word = word . toLowerCase ();
//Emit the word
collector . emit ( new Values ( word ));
}
}
// Acknowledge the tuple
collector . ack ( input );
}
The first line reads the value from the tuple. The value can be read by position or by
name. The value is processed and then emitted using the collector object. After each
tuple is processed, the collector's ack() method is called to indicate that processing has
completed successfully. If the tuple could not be processed, the collector's fail()
method should be called.
Example 2-2 contains the complete code for the class.
Example 2-2. src/main/java/bolts/WordNormalizer.java
package bolts ;
import java.util.ArrayList ;
import java.util.List ;
import java.util.Map ;
import backtype.storm.task.OutputCollector ;
import backtype.storm.task.TopologyContext ;
import backtype.storm.topology.IRichBolt ;
import backtype.storm.topology.OutputFieldsDeclarer ;
import backtype.storm.tuple.Fields ;
import backtype.storm.tuple.Tuple ;
import backtype.storm.tuple.Values ;
public class WordNormalizer implements IRichBolt {
private OutputCollector collector ;
 
Search WWH ::




Custom Search