Databases Reference
In-Depth Information
import java.util.Map ;
import backtype.storm.spout.SpoutOutputCollector ;
import backtype.storm.task.TopologyContext ;
import backtype.storm.topology.IRichSpout ;
import backtype.storm.topology.OutputFieldsDeclarer ;
import backtype.storm.tuple.Fields ;
import backtype.storm.tuple.Values ;
public class WordReader implements IRichSpout {
// Collector that nextTuple() uses to emit one tuple per line read.
private SpoutOutputCollector collector ;
// Source reader; nextTuple() wraps it in a BufferedReader to read line by line.
private FileReader fileReader ;
// Set to true once nextTuple() has finished (or failed) reading; later calls then just sleep and return.
private boolean completed = false ;
// Not referenced in the visible part of the class; presumably assigned when the spout is opened (TODO confirm).
private TopologyContext context ;
/**
 * Indicates whether this spout is distributed.
 *
 * @return always {@code false} for this implementation
 */
public boolean isDistributed() {
    return false;
}
/**
 * Prints {@code "OK:"} followed by the given message id to standard output.
 *
 * @param msgId identifier of the message being acknowledged
 */
public void ack(Object msgId) {
    final String report = "OK:" + msgId;
    System.out.println(report);
}
/**
 * Does nothing.
 */
public void close() {
    // Intentionally empty.
    // NOTE(review): fileReader is not closed here; confirm it is released elsewhere.
}
/**
 * Prints {@code "FAIL:"} followed by the given message id to standard output.
 *
 * @param msgId identifier of the message that failed
 */
public void fail(Object msgId) {
    final String report = "FAIL:" + msgId;
    System.out.println(report);
}
/**
* The only thing this method does is emit each
* line of the file.
*/
public void nextTuple () {
/**
* nextTuple is called forever, so if we have already read the file
* we wait and then return
*/
if ( completed ){
try {
Thread . sleep ( 1000 );
} catch ( InterruptedException e ) {
//Do nothing
}
return ;
}
String str ;
//Open the reader
BufferedReader reader = new BufferedReader ( fileReader );
try {
//Read all lines
while (( str = reader . readLine ()) != null ){
/**
* For each line, emit a new value with the line as its content
*/
this . collector . emit ( new Values ( str ), str );
}
} catch ( Exception e ){
throw new RuntimeException ( "Error reading tuple" , e );
} finally {
completed = true ;
 
Search WWH ::




Custom Search