Database Reference
In-Depth Information
job.setJarByClass(getClass());
// Anything you set in conf will be available to Mapper
// and Reducer
Configuration conf = job.getConfiguration();
// set mapper and reducer
job.setMapperClass(WordMapper.class);
job.setReducerClass(WordReducer.class);
// Cassandra Specific settings for ingesting CF
ConfigHelper.setInputInitialAddress(conf,
Setup.CASSANDRA_HOST_ADDR);
ConfigHelper.setInputRpcPort(conf,
String.valueOf(Setup.CASSANDRA_RPC_PORT));
ConfigHelper.setInputPartitioner(conf,
Murmur3Partitioner.class.getName());
ConfigHelper.setInputColumnFamily(conf, Setup.KEYSPACE,
Setup.INPUT_CF);
SliceRange sliceRange = new SliceRange(
ByteBufferUtil.bytes(""),
ByteBufferUtil.bytes(""),
false,
Integer.MAX_VALUE);
SlicePredicate predicate = new SlicePredicate()
.setSlice_range(sliceRange);
ConfigHelper.setInputSlicePredicate(conf, predicate);
job.setInputFormatClass(ColumnFamilyInputFormat.class);
// Cassandra specific output setting
ConfigHelper.setOutputInitialAddress(conf,
Setup.CASSANDRA_HOST_ADDR);
ConfigHelper.setOutputRpcPort(conf,
String.valueOf(Setup.CASSANDRA_RPC_PORT));
ConfigHelper.setOutputPartitioner(conf,
Murmur3Partitioner.class.getName());
Search WWH ::




Custom Search