Database Reference
In-Depth Information
When defining the topology, these additional streams are added to the
grouping calls after the source name. For example, attaching a bolt to the
cdr
stream using a field grouping looks like this:
builder.setBolt("empty",
new
RichEmptyBolt());
builder.setBolt("process",
new
BasicBolt())
.fieldsGrouping("empty", "cdr",
new
Fields("second"));
Processing incoming tuples is handled by the
execute
method inside the
bolt. In
RichBolt
implementations, it has a single
Tuple
argument. This
class implements the Java
List<Object>
collection interface, but also
provides higher-level access to the fields as declared by the source bolt.
A bolt can do whatever it likes with the input tuple. For example, to produce
theoutputstreamsdeclaredintheearlierexample,assumingtheinputtuple
has elements of the same name, the
execute
method would look like this:
[[OPEN-LW-CODE80]]
public void
execute(Tuple input) {
List<Object> objs = input.select(
new
Fields("first","second","third"));
collector.emit(objs);
collector.emit("car",
new
Values(objs.get(0)));
collector.emit("cdr",
new
Values(objs.get(1),objs.get(2)));
collector.ack(input);
}[[CLOSE-LW-CODE80]]
The first line of this example is using a feature of the
Tuple
wrapper to
extract a subset of the
Tuple
object into a known ordering. This ordering
is the same as the declared output stream, so it can simply be emitted as
is. The
emit
method assumes that the output is an object that implements
List<Object>
, which is converted into a
Tuple
after it arrives at the next
bolt.
Next, the other two streams,
car
and
cdr
, are emitted. Because they only
take a subset of the default stream, new output arrays need to be
constructed. The default Java constructors for
List
objects are generally
verbose, so Storm provides a class called
Values
. This class implements the