Databases Reference
In-Depth Information
import
java.util.Map
;
import
backtype.storm.spout.SpoutOutputCollector
;
import
backtype.storm.task.TopologyContext
;
import
backtype.storm.topology.IRichSpout
;
import
backtype.storm.topology.OutputFieldsDeclarer
;
import
backtype.storm.tuple.Fields
;
import
backtype.storm.tuple.Values
;
public
class
WordReader
implements
IRichSpout
{
// Collector through which nextTuple() emits one tuple per line read.
private SpoutOutputCollector collector;

// Source of the lines to emit; nextTuple() wraps it in a BufferedReader.
private FileReader fileReader;

// Becomes true once nextTuple() has finished (or failed) reading the file,
// after which nextTuple() only sleeps and returns.
private boolean completed = false;

// Topology context; not read in the visible code -- presumably assigned in
// open(), TODO confirm.
private TopologyContext context;
/**
 * Reports whether this spout is distributed.
 *
 * <p>NOTE(review): presumably this tells Storm to run the spout as a single,
 * non-distributed instance; confirm against the Storm version in use.
 *
 * @return always {@code false}
 */
public boolean isDistributed() {
    return false;
}
/**
 * Acknowledgement callback: reports the message id on standard output.
 *
 * @param msgId identifier of the message; printed after the "OK:" prefix
 */
public void ack(Object msgId) {
    final String report = "OK:" + msgId;
    System.out.println(report);
}
/**
 * Releases the file reader held by this spout.
 *
 * <p>Closing is best-effort: if the reader was never assigned there is
 * nothing to do, and a failure while closing is reported on standard error
 * rather than rethrown, so shutting the spout down cannot itself crash.
 */
public void close() {
    if (fileReader == null) {
        // The reader was never opened (or never assigned); nothing to release.
        return;
    }
    try {
        fileReader.close();
    } catch (java.io.IOException e) {
        // Fully qualified so that no additional import is required.
        System.err.println("Error closing file reader: " + e);
    }
}
/**
 * Failure callback: reports the message id on standard output.
 *
 * @param msgId identifier of the message; printed after the "FAIL:" prefix
 */
public void fail(Object msgId) {
    final String report = "FAIL:" + msgId;
    System.out.println(report);
}
/**
* All this method does is emit one tuple for each
* line of the file.
*/
public
void
nextTuple
()
{
/**
* nextTuple() is called over and over, so once the file has been fully read
* we just wait for a while and then return.
*/
if
(
completed
){
try
{
Thread
.
sleep
(
1000
);
}
catch
(
InterruptedException
e
)
{
//Do nothing
}
return
;
}
String
str
;
//Open the reader
BufferedReader
reader
=
new
BufferedReader
(
fileReader
);
try
{
//Read all lines
while
((
str
=
reader
.
readLine
())
!=
null
){
/**
* For each line, emit a new tuple whose value is the line; the line is also passed as the message id.
*/
this
.
collector
.
emit
(
new
Values
(
str
),
str
);
}
}
catch
(
Exception
e
){
throw
new
RuntimeException
(
"Error reading tuple"
,
e
);
}
finally
{
completed
=
true
;