Database Reference
In-Depth Information
/**
 * Partitioner that assigns a record to a partition using only the first
 * element of its {@link IntPair} key. The second element of the key and the
 * value are ignored, so two keys with equal first elements always map to the
 * same partition for a given {@code numPartitions}.
 */
public static class FirstPartitioner
    extends Partitioner<IntPair, NullWritable> {

  /**
   * Computes the partition index for a key.
   *
   * @param key the composite key; only {@code key.getFirst()} is consulted
   * @param value unused
   * @param numPartitions the number of partitions to spread keys over
   * @return a non-negative index strictly less than {@code numPartitions}
   */
  @Override
  public int getPartition(IntPair key, NullWritable value, int numPartitions) {
    // multiply by 127 to perform some mixing
    //
    // The remainder is taken BEFORE the absolute value. Math.abs(Integer.MIN_VALUE)
    // is still Integer.MIN_VALUE, so abs-then-mod would yield a negative partition
    // index when the mixed value is Integer.MIN_VALUE (e.g. getFirst() ==
    // Integer.MIN_VALUE). After the remainder the magnitude is below
    // numPartitions, so Math.abs cannot overflow. For every other input the
    // result equals Math.abs(mixed) % numPartitions.
    return Math.abs((key.getFirst() * 127) % numPartitions);
  }
}
/**
 * Comparator over {@link IntPair} keys that orders by the first element as
 * reported by {@code IntPair.compare}, and, when the first elements compare
 * equal, by the negated {@code IntPair.compare} result of the second elements
 * (i.e. the second element is ordered in reverse).
 */
public static class KeyComparator extends WritableComparator {

  protected KeyComparator() {
    super(IntPair.class, true);
  }

  /**
   * Compares two keys, both of which must be {@link IntPair} instances.
   *
   * @param w1 the left-hand key
   * @param w2 the right-hand key
   * @return the comparison of the first elements if they differ; otherwise
   *     the negated comparison of the second elements
   */
  @Override
  public int compare(WritableComparable w1, WritableComparable w2) {
    IntPair left = (IntPair) w1;
    IntPair right = (IntPair) w2;

    int byFirst = IntPair.compare(left.getFirst(), right.getFirst());
    if (byFirst == 0) {
      // Tie on the first element: fall back to the second, reversed.
      return -IntPair.compare(left.getSecond(), right.getSecond());
    }
    return byFirst;
  }
}
/**
 * Comparator over {@link IntPair} keys that looks only at the first element;
 * keys whose first elements compare equal under {@code IntPair.compare} are
 * reported as equal regardless of their second elements.
 */
public static class GroupComparator extends WritableComparator {

  protected GroupComparator() {
    super(IntPair.class, true);
  }

  /**
   * Compares two keys, both of which must be {@link IntPair} instances, by
   * their first elements only.
   *
   * @param w1 the left-hand key
   * @param w2 the right-hand key
   * @return the result of {@code IntPair.compare} on the two first elements
   */
  @Override
  public int compare(WritableComparable w1, WritableComparable w2) {
    final IntPair left = (IntPair) w1;
    final IntPair right = (IntPair) w2;
    // The second element is deliberately not consulted here.
    final int firstElementOrder =
        IntPair.compare(left.getFirst(), right.getFirst());
    return firstElementOrder;
  }
}
@Override
public int run ( String [] args ) throws Exception {
Job job = JobBuilder . parseInputAndOutput ( this , getConf (), args );
if ( job == null ) {
return - 1 ;
Search WWH ::




Custom Search