import java.util.Properties;

import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.CoGroup;
import cascading.pipe.Pipe;
import cascading.pipe.joiner.InnerJoin;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
public class CascadingSimpleJoinPipe {
  public static void main(String[] args) {
    // Collect the input and output paths from the parameters
    String websales = args[0];   // Web sales input
    String users = args[1];      // User information
    String output_dir = args[2]; // The output directory

    // Create a flow connector for this class
    Properties properties = new Properties();
    AppProps.setApplicationJarClass(properties, CascadingSimpleJoinPipe.class);
    HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties);

    // The input tap for our Web sales information
    Fields websalesFields = new Fields("user_id", "item", "date_purchase", "price");
    Tap websalesTap = new Hfs(new TextDelimited(websalesFields, ","), websales);
    Pipe salesPipe = new Pipe("Websales Pipe");

    // The input tap for our user info
    Fields userFields = new Fields("user_id", "name", "date_joined", "state");
    Tap usersTap = new Hfs(new TextDelimited(userFields, ","), users);
    Pipe usersPipe = new Pipe("Users Pipe");

    // Join the two streams on the field "user_id"
    Fields joinOn = new Fields("user_id");
    Fields outputFields = new Fields("user_id", "item", "date_purchase", "price",
                                     "user_id1", "name", "date_joined", "state");

    // This pipe takes the information about our streams above
    // and joins the streams via the CoGroup class
    Pipe joinPipe = new CoGroup(salesPipe, joinOn,
                                usersPipe, joinOn,
                                outputFields, new InnerJoin());

    // An output tap (comma delimited CSV file)
    // and the pipe that will feed it
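    // The output tap and flow wiring below are a sketch of the standard
    // Cascading 2.x pattern; the original listing does not show these lines,
    // so the sink scheme, the names outputTap and flowDef, and the FlowDef
    // calls are assumptions rather than the book's exact code.
    Tap outputTap = new Hfs(new TextDelimited(outputFields, ","), output_dir);

    // Bind each source tap to its pipe, attach the sink to the join,
    // then connect and run the flow on the Hadoop cluster
    FlowDef flowDef = FlowDef.flowDef()
        .setName("simple join")
        .addSource(salesPipe, websalesTap)
        .addSource(usersPipe, usersTap)
        .addTailSink(joinPipe, outputTap);
    flowConnector.connect(flowDef).complete();
  }
}

Packaged as a JAR, the class can then be launched with hadoop jar, passing the Web sales file, the user file, and an output directory as its three arguments.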
 