Databases Reference
In-Depth Information
package bolts ;
import java.util.HashMap ;
import java.util.Map ;
import backtype.storm.task.OutputCollector ;
import backtype.storm.task.TopologyContext ;
import backtype.storm.topology.IRichBolt ;
import backtype.storm.topology.OutputFieldsDeclarer ;
import backtype.storm.tuple.Tuple ;
public class WordCounter implements IRichBolt {
// Identifier printed in the cleanup() summary header; where it is assigned is not visible in this excerpt.
Integer id ;
// Name printed next to the id in the cleanup() summary header; where it is assigned is not visible in this excerpt.
String name ;
// Number of times each string has been received by execute(), keyed by that string.
Map < String , Integer > counters ;
// Collector that execute() uses to acknowledge each tuple after counting it.
private OutputCollector collector ;
/**
 * Prints the accumulated word counts to standard output.
 * Per the original author's note, this is meant to run when the
 * cluster is shut down. The output is a header line containing this
 * bolt's name and id, followed by one "word: count" line for every
 * entry in the counters map.
 */
@Override
public void cleanup() {
    String header = "-- Word Counter [" + name + "-" + id + "] --";
    System.out.println(header);
    for (Map.Entry<String, Integer> wordCount : counters.entrySet()) {
        String word = wordCount.getKey();
        Integer total = wordCount.getValue();
        System.out.println(word + ": " + total);
    }
}
/**
 * Counts one occurrence of the word carried by the incoming tuple.
 *
 * @param input tuple whose first field (index 0) is read as the word
 */
@Override
public void execute(Tuple input) {
    final String word = input.getString(0);
    // A word that is not in the map yet starts at 1; otherwise its
    // stored count is incremented by one.
    final Integer updated = counters.containsKey(word)
            ? counters.get(word) + 1
            : 1;
    counters.put(word, updated);
    // Acknowledge the tuple now that it has been counted.
    collector.ack(input);
}
/**
* On create
*/
@Override
public void prepare ( Map stormConf , TopologyContext context ,
OutputCollector collector ) {
 
Search WWH ::




Custom Search