Starting from the source code directory that you cloned in Git, change into the part3
subdirectory. We'll define a new class called ScrubFunction as our custom operation,
which extends BaseOperation and implements the Function interface:
public class ScrubFunction extends BaseOperation implements Function { ... }
Next, we need to define a constructor, which specifies how this function consumes from
the tuple stream:
public ScrubFunction( Fields fieldDeclaration )
{
  super( 2, fieldDeclaration );
}
The first argument to super, 2, constrains this class to accept exactly two fields as
its arguments. Based on the intended use, we know that the tuple stream will have
two fields at that point, doc_id and token. The fieldDeclaration parameter declares
the fields that the operation emits back into the tuple stream. Great, now we know
what the new operation expects as arguments.
Next we define a scrubText method to clean up tokens. The following is the business
logic of the function:
public String scrubText( String text )
{
  return text.trim().toLowerCase();
}
This version is relatively simple. In production it would typically handle many more
cases. Defining the business logic as a separate method makes it simpler to write
unit tests against it.
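To illustrate the point about testability, here is a minimal standalone sketch that exercises the same scrubText logic without any Cascading classes. The class name ScrubTextDemo and the check helper are hypothetical, introduced only for this example; a real project would more likely use a test framework.

```java
// Hypothetical holder class for illustration; not part of the Cascading app.
final class ScrubTextDemo {
    // Same business logic as the scrubText method shown in the text.
    static String scrubText(String text) {
        return text.trim().toLowerCase();
    }

    // Minimal check helper so the sketch needs no test framework.
    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        check(scrubText("  Rain ").equals("rain"), "trims and lowercases");
        check(scrubText("SHADOW").equals("shadow"), "lowercases");
        check(scrubText("   ").isEmpty(), "whitespace-only input becomes empty");
        System.out.println("all scrubText checks passed");
    }
}
```

Because scrubText depends only on its String argument, checks like these run without constructing a flow or a tuple stream.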
Next, we define an operate method. This is essentially a wrapper that takes an argument
tuple, applies our scrubText method to each token, and then produces a result tuple:
public void operate( FlowProcess flowProcess, FunctionCall functionCall )
{
  TupleEntry argument = functionCall.getArguments();
  String doc_id = argument.getString( 0 );
  String token = scrubText( argument.getString( 1 ) );

  if( token.length() > 0 )
  {
    Tuple result = new Tuple();
    result.add( doc_id );
    result.add( token );
    functionCall.getOutputCollector().add( result );
  }
}
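The filter-and-emit behavior of operate can be tried out without a Cascading dependency. The sketch below is a plain-Java stand-in, not the Cascading API: the class OperateSketch and the method scrubAll are hypothetical, and each (doc_id, token) pair is modeled as a two-element String array rather than a Tuple.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the operate wrapper; uses plain lists, not Cascading tuples.
final class OperateSketch {
    // Same business logic as the scrubText method shown in the text.
    static String scrubText(String text) {
        return text.trim().toLowerCase();
    }

    // Takes (doc_id, token) pairs, scrubs each token, and keeps only non-empty results.
    static List<String[]> scrubAll(List<String[]> arguments) {
        List<String[]> results = new ArrayList<>();
        for (String[] argument : arguments) {
            String docId = argument[0];
            String token = scrubText(argument[1]);
            if (token.length() > 0) {
                results.add(new String[] { docId, token });
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<String[]> input = new ArrayList<>();
        input.add(new String[] { "doc01", " Rain " });
        input.add(new String[] { "doc01", "   " });   // scrubs to an empty token
        input.add(new String[] { "doc02", "Shadow" });
        for (String[] row : scrubAll(input)) {
            System.out.println(row[0] + "\t" + row[1]);
        }
    }
}
```

As in operate, a token that is empty after scrubbing produces no output row, so the three input pairs here yield two results.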