Next, create a Config object containing the topology configuration, which is merged with the cluster configuration at run time and sent to all nodes with the prepare method.
    Config conf = new Config();
    conf.put("wordsFile", args[0]);
    conf.setDebug(true);
Set the property wordsFile to the name of the file to be read by the spout, and the property debug to true because you're in development. When debug is true, Storm prints all messages exchanged between nodes, and other debug data useful for understanding how the topology is running.
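To make the merge step concrete, here is a minimal plain-Java sketch that does not use Storm at all. It assumes topology-level settings take precedence over cluster-level defaults when both define the same key. The class name ConfigMergeSketch, the merge helper, the fallback file name words.txt, and the sample cluster values are all hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigMergeSketch {

    // Hypothetical helper: overlays topology-level settings on cluster-level defaults.
    // Assumption: when both maps define a key, the topology value wins.
    static Map<String, Object> merge(Map<String, Object> clusterConf,
                                     Map<String, Object> topologyConf) {
        Map<String, Object> merged = new HashMap<>(clusterConf);
        merged.putAll(topologyConf);
        return merged;
    }

    public static void main(String[] args) {
        // Stand-in for cluster-wide defaults (illustrative values only).
        Map<String, Object> clusterConf = new HashMap<>();
        clusterConf.put("topology.debug", false);
        clusterConf.put("topology.workers", 1);

        // Stand-in for the topology configuration built in the text.
        Map<String, Object> topologyConf = new HashMap<>();
        topologyConf.put("wordsFile", args.length > 0 ? args[0] : "words.txt");
        topologyConf.put("topology.debug", true);

        Map<String, Object> merged = merge(clusterConf, topologyConf);
        System.out.println("wordsFile = " + merged.get("wordsFile"));
        System.out.println("topology.debug = " + merged.get("topology.debug"));
        System.out.println("topology.workers = " + merged.get("topology.workers"));
    }
}
```

The merged map is the kind of view a component would receive: the custom wordsFile entry sits alongside the cluster settings, and the development-time debug flag replaces the cluster default.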
As explained earlier, you'll use a LocalCluster to run the topology. In a production environment, the topology runs continuously, but for this example you'll just run the topology for a few seconds so you can see the results.
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
    Thread.sleep(2000);
    cluster.shutdown();
Create and run the topology using createTopology and submitTopology, sleep for two seconds (the topology runs in a different thread), and then stop the topology by shutting down the cluster.
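The submit, sleep, and shut down sequence is an ordinary pattern for bounding the lifetime of background work. The sketch below reproduces it in plain Java with an ExecutorService standing in for the local cluster. It is not how Storm implements LocalCluster. The class name TimedRunSketch, the runFor helper, and the counter that stands in for tuple processing are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TimedRunSketch {

    // Hypothetical helper: runs a stand-in processing loop on a background
    // thread for runMillis, then stops it and returns the iteration count.
    static long runFor(long runMillis) throws InterruptedException {
        AtomicLong processed = new AtomicLong();
        ExecutorService cluster = Executors.newSingleThreadExecutor();

        // "Submit" the work: it keeps running until the thread is interrupted.
        cluster.execute(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                processed.incrementAndGet();
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends the run.
                    Thread.currentThread().interrupt();
                }
            }
        });

        // The caller only waits; the work happens on the other thread.
        Thread.sleep(runMillis);

        // "Shut down the cluster": interrupt the worker and wait for it to exit.
        cluster.shutdownNow();
        if (!cluster.awaitTermination(1, TimeUnit.SECONDS)) {
            throw new IllegalStateException("worker did not stop in time");
        }
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        long count = runFor(200);
        System.out.println("iterations before shutdown: " + count);
    }
}
```

Without the sleep, the main thread would shut everything down before the background work had a chance to produce any results, which is why the example pauses before stopping the cluster.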
See Example 2-3 to put it all together.
Example 2-3. src/main/java/TopologyMain.java
    import spouts.WordReader;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    import bolts.WordCounter;
    import bolts.WordNormalizer;

    public class TopologyMain {
        public static void main(String[] args) throws InterruptedException {
            //Topology definition
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("word-reader", new WordReader());
            builder.setBolt("word-normalizer", new WordNormalizer())
                .shuffleGrouping("word-reader");
            builder.setBolt("word-counter", new WordCounter(), 2)
                .fieldsGrouping("word-normalizer", new Fields("word"));

            //Configuration
            Config conf = new Config();
            conf.put("wordsFile", args[0]);
            conf.setDebug(false);

            //Topology run
            conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);