Starting from the source code directory that you cloned from Git, change into the part3 subdirectory. We'll define a new class called ScrubFunction as our custom operation, which subclasses BaseOperation while implementing the Function interface:

    public class ScrubFunction extends BaseOperation implements Function {
      ...
    }

Next, we need to define a constructor, which specifies how this function consumes from
the tuple stream:

    public ScrubFunction(Fields fieldDeclaration) {
      super(2, fieldDeclaration);
    }

The fieldDeclaration parameter declares the fields that the function emits in its result tuples, and the first argument to super, 2, sets the number of argument fields it accepts. Based on the intended use, we know that the tuple stream will have two fields at that point, doc_id and token, so we constrain this class to accept exactly two arguments. Now we know what the new operation expects as arguments.
Next we define a scrubText method to clean up tokens. The following is the business logic of the function:

    public String scrubText(String text) {
      return text.trim().toLowerCase();
    }

This version is relatively simple. In production it would typically have many more cases
handled. Having the business logic defined as a separate method makes it simpler to
write unit tests against.
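
To illustrate why a separate method is easy to test, here is a minimal sketch that exercises the same trim-and-lowercase logic with no Cascading dependency. The class name ScrubTextCheck, the static copy of scrubText, and the sample inputs are assumptions made for this example; a real project would more likely use a unit testing framework.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical standalone harness, not part of the original source code
class ScrubTextCheck {
    // Same logic as scrubText above, made static so it runs without Cascading
    static String scrubText(String text) {
        return text.trim().toLowerCase();
    }

    public static void main(String[] args) {
        // Made-up sample inputs mapped to the cleaned tokens we expect
        Map<String, String> cases = new LinkedHashMap<>();
        cases.put("  Rain ", "rain");
        cases.put("SHADOW", "shadow");
        cases.put("   ", ""); // whitespace only scrubs down to an empty string

        for (Map.Entry<String, String> c : cases.entrySet()) {
            String actual = scrubText(c.getKey());
            if (!actual.equals(c.getValue())) {
                throw new AssertionError(
                    "expected '" + c.getValue() + "' but got '" + actual + "'");
            }
        }
        System.out.println("all scrubText checks passed");
    }
}
```

The whitespace-only case matters because it yields an empty token, which is a case the calling code has to decide how to handle.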
Next, we define an operate method. This is essentially a wrapper that takes an argument tuple, applies our scrubText method to each token, and then produces a result tuple:

    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
      TupleEntry argument = functionCall.getArguments();
      String doc_id = argument.getString(0);
      String token = scrubText(argument.getString(1));

      if (token.length() > 0) {
        Tuple result = new Tuple();
        result.add(doc_id);
        result.add(token);
        functionCall.getOutputCollector().add(result);
      }
    }
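
The effect of operate on a stream can be sketched in plain Java without Cascading. In this stand-in, each String[] pair plays the role of a (doc_id, token) tuple and a List plays the role of the output collector. The class name OperateSketch, the scrubAll helper, and the sample rows are assumptions for illustration only, not Cascading API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the tuple stream; it does not use Cascading classes
class OperateSketch {
    static String scrubText(String text) {
        return text.trim().toLowerCase();
    }

    // Mirrors operate: scrub each token and emit a pair only if the token is non-empty
    static List<String[]> scrubAll(List<String[]> input) {
        List<String[]> output = new ArrayList<>();
        for (String[] row : input) {
            String docId = row[0];
            String token = scrubText(row[1]);
            if (token.length() > 0) {
                output.add(new String[] { docId, token });
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Made-up rows; the second one scrubs to an empty token and is dropped
        List<String[]> input = Arrays.asList(
            new String[] { "doc01", " Rain " },
            new String[] { "doc01", "   " },
            new String[] { "doc02", "SHADOW" });

        for (String[] row : scrubAll(input)) {
            System.out.println(row[0] + "\t" + row[1]);
        }
    }
}
```

Because empty tokens are never added to the output, the function may emit fewer tuples than it consumes, so it filters the stream as well as transforming it.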