Database Reference
In-Depth Information
Sorting Using Avro MapReduce
In this section, we use Avro's sort capabilities and combine them with MapReduce to write
a program to sort an Avro datafile (Example 12-3).
Example 12-3. A MapReduce program to sort an Avro datafile
public class
AvroSort
extends
Configured
implements
Tool
{
/**
 * Mapper that forwards each input record unchanged as the output key and
 * also emits the same datum, wrapped in an {@code AvroValue}, as the output
 * value.
 *
 * <p>NOTE(review): presumably the record is duplicated into the value so the
 * reducer can recover every record after the framework groups by key —
 * confirm against the job configuration, which is not visible here.
 *
 * @param <K> type of the Avro datum carried by the key and value
 */
static class SortMapper<K>
    extends Mapper<AvroKey<K>, NullWritable, AvroKey<K>, AvroValue<K>> {

  /**
   * Writes {@code key} as the output key, paired with a new
   * {@code AvroValue} holding the key's datum.
   *
   * @param key     input record wrapped as an Avro key
   * @param value   unused placeholder value
   * @param context task context that receives the output pair
   * @throws IOException          propagated from {@code context.write}
   * @throws InterruptedException propagated from {@code context.write}
   */
  @Override
  protected void map(AvroKey<K> key, NullWritable value, Context context)
      throws IOException, InterruptedException {
    K datum = key.datum();
    AvroValue<K> wrapped = new AvroValue<K>(datum);
    context.write(key, wrapped);
  }
}
/**
 * Reducer that writes the datum of every incoming value as an output key,
 * paired with {@code NullWritable}.
 *
 * <p>Each value is written individually, so the number of output records
 * equals the number of values received across all keys.
 *
 * @param <K> type of the Avro datum carried by the key and value
 */
static class SortReducer<K>
    extends Reducer<AvroKey<K>, AvroValue<K>, AvroKey<K>, NullWritable> {

  /**
   * Emits one output record per element of {@code values}.
   *
   * @param key     grouping key for this call (not written directly; the
   *                output key is rebuilt from each value's datum)
   * @param values  values grouped under {@code key}
   * @param context task context that receives the output pairs
   * @throws IOException          propagated from {@code context.write}
   * @throws InterruptedException propagated from {@code context.write}
   */
  @Override
  protected void reduce(AvroKey<K> key, Iterable<AvroValue<K>> values,
      Context context) throws IOException, InterruptedException {
    for (AvroValue<K> value : values) {
      // Parameterized AvroKey<K> rather than the raw type, so the write is
      // type-checked and matches the mapper's explicit AvroValue<K>.
      context.write(new AvroKey<K>(value.datum()), NullWritable.get());
    }
  }
}
@Override
public
int
run
(
String
[]
args
)
throws
Exception
{
if
(
args
.
length
!=
3
) {
System
.
err
.
printf
(
"Usage: %s [generic options] <input> <output> <schema-file>\n"
,
getClass
().
getSimpleName
());
ToolRunner
.
printGenericCommandUsage
(
System
.
err
);
return
-
1
;
}
String input
=
args
[
0
];
String output
=
args
[
1
];
String schemaFile
=
args
[
2
];
Job job
=
new
Job
(
getConf
(),
"Avro sort"
);
job
.
setJarByClass
(
getClass
());