    job.getConfiguration().setBoolean(Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
    FileInputFormat.addInputPath(job, new Path(input));
    FileOutputFormat.setOutputPath(job, new Path(output));
    AvroJob.setDataModelClass(job, GenericData.class);

    Schema schema = new Schema.Parser().parse(new File(schemaFile));
    AvroJob.setInputKeySchema(job, schema);
    AvroJob.setMapOutputKeySchema(job, schema);
    AvroJob.setMapOutputValueSchema(job, schema);
    AvroJob.setOutputKeySchema(job, schema);

    job.setInputFormatClass(AvroKeyInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);
    job.setOutputKeyClass(AvroKey.class);
    job.setOutputValueClass(NullWritable.class);
    job.setMapperClass(SortMapper.class);
    job.setReducerClass(SortReducer.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new AvroSort(), args);
    System.exit(exitCode);
  }
}
This program (which uses the Generic Avro mapping and hence does not require any code
generation) can sort Avro records of any type, represented in Java by the generic type
parameter K. We choose a value that is the same as the key, so that when the values are
grouped by key we can emit all of the values in the case that more than one of them share
the same key (according to the sort function). This means we don't lose any records. The
mapper simply emits each input key as both an AvroKey and an AvroValue. The reducer acts
as an identity, passing the values through as output keys, which will get written to an
Avro datafile.
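The SortMapper and SortReducer classes referenced in the job configuration are not shown
in this excerpt. A minimal sketch of what they might look like, assuming they are nested
static members of the enclosing AvroSort class and that the mapper wraps each input record
as both the shuffle key and the shuffle value:

```java
import java.io.IOException;

import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Emits each input record as both the key and the value, so every record
// survives grouping even when two records compare as equal under the schema's
// sort order.
static class SortMapper<K> extends Mapper<AvroKey<K>, NullWritable,
    AvroKey<K>, AvroValue<K>> {
  @Override
  protected void map(AvroKey<K> key, NullWritable value, Context context)
      throws IOException, InterruptedException {
    context.write(key, new AvroValue<K>(key.datum()));
  }
}

// Identity reducer: passes each grouped value back out as an output key, in
// the sorted order produced by the MapReduce shuffle.
static class SortReducer<K> extends Reducer<AvroKey<K>, AvroValue<K>,
    AvroKey<K>, NullWritable> {
  @Override
  protected void reduce(AvroKey<K> key, Iterable<AvroValue<K>> values,
      Context context) throws IOException, InterruptedException {
    for (AvroValue<K> value : values) {
      context.write(new AvroKey<K>(value.datum()), NullWritable.get());
    }
  }
}
```

Because the reducer iterates over all values for a key, records with duplicate keys are
all re-emitted rather than collapsed to one.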
The sorting happens in the MapReduce shuffle, and the sort function is determined by the
Avro schema that is passed to the program. Let's use the program to sort the
pairs.avro