Example 4-24. Determining partitioner of an RDD
scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
pairs: spark.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:12

scala> pairs.partitioner
res0: Option[spark.Partitioner] = None

scala> val partitioned = pairs.partitionBy(new spark.HashPartitioner(2))
partitioned: spark.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:14

scala> partitioned.partitioner
res1: Option[spark.Partitioner] = Some(spark.HashPartitioner@5147788d)
In this short session, we created an RDD of (Int, Int) pairs, which initially has no partitioning information (an Option with value None). We then created a second RDD by hash-partitioning the first. If we actually wanted to use partitioned in further operations, then we should have appended persist() to the third line of input, in which partitioned is defined. This is for the same reason that we needed persist() for userData in the previous example: without persist(), subsequent RDD actions will evaluate the entire lineage of partitioned, which will cause pairs to be hash-partitioned over and over.
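In Spark itself the remedy is simply to call persist() on the result of partitionBy(). The cost of forgetting it can be illustrated without a cluster. The following is a minimal plain-Scala sketch, not Spark code: the hypothetical `LazyStage` class stands in for an RDD whose `collect()` reruns its computation on every call unless `persist()` was called first, and the hypothetical `hashPartition()` stands in for the hash-partitioning step. Real Spark caching works per partition and depends on the chosen storage level, which this model ignores.

```scala
// Hypothetical model, not Spark: a lazily computed value that reruns its
// computation on every collect() unless persist() was called first, loosely
// mimicking how an unpersisted RDD re-evaluates its lineage on each action.
final class LazyStage[A](compute: () => A) {
  private var persisted = false
  private var cache: Option[A] = None
  // How many times the computation has actually run.
  var evaluations = 0

  def persist(): this.type = { persisted = true; this }

  def collect(): A = cache match {
    case Some(v) => v
    case None =>
      evaluations += 1
      val v = compute()
      if (persisted) cache = Some(v) // keep the result only when persisted
      v
  }
}

object LazyStageDemo {
  val pairs = List((1, 1), (2, 2), (3, 3))

  // Hypothetical stand-in for partitionBy(new HashPartitioner(2)):
  // bucket the pairs by key hash modulo 2.
  def hashPartition(): Map[Int, List[(Int, Int)]] =
    pairs.groupBy { case (k, _) => Math.floorMod(k.hashCode, 2) }

  // Without persist(): two actions trigger two partitioning passes.
  val unpersisted = new LazyStage(() => hashPartition())
  unpersisted.collect()
  unpersisted.collect()

  // With persist(): the partitioning runs once and the result is reused.
  val persistedStage = new LazyStage(() => hashPartition()).persist()
  persistedStage.collect()
  persistedStage.collect()
}
```

Both stages are asked for their result twice; only the unpersisted one repeats the partitioning work, so its `evaluations` counter ends at 2 while the persisted one stays at 1.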
Operations That Benefit from Partitioning
Many of Spark's operations involve shuffling data by key across the network. All of these will benefit from partitioning. As of Spark 1.0, the operations that benefit from partitioning are cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), and lookup().
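Shuffling "by key" means routing every record to the partition its key maps to. The sketch below models that routing in plain Scala, assuming hash partitioning. Spark's HashPartitioner follows the same rule (a non-negative `key.hashCode` modulo the number of partitions, with null keys going to partition 0), but `HashRouting` and its methods are hypothetical names for illustration, not Spark API. `Math.floorMod` gives the same result as a non-negative modulo whenever the partition count is positive.

```scala
// Hypothetical plain-Scala model of hash partitioning (no Spark dependency).
object HashRouting {
  // Partition index for a key: non-negative hashCode modulo numPartitions;
  // null keys go to partition 0.
  def partitionFor(key: Any, numPartitions: Int): Int =
    if (key == null) 0 else Math.floorMod(key.hashCode, numPartitions)

  // Model of a shuffle: route every (key, value) record to its key's partition.
  def shuffle[K, V](records: Seq[(K, V)], numPartitions: Int): Vector[Vector[(K, V)]] = {
    val routed = records.groupBy { case (k, _) => partitionFor(k, numPartitions) }
    Vector.tabulate(numPartitions)(i => routed.getOrElse(i, Seq.empty).toVector)
  }

  // Per-key reduction performed independently inside each partition. This is
  // only correct because every value for a given key sits in one partition.
  def reduceByKeyLocally[K, V](parts: Vector[Vector[(K, V)]])(f: (V, V) => V): Vector[Map[K, V]] =
    parts.map(_.groupMapReduce(_._1)(_._2)(f))
}
```

For example, `HashRouting.shuffle(Seq((1, 1), (2, 2), (3, 3), (1, 10)), 2)` places key 2 in partition 0 and keys 1 and 3 in partition 1, so reducing each partition on its own with `_ + _` already yields the complete sum for key 1.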
For operations that act on a single RDD, such as reduceByKey(), running on a pre-partitioned RDD will cause all the values for each key to be computed locally on a single machine, so no shuffle is needed; only the final, locally reduced values are sent from the worker nodes back to the master, and only when an action asks for them. For binary operations, such as cogroup() and join(), pre-partitioning will prevent at least one of the RDDs (the one with the known partitioner) from being shuffled. If both RDDs have the same partitioner, and if they are cached on the same machines (e.g., one was created using mapValues() on the other, which preserves keys and partitioning) or if one of them has not yet been computed, then no shuffling across the network will occur.
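The co-partitioned case can be modeled the same way. In this plain-Scala sketch (hypothetical names, no Spark dependency), both inputs are hash-partitioned with the same rule and the same partition count, so `joinCoPartitioned` only ever pairs partition i of one side with partition i of the other. It models logical partitions only; which machines hold those partitions, and whether they are cached, is outside its scope.

```scala
// Hypothetical plain-Scala model (no Spark): two datasets partitioned by the
// same hash rule into the same number of partitions can be joined partition
// by partition, because matching keys always share a partition index.
object CoPartitionedJoin {
  def partitionFor(key: Any, numPartitions: Int): Int =
    if (key == null) 0 else Math.floorMod(key.hashCode, numPartitions)

  def partition[K, V](data: Seq[(K, V)], numPartitions: Int): Vector[Vector[(K, V)]] = {
    val routed = data.groupBy { case (k, _) => partitionFor(k, numPartitions) }
    Vector.tabulate(numPartitions)(i => routed.getOrElse(i, Seq.empty).toVector)
  }

  // Partition i of the left side is matched only against partition i of the
  // right side; no record crosses partition boundaries.
  def joinCoPartitioned[K, V, W](
      left: Vector[Vector[(K, V)]],
      right: Vector[Vector[(K, W)]]
  ): Vector[Vector[(K, (V, W))]] = {
    require(left.length == right.length, "both sides must have the same number of partitions")
    left.zip(right).map { case (leftPart, rightPart) =>
      // Index the right-hand partition by key, then probe it from the left.
      val rightByKey: Map[K, Vector[W]] = rightPart.groupMap(_._1)(_._2)
      for {
        (k, v) <- leftPart
        w <- rightByKey.getOrElse(k, Vector.empty[W])
      } yield (k, (v, w))
    }
  }
}
```

A join over the unpartitioned inputs would produce the same pairs; what co-partitioning changes is that each output record is produced inside the partition that already holds its key.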
Operations That Affect Partitioning
Spark knows internally how each of its operations affects partitioning, and automatically sets the partitioner on RDDs created by operations that partition the data. For example, suppose you called join() to join two RDDs; because the elements with the same key have been hashed to the same machine, Spark knows that the result is