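import java.io.IOException;

// Assumption: Jackson 1.x (org.codehaus) is used, since
// getValueAsLong() is the 1.x accessor name
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ObjectNode;

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.coders.MessageDecoder;
import com.linkedin.camus.coders.MessageDecoderException;

public class JSONMessageDecoder
    extends MessageDecoder<byte[], ObjectNode> {
  private final ObjectMapper mapper = new ObjectMapper();

  @Override
  public CamusWrapper<ObjectNode> decode(byte[] arg0) {
    ObjectNode json;
    try {
      // Parse the raw Kafka payload into a JSON object
      json = mapper.readValue(new String(arg0), ObjectNode.class);
    } catch (IOException e) {
      throw new MessageDecoderException("Unable to parse event");
    }
    // The event's "ts" field supplies the timestamp for the wrapper
    return new CamusWrapper<ObjectNode>(json,
        json.get("ts").getValueAsLong());
  }
}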
To use this class, set the camus.message.decoder.class property to the
class's fully qualified name:
# Concrete implementation of the Decoder class to use
camus.message.decoder.class=wiley.streaming.camus.JSONMessageDecoder
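The property value has to be a fully qualified class name because a class named in configuration is normally located and instantiated by reflection. The following is a minimal sketch of that lookup using only the standard library; it does not reproduce Camus's own loading code. SketchDecoder, Utf8Decoder, and DecoderConfigSketch are hypothetical names introduced for illustration and are not part of Camus:

```java
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Hypothetical sketch: resolves a decoder class from a properties entry.
class DecoderConfigSketch {
    static final String KEY = "camus.message.decoder.class";

    // Parse properties text; lines beginning with '#' are comments.
    static Properties parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    // Instantiate the configured class through its no-argument constructor.
    static SketchDecoder instantiate(Properties props)
            throws ReflectiveOperationException {
        String className = props.getProperty(KEY);
        if (className == null) {
            throw new IllegalStateException(KEY + " is not set");
        }
        return Class.forName(className)
                .asSubclass(SketchDecoder.class)
                .getDeclaredConstructor()
                .newInstance();
    }

    public static void main(String[] args) throws Exception {
        String config = "# Concrete implementation of the Decoder class to use\n"
                + KEY + "=" + Utf8Decoder.class.getName() + "\n";
        SketchDecoder decoder = instantiate(parse(config));
        System.out.println(decoder.getClass().getName());
    }
}

// Hypothetical stand-in for a decoder type; NOT the Camus MessageDecoder API.
interface SketchDecoder {
    String decode(byte[] payload);
}

// Hypothetical implementation that simply reads the payload as UTF-8 text.
class Utf8Decoder implements SketchDecoder {
    @Override
    public String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }
}
```

With this kind of lookup, a misspelled package or class name surfaces as a ClassNotFoundException at startup, which is why the property must match the decoder's package and class name exactly.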
Now that the data has been safely decoded, it must be written out to files. In
this case, a SequenceFile will be used. Because each record in these files
holds both a key and a value, the key will contain information about the
event's position in Kafka: the topic, partition, and offset. The value will be
the JSON value re-encoded as text for further processing:
public class JSONRecordWriter implements RecordWriterProvider {
  @Override
  public RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(
      TaskAttemptContext context, String filename,
      CamusWrapper arg2,
      FileOutputCommitter comitter) throws IOException,