}
}
    /**
     * We will create the file reader and get the collector object
     */
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.context = context;
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            // Report the same key that was used to open the file
            throw new RuntimeException("Error reading file ["
                    + conf.get("wordsFile") + "]");
        }
        this.collector = collector;
    }

    /**
     * Declare the output field "line"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}
The first method called in any spout is public void open(Map conf, TopologyContext context, SpoutOutputCollector collector). The parameters it receives are the TopologyContext, which contains all our topology data; the conf object, which is created in the topology definition; and the SpoutOutputCollector, which enables us to emit the data that will be processed by the bolts. The following code block is the open method implementation:
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.context = context;
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            // Report the same key that was used to open the file
            throw new RuntimeException("Error reading file ["
                    + conf.get("wordsFile") + "]");
        }
        this.collector = collector;
    }
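The configuration lookup in open can be tried outside a topology. The following is a minimal sketch that uses only the JDK, not Storm: the class LineSourceSketch and its methods readLines and demo are hypothetical names introduced for illustration, and a plain java.util.Map stands in for the conf object. It assumes the file path is stored under the "wordsFile" key, as in the code above, and it fails early with a clear message when that key is missing.

```java
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the spout; no Storm classes are involved.
class LineSourceSketch {
    private FileReader fileReader;

    // Mirrors open(): read the path stored under "wordsFile" and create the reader.
    void open(Map<String, Object> conf) {
        Object path = conf.get("wordsFile");
        if (path == null) {
            throw new RuntimeException("Missing configuration key [wordsFile]");
        }
        try {
            this.fileReader = new FileReader(path.toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file [" + path + "]", e);
        }
    }

    // Reads the file through the reader created in open(), one value per line.
    List<String> readLines() {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(fileReader)) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    // Writes a small temporary file, opens it via the conf map, and returns its lines.
    static List<String> demo() {
        try {
            Path tmp = Files.createTempFile("words", ".txt");
            Files.write(tmp, Arrays.asList("storm", "test"));
            Map<String, Object> conf = new HashMap<>();
            conf.put("wordsFile", tmp.toString());
            LineSourceSketch source = new LineSourceSketch();
            source.open(conf);
            List<String> lines = source.readLines();
            Files.delete(tmp);
            return lines;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Looking the key up once and reusing the value in the error message keeps the reported path consistent with the path that was actually opened.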
The open method also creates the reader, which is responsible for reading the file. Next we need to implement public void nextTuple(), from which we'll emit values to be processed by the bolts. In our example, the method will read the file and emit a value per line.
    public void nextTuple() {
        if (completed) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {