Database Reference
In-Depth Information
public static class
FirstPartitioner
extends
Partitioner
<
IntPair
,
NullWritable
> {
@Override
public
int
getPartition
(
IntPair key
,
NullWritable value
,
int
numPartitions
) {
// multiply by 127 to perform some mixing
return
Math
.
abs
(
key
.
getFirst
() *
127
) %
numPartitions
;
}
}
public static class
KeyComparator
extends
WritableComparator
{
protected
KeyComparator
() {
super
(
IntPair
.
class
,
true
);
}
@Override
public
int
compare
(
WritableComparable w1
,
WritableComparable w2
) {
IntPair ip1
= (
IntPair
)
w1
;
IntPair ip2
= (
IntPair
)
w2
;
int
cmp
=
IntPair
.
compare
(
ip1
.
getFirst
(),
ip2
.
getFirst
());
if
(
cmp
!=
0
) {
return
cmp
;
}
return
-
IntPair
.
compare
(
ip1
.
getSecond
(),
ip2
.
getSecond
());
//reverse
}
}
public static class
GroupComparator
extends
WritableComparator
{
protected
GroupComparator
() {
super
(
IntPair
.
class
,
true
);
}
@Override
public
int
compare
(
WritableComparable w1
,
WritableComparable w2
) {
IntPair ip1
= (
IntPair
)
w1
;
IntPair ip2
= (
IntPair
)
w2
;
return
IntPair
.
compare
(
ip1
.
getFirst
(),
ip2
.
getFirst
());
}
}
@Override
public
int
run
(
String
[]
args
)
throws
Exception
{
Job job
=
JobBuilder
.
parseInputAndOutput
(
this
,
getConf
(),
args
);
if
(
job
==
null
) {
return
-
1
;