...
}
We've added an if to check the stream source. Storm lets us declare named streams (if
you don't send a tuple to a named stream, it goes to the "default" stream). Named
streams are an excellent way to identify the source of tuples, as in this case, where we
want to identify the signals.
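The if described above can be modeled outside a Storm cluster. The following is a minimal plain-Java sketch, not the book's WordCounter. StreamDispatchSketch and its SimpleTuple record are hypothetical stand-ins for a bolt and for Storm's Tuple (in Storm, the stream ID of an incoming tuple comes from Tuple.getSourceStreamId()). It also assumes, for illustration only, that a signal tells the bolt to clear its counters.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Storm-free model of a word-counting bolt that also listens
// to a "signals" stream. Assumption: a signal means "clear the counters".
public class StreamDispatchSketch {
    // Stream used when a tuple is not emitted to a named stream.
    public static final String DEFAULT_STREAM = "default";
    public static final String SIGNALS_STREAM = "signals";

    // Hypothetical stand-in for a Storm tuple: just a stream ID and one value.
    public record SimpleTuple(String streamId, String value) {}

    private final Map<String, Integer> counters = new HashMap<>();

    // Branch on the stream the tuple arrived on, like the if in the bolt.
    public void execute(SimpleTuple input) {
        if (SIGNALS_STREAM.equals(input.streamId())) {
            counters.clear();
            return;
        }
        counters.merge(input.value(), 1, Integer::sum);
    }

    public int count(String word) {
        return counters.getOrDefault(word, 0);
    }

    public static void main(String[] args) {
        StreamDispatchSketch bolt = new StreamDispatchSketch();
        bolt.execute(new SimpleTuple(DEFAULT_STREAM, "storm"));
        bolt.execute(new SimpleTuple(DEFAULT_STREAM, "storm"));
        System.out.println(bolt.count("storm")); // prints 2
        bolt.execute(new SimpleTuple(SIGNALS_STREAM, "refresh"));
        System.out.println(bolt.count("storm")); // prints 0
    }
}
```

Checking the stream ID first keeps the two kinds of input separate: word tuples never reach the reset branch, and signal tuples are never counted as words.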
In the topology definition, you'll add a second stream to the word-counter bolt that
sends each tuple from the signals stream of signals-spout to all instances of the bolt.
builder.setBolt("word-counter", new WordCounter(), 2)
        .fieldsGrouping("word-normalizer", new Fields("word"))
        .allGrouping("signals-spout", "signals");
The implementation of signals-spout can be found in the git repository.
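To make the difference between the two groupings on word-counter concrete, here is a minimal simulation in plain Java, not Storm's implementation. Fields grouping picks a single task from a hash of the grouping field, while all grouping returns every task. The hash-modulo rule and the GroupingSimulation name are assumptions used only to illustrate "same word, same task"; Storm computes its own hash of the grouping values.

```java
import java.util.ArrayList;
import java.util.List;

// Storm-free illustration of how the two groupings used by word-counter
// choose target task indexes (0 .. numTasks-1).
public class GroupingSimulation {

    // Fields grouping (illustrative): one task, chosen by hashing the word,
    // so the same word always goes to the same task.
    public static List<Integer> fieldsGrouping(String word, int numTasks) {
        return List.of(Math.floorMod(word.hashCode(), numTasks));
    }

    // All grouping: every task receives a copy of the tuple.
    public static List<Integer> allGrouping(int numTasks) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            targets.add(i);
        }
        return targets;
    }

    public static void main(String[] args) {
        int numTasks = 2; // two instances, matching the parallelism hint in setBolt
        System.out.println("word 'storm' -> " + fieldsGrouping("storm", numTasks));
        System.out.println("signal       -> " + allGrouping(numTasks)); // [0, 1]
    }
}
```

This is why a signal reaches every word-counter instance, while each word is counted by exactly one of them.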
Custom Grouping
You can create your own custom stream grouping by implementing the
backtype.storm.grouping.CustomStreamGrouping interface. This gives you the power to
decide which bolt(s) will receive each tuple.
Let's modify the word count example to group tuples so that all words that start with
the same letter are received by the same bolt.
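import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;

// Sends every word that starts with the same character to the same bolt task.
public class ModuleGrouping implements CustomStreamGrouping, Serializable {
    // Task IDs of the target bolt, received from Storm in prepare().
    List<Integer> targetTasks;
    int numTasks = 0;

    @Override
    public List<Integer> chooseTasks(List<Object> values) {
        List<Integer> boltIds = new ArrayList<Integer>();
        if (values.size() > 0) {
            String str = values.get(0).toString();
            // Position in the target task list: empty words go to the first task,
            // otherwise the first character's value modulo the number of tasks.
            int index = str.isEmpty() ? 0 : str.charAt(0) % numTasks;
            // Storm expects task IDs, so translate the position into an ID.
            boltIds.add(targetTasks.get(index));
        }
        return boltIds;
    }

    @Override
    public void prepare(TopologyContext context, Fields outFields,
                        List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        numTasks = targetTasks.size();
    }
}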
This is a simple implementation of CustomStreamGrouping: we take the integer value of
the first character of the word modulo the number of tasks, which selects the bolt that
will receive the tuple.
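A quick way to check this routing rule outside Storm is to apply the same arithmetic to a few words. The sketch below reuses the rule from ModuleGrouping.chooseTasks (empty word to position 0, otherwise first character modulo the task count). FirstLetterRouting and taskIndex are hypothetical names, and the method returns a position in the task list rather than a Storm task ID.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Storm-free check of the first-letter routing rule.
public class FirstLetterRouting {
    // Same rule as ModuleGrouping: empty words go to position 0, otherwise
    // the first character's numeric value modulo the number of tasks.
    public static int taskIndex(String word, int numTasks) {
        if (word.isEmpty()) {
            return 0;
        }
        return word.charAt(0) % numTasks;
    }

    public static void main(String[] args) {
        int numTasks = 2;
        Map<String, Integer> routes = new LinkedHashMap<>();
        for (String w : List.of("apple", "avocado", "banana", "blueberry")) {
            routes.put(w, taskIndex(w, numTasks));
        }
        // 'a' is 97 and 'b' is 98, so with two tasks the "a" words land on
        // position 1 and the "b" words on position 0.
        System.out.println(routes); // prints {apple=1, avocado=1, banana=0, blueberry=0}
    }
}
```

Note that this rule balances load only as evenly as first letters are distributed: words starting with common letters will send more tuples to some tasks than to others.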