Database Reference
In-Depth Information
@Override
public void
write(IEtlKey arg0, CamusWrapper
arg1)
throws
IOException, InterruptedException {
key.set(arg0.getTopic()+"\t"+arg0.getPartition()+
"\t"+arg0.getOffset());
if
(arg1.getRecord()
instanceof
ObjectNode) {
ObjectNode node =
(ObjectNode)arg1.getRecord();
value.set(node.toString());
}
}
};
}
@Override
public
String getFilenameExtension() {
return
".json";
}
}
It is activated by updating
camus.properties
to use the appropriate
class:
etl.record.writer.provider.class=wiley.streaming.camus.
JSONRecordWriterProvider
Ingesting Data from Flume
Flume natively supports the Hadoop Distributed File System (HDFS) data
ingestion as one of its sinks. For example, writing data in the same directory
format as the earlier Camus example using an agent named agent99 is
implemented as follows:
agent99.channels = channel1
agent99.sinks = kitchen
agent99.sinks.kitchen.type = hdfs
agent99.sinks.channel = channel1
agent99.kitchen.hdfs.path = /events/%y%m%d%H%M
agent99.kitchen.hdfs.round = true