Database Reference
In-Depth Information
public class
JSONMessageDecoder
extends
MessageDecoder<
byte
[], ObjectNode> {
private final
ObjectMapper mapper =
new
ObjectMapper();
@Override
public
CamusWrapper<ObjectNode> decode(
byte
[] arg0) {
ObjectNode json;
try
{
json = mapper.readValue(
new
String(arg0),
ObjectNode.
class
);
}
catch
(IOException e) {
throw new
MessageDecoderException("Unable to
parse event");
}
return new
CamusWrapper<ObjectNode>(json,
json.get("ts").getValueAsLong());
}
}
To use this class, ensure that the
camus.message.decoder.class
property is set properly:
# Concrete implementation of the Decoder class to use
camus.message.decoder.class=wiley.streaming.camus.JSONMessageDecoder
Now that the data has been safely decoded it must be written out to files. In
this case, a
SequenceFile
will be used. Because these files offer space for
both a key and a value, the key will contain information about the event's
position in Kafka: the topic, partition, and offset. The value will be the JSON
value re-encoded to text for further processing:
public class
JSONRecordWriter
implements
RecordWriterProvider {
@Override
public
RecordWriter<IEtlKey, CamusWrapper>
getDataRecordWriter(
TaskAttemptContext context, String filename,
CamusWrapper arg2,
FileOutputCommitter comitter)
throws
IOException,