Database Reference
In-Depth Information
the process of creating a custom SocketSpout class to consume a socket output in the
Storm topology:
public class SocketSpout extends BaseRichSpout{
static SpoutOutputCollector collector;
//The socket
static Socket myclientSocket;
static ServerSocket myserverSocket;
static int myport;
public SocketSpout(int port){
myport=port;
}
public void open(Map conf,TopologyContext context,
SpoutOutputCollector collector){
_collector=collector;
myserverSocket=new ServerSocket(myport);
}
public void nextTuple(){
myclientSocket=myserverSocket.accept();
InputStream incomingIS=myclientSocket.getInputStream();
byte[] b=new byte[8196];
int len=b.incomingIS.read(b);
_collector.emit(new Values(b));
}
}
Search WWH ::




Custom Search