To implement a custom partitioner, you need to subclass the org.apache.spark.Partitioner class and implement three methods:

• numPartitions: Int, which returns the number of partitions you will create.
• getPartition(key: Any): Int, which returns the partition ID (0 to numPartitions-1) for a given key.
• equals(), the standard Java equality method. This is important to implement because Spark will need to test your Partitioner object against other instances of itself when it decides whether two of your RDDs are partitioned the same way!
One gotcha is that if you rely on Java's hashCode() method in your algorithm, it can
return negative numbers. You need to be careful to ensure that getPartition()
always returns a nonnegative result.
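The usual fix is to wrap a negative remainder back into range by adding the modulus. A minimal sketch of that pattern (nonNegativeMod is an illustrative helper name, not part of the Spark API):

```scala
// Map any Int, including negative hashCode values, onto 0 until mod.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod // raw can be negative when x is negative
  if (raw < 0) raw + mod else raw
}

// "polygenelubricants".hashCode is Integer.MIN_VALUE, a negative number,
// yet the helper still yields a valid partition ID.
val id = nonNegativeMod("polygenelubricants".hashCode, 8)
```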
Example 4-26 shows how we would write the domain-name-based partitioner
sketched previously, which hashes only the domain name of each URL.
Example 4-26. Scala custom partitioner
class DomainNamePartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = {
    val domain = new java.net.URL(key.toString).getHost()
    val code = (domain.hashCode % numPartitions)
    if (code < 0) {
      code + numPartitions // Make it non-negative
    } else {
      code
    }
  }
  // Java equals method to let Spark compare our Partitioner objects
  override def equals(other: Any): Boolean = other match {
    case dnp: DomainNamePartitioner =>
      dnp.numPartitions == numPartitions
    case _ =>
      false
  }
}
Note that in the equals() method, we used Scala's pattern matching operator (match) to test whether other is a DomainNamePartitioner, and cast it if so; this is equivalent to using Java's instanceof operator followed by a cast.
Using a custom Partitioner is easy: just pass it to the partitionBy() method. Many
of the shuffle-based methods in Spark, such as join() and groupByKey() , can also
take an optional Partitioner object to control the partitioning of the output.
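As a sketch of that usage (this assumes a live SparkContext named sc and an RDD of (URL, count) pairs; the variable names and sample data are illustrative, not from the text):

```scala
import org.apache.spark.rdd.RDD

// Hypothetical pair RDD keyed by URL strings.
val pairs: RDD[(String, Int)] = sc.parallelize(Seq(
  ("http://www.example.com/a", 1),
  ("http://www.example.com/b", 2),
  ("http://spark.apache.org/docs", 3)))

// Partition by domain: both example.com URLs land in the same partition.
val partitioned = pairs.partitionBy(new DomainNamePartitioner(8))

// Shuffle-based methods accept the same Partitioner directly.
val grouped = pairs.groupByKey(new DomainNamePartitioner(8))
```

Passing the partitioner to groupByKey() avoids a second shuffle later, since the output is already partitioned the way downstream operations expect.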