Example 24-3. Extending word count and sort with a SubAssembly
    Scheme sourceScheme = new TextLine(new Fields("line"));
    Tap source = new Hfs(sourceScheme, inputPath);

    Scheme sinkScheme = new TextLine(new Fields("word", "count"));
    // REPLACE overwrites any existing output at outputPath
    Tap sink = new Hfs(sinkScheme, outputPath, SinkMode.REPLACE);

    Pipe assembly = new Pipe("wordcount");

    // The SubAssembly takes the place of the Each and word parser function
    assembly = new ParseWordsAssembly(assembly);

    // Group on each word and count its occurrences
    assembly = new GroupBy(assembly, new Fields("word"));
    Aggregator count = new Count(new Fields("count"));
    assembly = new Every(assembly, count);

    // Group on count, with a secondary sort on word
    assembly = new GroupBy(assembly, new Fields("count"), new Fields("word"));

    FlowConnector flowConnector = new FlowConnector();
    Flow flow = flowConnector.connect("word-count", source, sink, assembly);

    flow.complete();
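To make the data flow concrete, here is a minimal in-memory sketch in plain Java of what this flow computes on a small input. It is an analogy, not Cascading code: WordCountSortSketch, parseWords, and countAndSort are hypothetical names, and the tokenizer (lowercase, then split on runs of non-letters) is an assumption standing in for ParseWordsAssembly, whose real definition is in the previous example. The sketch also assumes a single reducer, so the whole result is in one sorted order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountSortSketch {

    // One output record, mirroring the sink's ("word", "count") fields.
    record WordCount(String word, long count) {}

    // Hypothetical stand-in for ParseWordsAssembly (an assumption):
    // lowercase the line and split it on runs of non-letter characters.
    static List<String> parseWords(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.toLowerCase().split("[^\\p{L}]+")) {
            if (!token.isEmpty()) { // a leading delimiter yields an empty token
                words.add(token);
            }
        }
        return words;
    }

    static List<WordCount> countAndSort(List<String> lines) {
        // Analogue of GroupBy("word") + Every(Count): tally each word.
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : parseWords(line)) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        // Analogue of GroupBy("count") with a secondary sort on "word":
        // order by count ascending, breaking ties by word ascending.
        List<WordCount> result = new ArrayList<>();
        counts.forEach((word, c) -> result.add(new WordCount(word, c)));
        result.sort(Comparator.comparingLong(WordCount::count)
                .thenComparing(WordCount::word));
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("the cat sat", "The dog sat on the mat");
        for (WordCount wc : countAndSort(lines)) {
            System.out.println(wc.word() + "\t" + wc.count());
        }
    }
}
```

With the input in main, the words with a count of 1 (cat, dog, mat, on) come first in alphabetical order, followed by sat (2) and then the (3).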
We replace the Each from the previous example with our ParseWordsAssembly pipe.
Finally, we just substitute in our new SubAssembly right where the previous Each and word parser function were used in the previous example. Because a SubAssembly is itself a Pipe, it can in turn contain other SubAssemblies, and this nesting can continue as deep as necessary.
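The nesting idea can be illustrated without Cascading. The following plain-Java sketch is an analogy, not the Cascading API: Stage, Composite, and the stage constants are hypothetical names. The property it mirrors is that a composite of stages is itself a stage (as a Cascading SubAssembly is itself a Pipe), so one composite can be dropped in wherever a single stage fits, including inside another composite.

```java
import java.util.ArrayList;
import java.util.List;

public class SubAssemblySketch {

    // Hypothetical stand-in for a Pipe: one processing step over a list of values.
    interface Stage {
        List<String> apply(List<String> input);
    }

    // Hypothetical stand-in for a SubAssembly: it chains stages and is itself
    // a Stage, so composites can be nested to any depth.
    static final class Composite implements Stage {
        private final List<Stage> stages;

        Composite(Stage... stages) {
            this.stages = List.of(stages);
        }

        @Override
        public List<String> apply(List<String> input) {
            List<String> current = input;
            for (Stage stage : stages) {
                current = stage.apply(current); // each stage feeds the next
            }
            return current;
        }
    }

    // Splits every line on whitespace, emitting one value per token.
    static final Stage SPLIT = lines -> {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            for (String token : line.trim().split("\\s+")) {
                if (!token.isEmpty()) {
                    out.add(token);
                }
            }
        }
        return out;
    };

    // Removes non-letter characters and drops tokens that end up empty.
    static final Stage STRIP_NON_LETTERS = tokens -> {
        List<String> out = new ArrayList<>();
        for (String token : tokens) {
            String letters = token.replaceAll("[^\\p{L}]", "");
            if (!letters.isEmpty()) {
                out.add(letters);
            }
        }
        return out;
    };

    // Lowercases every token.
    static final Stage LOWERCASE = tokens -> {
        List<String> out = new ArrayList<>();
        for (String token : tokens) {
            out.add(token.toLowerCase());
        }
        return out;
    };

    // Inner composite: normalization as its own reusable unit.
    static final Stage NORMALIZE = new Composite(STRIP_NON_LETTERS, LOWERCASE);

    // Outer composite that nests NORMALIZE, much as ParseWordsAssembly
    // bundles the word-parsing steps into one unit.
    static final Stage PARSE_WORDS = new Composite(SPLIT, NORMALIZE);

    public static void main(String[] args) {
        List<String> words = PARSE_WORDS.apply(List.of("The cat, the HAT!", "  -- 42 "));
        System.out.println(words);
    }
}
```

Running main prints [the, cat, the, hat]. PARSE_WORDS could itself be passed to another Composite alongside counting or sorting stages, which is the same substitution the example makes when ParseWordsAssembly takes the place of the Each.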