Start Looping and Read or Write Tuples
This is the most important step, where all the work gets done. The implementation of
this step depends on whether you are developing a spout or a bolt.
In the case of a spout, you should start emitting tuples. In the case of a bolt, loop and read
tuples, process them, and emit, ack, or fail.
Let's see the implementation of the spout that emits numbers.
    $from = intval($argv[1]);
    $to = intval($argv[2]);
    while (true) {
        $msg = read_msg();
        $cmd = json_decode($msg, true);
        if ($cmd['command'] == 'next') {
            if ($from < $to) {
                storm_emit(array("$from"));
                $task_ids = read_msg();
                $from++;
            } else {
                sleep(1);
            }
        }
        storm_sync();
    }
Get the from and to values from the command-line arguments and start iterating. Every time
you get a next message from Storm, it means Storm is ready for you to emit a new tuple.
After each emit, the script reads the list of task IDs that Storm sends back.
Once you've sent all the numbers and have no more tuples to send, just sleep
for some time.
To make sure the script is ready for the next tuple, Storm waits for the line sync\n
before sending the next one. To read a command, just call read_msg() and JSON-decode
it.
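The read_msg() helper is not shown in this section. As a rough sketch of what reading one command might look like, the function below collects lines from a stream until it sees a line containing only end, which is how Storm's multilang protocol delimits messages. The name read_msg_from_stream and the stream parameter are hypothetical, introduced here so the logic can be tried against something other than standard input.

```php
<?php
// Hypothetical helper (not from the original text): reads one message
// from $stream. Assumes each message is terminated by a line containing
// only "end".
function read_msg_from_stream($stream)
{
    $lines = array();
    while (($line = fgets($stream)) !== false) {
        $line = rtrim($line, "\r\n");
        if ($line === 'end') {
            break; // end of the current message
        }
        $lines[] = $line;
    }
    // Rejoin multi-line payloads before handing them to json_decode().
    return implode("\n", $lines);
}
```

In a real spout or bolt the stream would be STDIN, for example json_decode(read_msg_from_stream(STDIN), true).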
In the case of bolts, this is a little different.
    while (true) {
        $msg = read_msg();
        $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
        if (!empty($tuple["id"])) {
            if (isPrime($tuple["tuple"][0])) {
                storm_emit(array($tuple["tuple"][0]));
            }
            storm_ack($tuple["id"]);
        }
    }
Loop, reading tuples from standard input. As soon as you get a message, JSON-decode
it. If it is a tuple (that is, it has an id), process it by checking whether its first value is a
prime number. If it is, emit it; in either case, ack the tuple.
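The bolt relies on an isPrime() function that this section does not show. A minimal sketch using trial division could look like the following; it is an illustration only, not necessarily the original implementation. It accepts numeric strings because the spout emits its numbers as strings, and it assumes values fit in PHP's native integer range.

```php
<?php
// Illustrative trial-division primality check (an assumption, not the
// original implementation). Accepts integers or numeric strings.
function isPrime($value)
{
    if (!is_numeric($value)) {
        return false;
    }
    $n = (int) $value;
    if ($n < 2) {
        return false; // 0, 1 and negatives are not prime
    }
    if ($n < 4) {
        return true; // 2 and 3
    }
    if ($n % 2 === 0) {
        return false; // even numbers greater than 2
    }
    // Only odd divisors up to the square root need to be tested.
    for ($i = 3; $i * $i <= $n; $i += 2) {
        if ($n % $i === 0) {
            return false;
        }
    }
    return true;
}
```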
 