Here you are blocking inside the nextTuple method, so you never execute the
ack and fail methods. In a real application, we recommend that you do
the blocking in a separate thread and use an internal queue to exchange
information (you'll learn how to do that in the next example, "Enqueued
Messages" on page 34).
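That producer/consumer pattern can be sketched without the Storm API. The class and method names below are illustrative only; in a real spout the listener thread would be started from open(), and nextTuple() would do the non-blocking polling:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueSpoutSketch {
    // Internal queue shared between the listener thread and nextTuple()
    private final LinkedBlockingQueue<String> queue =
            new LinkedBlockingQueue<String>(1000);

    // Stands in for the stream client thread started from open().
    // It blocks on the stream so nextTuple() never has to.
    void startListener(final List<String> fakeStream) {
        Thread listener = new Thread(new Runnable() {
            public void run() {
                for (String status : fakeStream) {
                    queue.offer(status); // drop the message if the queue is full
                }
            }
        });
        listener.setDaemon(true);
        listener.start();
        // Joining here only makes this demo deterministic;
        // a real spout would let the listener run forever.
        try { listener.join(); } catch (InterruptedException e) { }
    }

    // Equivalent of nextTuple(): poll without blocking, so the framework
    // thread stays free to call ack() and fail().
    List<String> drain() {
        List<String> emitted = new ArrayList<String>();
        String status;
        while ((status = queue.poll()) != null) {
            emitted.add(status); // a real spout would call collector.emit(...)
        }
        return emitted;
    }
}
```

Because nextTuple() only polls, the spout returns control to Storm quickly on every call instead of holding the thread hostage while waiting for the stream.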
This is great!
You're reading the Twitter stream with a single spout. If you parallelize the topology,
you'll have several spouts reading different partitions of the same stream, which doesn't
make sense. So how do you parallelize processing if you have several streams to read?
One interesting feature of Storm is that you can access the TopologyContext from any
component (spouts/bolts). Using this feature, you can divide the streams between your
spout instances.
public void open(Map conf, TopologyContext context,
        SpoutOutputCollector collector) {
    //Get the spout size from the context
    int spoutsSize =
        context.getComponentTasks(context.getThisComponentId()).size();
    //Get the id of this spout
    int myIdx = context.getThisTaskIndex();
    String[] tracks = ((String) conf.get("track")).split(",");
    StringBuffer tracksBuffer = new StringBuffer();
    for (int i = 0; i < tracks.length; i++) {
        //Check if this spout must read the track word
        if (i % spoutsSize == myIdx) {
            tracksBuffer.append(",");
            tracksBuffer.append(tracks[i]);
        }
    }
    if (tracksBuffer.length() == 0) {
        throw new RuntimeException("No track found for spout" +
            " [spoutsSize:" + spoutsSize + ", tracks:" + tracks.length + "] the amount" +
            " of tracks must be more than the spout parallelism");
    }
    this.track = tracksBuffer.substring(1);
    ...
}
Using this technique, you can distribute collectors evenly across data sources. The same
technique can be applied in other situations—for example, for collecting log files from
web servers. See Figure 4-2 .
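The modulo assignment at the heart of that code can be lifted into a small standalone helper to see how it splits any list of sources (the class and method names here are illustrative, not part of the Storm API):

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinAssignment {
    // Item i belongs to the task whose index equals i % numTasks,
    // so the items are spread evenly across all tasks.
    static List<String> assignedItems(String[] items, int taskIndex, int numTasks) {
        List<String> mine = new ArrayList<String>();
        for (int i = 0; i < items.length; i++) {
            if (i % numTasks == taskIndex) {
                mine.add(items[i]);
            }
        }
        return mine;
    }
}
```

With five log files and two collector tasks, task 0 would pick up files 1, 3, and 5 while task 1 picks up files 2 and 4, with no coordination between the tasks beyond knowing their own index and the task count.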