To implement a custom partitioner, you need to subclass the org.apache.spark.Partitioner class and implement three methods:
• numPartitions: Int, which returns the number of partitions you will create.
• getPartition(key: Any): Int, which returns the partition ID (0 to numPartitions-1) for a given key.
• equals(), the standard Java equality method. This is important to implement because Spark will need to test your Partitioner object against other instances of itself when it decides whether two of your RDDs are partitioned the same way!

One gotcha is that if you rely on Java's hashCode() method in your algorithm, it can return negative numbers. You need to be careful to ensure that getPartition() always returns a nonnegative result.
Example 4-26 shows how we would write the domain-name-based partitioner sketched previously, which hashes only the domain name of each URL.
Example 4-26. Scala custom partitioner

class DomainNamePartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts

  override def getPartition(key: Any): Int = {
    val domain = new java.net.URL(key.toString).getHost()
    val code = (domain.hashCode % numPartitions)
    if (code < 0) {
      code + numPartitions // Make it non-negative
    } else {
      code
    }
  }

  // Java equals method to let Spark compare our Partitioner objects
  override def equals(other: Any): Boolean = other match {
    case dnp: DomainNamePartitioner =>
      dnp.numPartitions == numPartitions
    case _ =>
      false
  }
}
Note that in the equals() method, we used Scala's pattern matching operator (match) to test whether other is a DomainNamePartitioner, and cast it if so; this is the equivalent of using the instanceof operator in Java.
Using a custom Partitioner is easy: just pass it to the partitionBy() method. Many of the shuffle-based methods in Spark, such as join() and groupByKey(), can also take an optional Partitioner object to control the partitioning of the output.