To use this grouping in the example, replace the word-normalizer grouping with the
following:
builder.setBolt("word-normalizer", new WordNormalizer())
    .customGrouping("word-reader", new ModuleGrouping());
Direct Grouping
This is a special grouping in which the source decides which component will receive the
tuple. As in the previous example, the source chooses the receiving bolt based on the
first letter of the word. To use direct grouping, call the emitDirect method instead of
emit in the WordNormalizer bolt:
public void execute(Tuple input) {
    ...
    for (String word : words) {
        if (!word.isEmpty()) {
            ...
            collector.emitDirect(getWordCountIndex(word), new Values(word));
        }
    }
    // Acknowledge the tuple
    collector.ack(input);
}

public Integer getWordCountIndex(String word) {
    word = word.trim().toUpperCase();
    if (word.isEmpty())
        return 0;
    else
        return word.charAt(0) % numCounterTasks;
}
Work out the number of target tasks in the prepare method:
public void prepare(Map stormConf, TopologyContext context,
        OutputCollector collector) {
    this.collector = collector;
    // getComponentTasks returns the list of task IDs, so take its size
    this.numCounterTasks = context.getComponentTasks("word-counter").size();
}
And in the topology definition, specify that the stream will be grouped directly:
builder.setBolt("word-counter", new WordCounter(), 2)
    .directGrouping("word-normalizer");
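The first-letter routing rule can be tried outside Storm. The following is a minimal sketch, not the book's code: the class DirectRoutingSketch, the method names, and the task IDs 4 and 5 are hypothetical. Since emitDirect addresses a task by its ID, the sketch also shows one way to map the computed index onto an entry of the task ID list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Standalone sketch (no Storm dependency) of the first-letter routing rule.
final class DirectRoutingSketch {

    // First character code modulo the number of counter tasks.
    // Locale.ROOT keeps the upper-casing independent of the default locale.
    static int wordCountIndex(String word, int numCounterTasks) {
        String normalized = word.trim().toUpperCase(Locale.ROOT);
        if (normalized.isEmpty()) {
            return 0;
        }
        return normalized.charAt(0) % numCounterTasks;
    }

    // Maps the index onto a task ID taken from the (hypothetical) ID list.
    static int targetTask(String word, List<Integer> counterTaskIds) {
        return counterTaskIds.get(wordCountIndex(word, counterTaskIds.size()));
    }

    public static void main(String[] args) {
        List<Integer> counterTaskIds = Arrays.asList(4, 5); // hypothetical task IDs
        for (String word : new String[] {"apple", "Avocado", "banana", ""}) {
            System.out.println("'" + word + "' -> task " + targetTask(word, counterTaskIds));
        }
    }
}
```

Words that share a first letter, regardless of case, always land on the same task, which is what lets each counter keep a consistent tally for its words.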
Global Grouping
Global grouping sends the tuples generated by all instances of the source to a single
target instance (specifically, the task with the lowest ID).
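The selection rule can be illustrated without Storm. This is a minimal sketch under stated assumptions: the class GlobalGroupingSketch, the method chooseTarget, and the task IDs are hypothetical, and the code only mirrors the "lowest ID" rule described above rather than Storm's internal implementation.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Standalone sketch (no Storm dependency) of the global grouping rule.
final class GlobalGroupingSketch {

    // Every tuple goes to the same task: the one with the lowest ID.
    static int chooseTarget(List<Integer> targetTaskIds) {
        return Collections.min(targetTaskIds);
    }

    public static void main(String[] args) {
        List<Integer> targetTaskIds = Arrays.asList(7, 3, 5); // hypothetical task IDs
        for (String word : new String[] {"storm", "bolt", "spout"}) {
            System.out.println(word + " -> task " + chooseTarget(targetTaskIds));
        }
    }
}
```

In a topology definition, this grouping would typically be selected with globalGrouping on the bolt declaration, in the same position where directGrouping appears in the previous example.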
 