Database Reference
In-Depth Information
// Cassandra configuration: where the job reads its input from.
// Contact point, keyspace/table and partitioner all come from Setup constants.
ConfigHelper.setInputInitialAddress(job.getConfiguration(),
    Setup.END_POINT);
ConfigHelper.setInputColumnFamily(job.getConfiguration(),
    Setup.KEY_SPACE, Setup.SRC_TABLE);
ConfigHelper.setInputPartitioner(job.getConfiguration(),
    "Murmur3Partitioner");

// Mapper config
job.setMapperClass(StringTokenizerMapper.class);
job.setInputFormatClass(CqlInputFormat.class);
// The two '?' placeholders bound the token range of the partition key "id";
// presumably the input format binds them once per input split -- confirm
// against the CqlInputFormat version in use.
// The literal must stay on a single source line: a Java string literal
// cannot contain a raw line break.
CqlConfigHelper.setInputCql(job.getConfiguration(),
    "select * from " + Setup.KEY_SPACE + "." +
    Setup.SRC_TABLE +
    " where token(id) > ? and token(id) <= ? allow filtering");
// Page size is passed as a String, matching the call in the original code.
CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(),
    "100");

// Reducer config
// NOTE(review): the same class is registered as combiner and reducer, yet the
// map output types (Text/IntWritable) differ from the job output types
// (Map/List). A combiner normally has to emit the map output types, so
// confirm CassandraReducer is really usable as a combiner.
job.setCombinerClass(CassandraReducer.class);
job.setReducerClass(CassandraReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Map.class);
job.setOutputValueClass(List.class);
job.setOutputFormatClass(CqlOutputFormat.class);

// Output side: results are written to Setup.KEY_SPACE / Setup.OUT_TABLE.
ConfigHelper.setOutputColumnFamily(job.getConfiguration(),
    Setup.KEY_SPACE, Setup.OUT_TABLE);
//job.getConfiguration().set("row_key", Setup.OUT_ROWKEY);
// Only the SET clause is given here; presumably the output format supplies
// the WHERE clause from the key emitted by the reducer -- TODO confirm.
String cql = "update " + Setup.KEY_SPACE + "." +
    Setup.OUT_TABLE + " "
    + "set word_count = ? ";
CqlConfigHelper.setOutputCql(job.getConfiguration(), cql);
ConfigHelper.setOutputInitialAddress(job.getConfiguration(),
    Setup.END_POINT);
ConfigHelper.setOutputPartitioner(job.getConfiguration(),
    "Murmur3Partitioner");
Search WWH ::




Custom Search