Database Reference
In-Depth Information
InterruptedException {
Configuration conf = context.getConfiguration();
Path file =
new
Path(comitter.getWorkPath(),
EtlMultiOutputFormat.
getUniqueFile
(context,
filename,
getFilenameExtension()));
CompressionCodec codec =
null
;
SequenceFile.CompressionType type =
SequenceFile.CompressionType.
NONE
;
if
(FileOutputFormat.
getCompressOutput
(context)) {
type =
SequenceFileOutputFormat.
getOutputCompressionType
(
context );
Class<?> klass =
SequenceFileOutputFormat.
getOutputCompressorClass
(context,
DefaultCodec.
class
);
codec = (CompressionCodec)
ReflectionUtils.
newInstance
(klass,
conf);
}
final
SequenceFile.Writer writer =
SequenceFile.
createWriter
(
file.getFileSystem(conf),
conf,
file,
Text.
class
, Text.
class
,
type,
codec);
return new
RecordWriter<IEtlKey,CamusWrapper>() {
Text key =
new
Text();
Text value =
new
Text();
/**
 * Closes the underlying {@code SequenceFile.Writer} captured from the
 * enclosing method.
 *
 * @param arg0 the task attempt context; not used by this implementation
 * @throws IOException if closing the writer fails
 * @throws InterruptedException declared by the overridden method's signature
 */
@Override
public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
    writer.close();
}