Figure 3-1. DRPC topology schema
The bolt has the following output declarer:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("id", "result"));
}
Because this is the only bolt in the topology, it must emit the RPC ID and the result.
The execute method is responsible for executing the add operation:
public void execute(Tuple input) {
    // Field 0 carries the RPC ID; field 1 carries the request argument, e.g. "1+1+5+10".
    String[] numbers = input.getString(1).split("\\+");
    Integer added = 0;
    if (numbers.length < 2) {
        throw new InvalidParameterException("Should be at least 2 numbers");
    }
    for (String num : numbers) {
        added += Integer.parseInt(num);
    }
    // Emit the RPC ID together with the sum so the result can be matched to its request.
    collector.emit(new Values(input.getValue(0), added));
}
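The parsing and summing logic inside execute can be tried on its own, without a running topology. The following is a minimal sketch that assumes nothing about Storm: the class name AdderSketch and the method add are hypothetical names introduced only for illustration, and the method simply mirrors the split-and-sum steps of the bolt.

```java
import java.security.InvalidParameterException;

// Hypothetical helper class for illustration; not part of Storm or of the topology above.
class AdderSketch {

    // Splits the argument on '+' and sums the integer operands,
    // following the same steps as the bolt's execute method.
    static int add(String expression) {
        String[] numbers = expression.split("\\+");
        if (numbers.length < 2) {
            throw new InvalidParameterException("Should be at least 2 numbers");
        }
        int added = 0;
        for (String num : numbers) {
            added += Integer.parseInt(num);
        }
        return added;
    }

    public static void main(String[] args) {
        System.out.println(add("1+-1"));     // prints 0
        System.out.println(add("1+1+5+10")); // prints 17
    }
}
```

Because the split happens on the + character only, a negative operand such as -1 survives as a single token and is parsed correctly. An argument with a leading +, such as "+1+2", yields an empty first token, and Integer.parseInt rejects it with a NumberFormatException.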
Include the adder bolt (AdderBolt) in the topology definition as follows:
public static void main(String[] args) {
    LocalDRPC drpc = new LocalDRPC();
    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("add");
    builder.addBolt(new AdderBolt(), 2);
    Config conf = new Config();
    conf.setDebug(true);
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("drpc-adder-topology", conf,
        builder.createLocalTopology(drpc));
    String result = drpc.execute("add", "1+-1");
    checkResult(result, 0);
    result = drpc.execute("add", "1+1+5+10");
    checkResult(result, 17);
    cluster.shutdown();
 