    this.counters = new HashMap<String, Integer>();
    this.collector = collector;
    this.name = context.getThisComponentId();
    this.id = context.getThisTaskId();
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}
The execute method uses a map to collect and count the words. When the topology terminates, the cleanup() method is called and prints out the counter map. (This is just an example; normally you should use the cleanup() method to close active connections and other resources when the topology shuts down.)
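The map-update logic that execute performs can be sketched in plain Java. This standalone fragment (the class and method names here are illustrative, not Storm APIs) shows only the create-or-increment step for each word and the final printout that cleanup would produce:

```java
import java.util.HashMap;
import java.util.Map;

public class CounterSketch {
    private final Map<String, Integer> counters = new HashMap<String, Integer>();

    // What execute() does per incoming word: create its counter or increment it.
    public void count(String word) {
        if (!counters.containsKey(word)) {
            counters.put(word, 1);
        } else {
            counters.put(word, counters.get(word) + 1);
        }
    }

    // What cleanup() does on shutdown: print the final counter map.
    public void printCounts() {
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
    }

    public Map<String, Integer> getCounters() {
        return counters;
    }

    public static void main(String[] args) {
        CounterSketch sketch = new CounterSketch();
        for (String w : new String[] {"storm", "is", "great", "storm"}) {
            sketch.count(w);
        }
        sketch.printCounts();
    }
}
```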
The Main Class
In the main class, you'll create the topology and a LocalCluster object, which enables
you to test and debug the topology locally. In conjunction with the Config object,
LocalCluster allows you to try out different cluster configurations. For example, if a
global or class variable was accidentally used, you would find the error when testing
your topology in configurations with a different number of workers. (You'll see more on config objects in Chapter 3.)
All topology nodes should be able to run independently with no shared
data between processes (i.e., no global or class variables) because when
the topology runs in a real cluster, these processes may run on different
machines.
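The warning above can be made concrete with a plain-Java toy (not Storm code, and written here only as an assumed illustration): a static (class) field is shared by every task in the same JVM, so it may silently "work" under LocalCluster, but each JVM in a real cluster gets its own copy of that field, and the merged counts disappear. Per-instance state behaves the same in both settings:

```java
import java.util.HashMap;
import java.util.Map;

public class SharedStateSketch {
    // BAD: class-level state. All tasks in one JVM share this map,
    // but every worker JVM in a real cluster has its own copy.
    static Map<String, Integer> sharedCounters = new HashMap<String, Integer>();

    // GOOD: per-instance state. Each task owns its counters no matter
    // which machine it runs on.
    Map<String, Integer> localCounters = new HashMap<String, Integer>();

    void count(String word) {
        sharedCounters.merge(word, 1, Integer::sum);
        localCounters.merge(word, 1, Integer::sum);
    }

    public static void main(String[] args) {
        SharedStateSketch taskA = new SharedStateSketch();
        SharedStateSketch taskB = new SharedStateSketch();
        taskA.count("storm");
        taskB.count("storm");
        // In a single JVM the static map sees both increments,
        // hiding a bug that only shows up on a real cluster.
        System.out.println("shared: " + sharedCounters.get("storm"));
        System.out.println("taskA local: " + taskA.localCounters.get("storm"));
    }
}
```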
You'll create the topology using a TopologyBuilder , which tells Storm how the nodes
are arranged and how they exchange data.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader", new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");
The spout and the bolts are connected using shuffleGrouping. This type of grouping tells Storm to send messages from the source node to the target nodes in a randomly distributed fashion.
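The effect of random distribution can be illustrated with a plain-Java toy (this is not Storm's implementation, just a sketch of the idea): each tuple from the source is assigned to one of the target tasks at random, so over many tuples the load spreads roughly evenly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class ShuffleSketch {
    // Assign each tuple to one of numTasks target task queues at random,
    // mimicking the load-spreading effect of a shuffle grouping.
    public static List<List<String>> shuffle(List<String> tuples, int numTasks, long seed) {
        Random random = new Random(seed);
        List<List<String>> tasks = new ArrayList<List<String>>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(new ArrayList<String>());
        }
        for (String tuple : tuples) {
            tasks.get(random.nextInt(numTasks)).add(tuple);
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<String> words = new ArrayList<String>();
        for (int i = 0; i < 1000; i++) {
            words.add("word-" + i);
        }
        List<List<String>> tasks = shuffle(words, 4, 42L);
        for (int i = 0; i < tasks.size(); i++) {
            System.out.println("task " + i + " received " + tasks.get(i).size() + " tuples");
        }
    }
}
```

Every tuple goes to exactly one task, and with enough tuples each task receives a similar share, which is why shuffle grouping is the default choice when no key-based routing is needed.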