Database Reference
In-Depth Information
bind.fieldNdx =
fromId.fieldIndex(def.fieldName);
bounds.add(bind);
}
}
if
(bounds.size() > 0) bindings.put(id,
bounds);
}
}
Ideally, the analysis done during the prepare phase to determine the
input fields could be used to programmatically determine the output
fields. Unfortunately, the
declareOutputFields
method is called
during topology construction and not during topology initialization. As
a result, the tuple's elements to include from each filter are specified
when the filter is defined and simply returned by
declareOutputFields
:
public void
declareOutputFields(OutputFieldsDeclarer declarer) {
for
(FilterDefinition def : filters)
declarer.declareStream(def.stream, def.fields);
}
The implementation of the filter itself makes use of the filter bindings
created in the
prepare
method. Each
Tuple
passed to a
Bolt
travels
with metadata about the origin of the
Tuple
, including the
GlobalStreamId
. This is obtained using
getSourceGlobalStreamId
and is used to look up the bound filters
for this particular
Tuple
. If there are application filters, the tuple is
then applied to each filter in the list and emitted to the appropriate
stream if it matches, using the
select
method on the tuple to extract a
tuple that matches the configured
Fields
element:
public void
execute(Tuple input) {
List<FilterBinding> bound