job.setOutputKeyClass(ByteBuffer.class);
job.setOutputValueClass(List.class);
job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
4. Next, we need to provide a Cassandra-specific configuration for output mapping:
ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE_NAME, COLUMN_FAMILY);
Note You can also change the RPC port and initial address if you are not running Cassandra on localhost.
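For instance, pointing the job at a remote cluster uses the same setters; the hostname and port below are placeholders, not values from this example:

ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9161");
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "cassandra1.example.com");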
5. Finally, we need to provide the input path and the output format; because the output goes to Cassandra through ColumnFamilyOutputFormat, no HDFS output path is required:
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
Here, otherArgs[0] is the input path of the HDFS tweet file (e.g., /apress/tweetdata).
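In case it helps to see where otherArgs comes from, a typical driver main(String[] args) builds it with Hadoop's GenericOptionsParser (from org.apache.hadoop.util). This is only a sketch, and the job name "tweetcount" is a placeholder:

Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = new Job(conf, "tweetcount"); // "tweetcount" is a placeholder name
// ... apply the job configuration shown in the steps above ...
System.exit(job.waitForCompletion(true) ? 0 : 1);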
Let's have a look at the mapper (TweetTokenizer). It follows the classic word count pattern, emitting each token of a tweet with a count of 1:
public class TweetMapper
{
    public static class TweetTokenizer extends Mapper<LongWritable, Text, Text, IntWritable>
    {
        private final static IntWritable one = new IntWritable(1);

        // Tokenize the tweet line and emit (word, 1) for each token.
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens())
                context.write(new Text(tokenizer.nextToken()), one);
        }
    }
}
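On the reduce side, whatever class we plug in must emit the ByteBuffer key and List<Mutation> value classes we set on the job earlier. A minimal sketch, assuming Cassandra's Thrift types (Column, ColumnOrSuperColumn, and Mutation from org.apache.cassandra.thrift, ByteBufferUtil from org.apache.cassandra.utils); the class name TweetAggregator and the column name "count" are illustrative, not taken from this example:

public static class TweetAggregator extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
{
    public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        int sum = 0;
        for (IntWritable val : values)
            sum += val.get();
        // Build a Thrift Mutation holding the total in a column named "count" (name is an assumption).
        Column column = new Column();
        column.setName(ByteBufferUtil.bytes("count"));
        column.setValue(ByteBufferUtil.bytes(String.valueOf(sum)));
        column.setTimestamp(System.currentTimeMillis());
        ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
        cosc.setColumn(column);
        Mutation mutation = new Mutation();
        mutation.setColumn_or_supercolumn(cosc);
        // The word becomes the row key; ColumnFamilyOutputFormat applies the mutation list to that row.
        context.write(ByteBufferUtil.bytes(word.toString()), Collections.singletonList(mutation));
    }
}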