Databases Reference
In-Depth Information
Figure 3-1. DRPC topology schema
The bolt has the following output declarer:
public
void
declareOutputFields
(
OutputFieldsDeclarer
declarer
)
{
declarer
.
declare
(
new
Fields
(
"id"
,
"result"
));
}
Because this is the only bolt in the topology, it must emit the RPC ID and the result.
The
execute
method is responsible for executing the add operation:
public
void
execute
(
Tuple
input
)
{
String
[]
numbers
=
input
.
getString
(
1
).
split
(
"\\+"
);
Integer
added
=
0
;
if
(
numbers
.
length
<
2
){
throw
new
InvalidParameterException
(
"Should be at least 2 numbers"
);
}
for
(
String
num
:
numbers
){
added
+=
Integer
.
parseInt
(
num
);
}
collector
.
emit
(
new
Values
(
input
.
getValue
(
0
),
added
));
}
Include the added bolt in the topology definition as follows:
public
static
void
main
(
String
[]
args
)
{
LocalDRPC
drpc
=
new
LocalDRPC
();
LinearDRPCTopologyBuilder
builder
=
new
LinearDRPCTopologyBuilder
(
"add"
);
builder
.
addBolt
(
new
AdderBolt
(),
2
);
Config
conf
=
new
Config
();
conf
.
setDebug
(
true
);
LocalCluster
cluster
=
new
LocalCluster
();
cluster
.
submitTopology
(
"drpc-adder-topology"
,
conf
,
builder
.
createLocalTopology
(
drpc
));
String
result
=
drpc
.
execute
(
"add"
,
"1+-1"
);
checkResult
(
result
,
0
);
result
=
drpc
.
execute
(
"add"
,
"1+1+5+10"
);
checkResult
(
result
,
17
);
cluster
.
shutdown
();