Databases Reference
In-Depth Information
Next, create a Config object containing the topology configuration, which is merged
with the cluster configuration at run time and sent to all nodes with the prepare method.
Config conf = new Config ();
conf . put ( "wordsFile" , args [ 0 ]);
conf . setDebug ( true );
Set the property wordsFile to the name of the file to be read by the spout, and the
property debug to true because you're in development. When debug is true , Storm
prints all messages exchanged between nodes, and other debug data useful for under-
standing how the topology is running.
As explained earlier, you'll use a LocalCluster to run the topology. In a production
environment, the topology runs continuously, but for this example you'll just run the
topology for a few seconds so you can see the results.
LocalCluster cluster = new LocalCluster ();
cluster . submitTopology ( "Getting-Started-Toplogie" , conf , builder . createTopology ());
Thread . sleep ( 2000 );
cluster . shutdown ();
Create and run the topology using createTopology and submitTopology , sleep for two
seconds (the topology runs in a different thread), and then stop the topology by shutting
down the cluster.
See Example 2-3 to put it all together.
Example 2-3. src/main/java/TopologyMain.java
import spouts.WordReader ;
import backtype.storm.Config ;
import backtype.storm.LocalCluster ;
import backtype.storm.topology.TopologyBuilder ;
import backtype.storm.tuple.Fields ;
import bolts.WordCounter ;
import bolts.WordNormalizer ;
public class TopologyMain {
public static void main ( String [] args ) throws InterruptedException {
//Topology definition
TopologyBuilder builder = new TopologyBuilder ();
builder . setSpout ( "word-reader" , new WordReader ());
builder . setBolt ( "word-normalizer" , new WordNormalizer ())
. shuffleGrouping ( "word-reader" );
builder . setBolt ( "word-counter" , new WordCounter (), 2 )
. fieldsGrouping ( "word-normalizer" , new Fields ( "word" ));
//Configuration
Config conf = new Config ();
conf . put ( "wordsFile" , args [ 0 ]);
conf . setDebug ( false );
//Topology run
conf . put ( Config . TOPOLOGY_MAX_SPOUT_PENDING , 1 );
 
Search WWH ::




Custom Search