@Override
public void write(IEtlKey arg0, CamusWrapper arg1)
        throws IOException, InterruptedException {
    // Key: the record's Kafka coordinates (topic, partition,
    // offset), tab-separated.
    key.set(arg0.getTopic() + "\t" + arg0.getPartition()
            + "\t" + arg0.getOffset());
    // Value: the JSON payload rendered as a string.
    if (arg1.getRecord() instanceof ObjectNode) {
        ObjectNode node = (ObjectNode) arg1.getRecord();
        value.set(node.toString());
    }
}
};
}

@Override
public String getFilenameExtension() {
    return ".json";
}
}
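For context, this fragment is the tail of a complete RecordWriterProvider implementation. The following sketch shows one way the pieces might fit together; the getDataRecordWriter signature follows the Camus RecordWriterProvider interface, but the file-creation details, the Jackson package used for ObjectNode, and the class body as a whole are reconstructions under those assumptions, not the book's verbatim code.

package wiley.streaming.camus;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.codehaus.jackson.node.ObjectNode;

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.etl.IEtlKey;
import com.linkedin.camus.etl.RecordWriterProvider;

public class JSONRecordWriterProvider implements RecordWriterProvider {

    private final Text key = new Text();
    private final Text value = new Text();

    @Override
    public RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(
            TaskAttemptContext context, String fileName,
            CamusWrapper data, FileOutputCommitter committer)
            throws IOException, InterruptedException {

        // Create one text file per task under the committer's work path.
        // (Assumption: the book's version may construct the file differently.)
        Path path = new Path(committer.getWorkPath(),
                fileName + getFilenameExtension());
        FileSystem fs = path.getFileSystem(context.getConfiguration());
        final FSDataOutputStream writer = fs.create(path);

        return new RecordWriter<IEtlKey, CamusWrapper>() {
            @Override
            public void write(IEtlKey arg0, CamusWrapper arg1)
                    throws IOException, InterruptedException {
                // Same body as the fragment above, plus the actual
                // write of the key/value pair as one line of text.
                key.set(arg0.getTopic() + "\t" + arg0.getPartition()
                        + "\t" + arg0.getOffset());
                if (arg1.getRecord() instanceof ObjectNode) {
                    value.set(((ObjectNode) arg1.getRecord()).toString());
                    writer.write((key + "\t" + value + "\n").getBytes());
                }
            }

            @Override
            public void close(TaskAttemptContext context)
                    throws IOException, InterruptedException {
                writer.close();
            }
        };
    }

    @Override
    public String getFilenameExtension() {
        return ".json";
    }
}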
The provider is activated by updating camus.properties to use the new class:

etl.record.writer.provider.class=wiley.streaming.camus.JSONRecordWriterProvider
Ingesting Data from Flume
Flume natively supports writing to the Hadoop Distributed File System (HDFS)
through its built-in HDFS sink. For example, writing data in the same directory
format as the earlier Camus example, using an agent named agent99, is
configured as follows:
agent99.channels = channel1
agent99.sinks = kitchen
agent99.sinks.kitchen.type = hdfs
agent99.sinks.kitchen.channel = channel1
agent99.sinks.kitchen.hdfs.path = /events/%y%m%d%H%M
agent99.sinks.kitchen.hdfs.round = true
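The fragment above configures only the sink; a runnable agent also needs a
source and a channel definition. In addition, the %y%m%d%H%M escape sequences
require each event to carry a timestamp, and hdfs.round only takes effect
together with hdfs.roundValue and hdfs.roundUnit. A minimal complete sketch,
with an assumed netcat source and memory channel (the source name, port, and
five-minute rounding are illustrative, not from the original example):

agent99.sources = stove
agent99.channels = channel1
agent99.sinks = kitchen

# Illustrative source: reads newline-delimited events from a TCP port.
agent99.sources.stove.type = netcat
agent99.sources.stove.bind = 0.0.0.0
agent99.sources.stove.port = 44444
agent99.sources.stove.channels = channel1

# Simple in-memory channel between source and sink.
agent99.channels.channel1.type = memory

agent99.sinks.kitchen.type = hdfs
agent99.sinks.kitchen.channel = channel1
agent99.sinks.kitchen.hdfs.path = /events/%y%m%d%H%M
# Stamp events with the local time so the path escapes resolve.
agent99.sinks.kitchen.hdfs.useLocalTimeStamp = true
# Round timestamps down so events bucket into 5-minute directories.
agent99.sinks.kitchen.hdfs.round = true
agent99.sinks.kitchen.hdfs.roundValue = 5
agent99.sinks.kitchen.hdfs.roundUnit = minute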