Database Reference
In-Depth Information
Example 24-3. Extending word count and sort with a SubAssembly
Scheme sourceScheme = new TextLine ( new Fields ( "line" ));
Tap source = new Hfs ( sourceScheme , inputPath );
Scheme sinkScheme = new TextLine ( new Fields ( "word" , "count" ));
Tap sink = new Hfs ( sinkScheme , outputPath , SinkMode . REPLACE );
Pipe assembly = new Pipe ( "wordcount" );
assembly =
new ParseWordsAssembly ( assembly );
assembly = new GroupBy ( assembly , new Fields ( "word" ));
Aggregator count = new Count ( new Fields ( "count" ));
assembly = new Every ( assembly , count );
assembly = new GroupBy ( assembly , new Fields ( "count" ), new
Fields ( "word" ));
FlowConnector flowConnector = new FlowConnector ();
Flow flow = flowConnector . connect ( "word-count" , source , sink , assembly );
flow . complete ();
We replace Each from the previous example with our ParseWordsAssembly pipe.
Finally, we just substitute in our new SubAssembly right where the previous Every
and word parser function were used in the previous example. This nesting can continue as
deep as necessary.
 
 
Search WWH ::




Custom Search