We can use a similar approach, shown in Examples 4-9 through 4-11, to implement the classic distributed word count problem. We use flatMap() from the previous chapter to split each line into words, map each word to a pair of the word and the number 1, and then sum the counts for each word using reduceByKey(), as in Examples 4-7 and 4-8.
Example 4-9. Word count in Python
rdd = sc.textFile("s3://...")
words = rdd.flatMap(lambda x: x.split(" "))
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
Example 4-10. Word count in Scala
val input = sc.textFile("s3://...")
val words = input.flatMap(x => x.split(" "))
val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
Example 4-11. Word count in Java
JavaRDD<String> input = sc.textFile("s3://...");
// Split each line into words.
JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
  public Iterable<String> call(String x) { return Arrays.asList(x.split(" ")); }
});
// Pair each word with 1, then sum the 1s for each word.
JavaPairRDD<String, Integer> result = words.mapToPair(
  new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String x) {
      return new Tuple2<String, Integer>(x, 1);
    }
}).reduceByKey(
  new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer a, Integer b) { return a + b; }
});
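To see what the three steps compute without a cluster, the pipeline can be modeled in plain Python. This is only a sketch of the semantics, not how Spark executes the job: the `lines` list is a hypothetical stand-in for the contents of the input file, and `reduce_by_key` is an illustrative helper, not a Spark function.

```python
# Hypothetical in-memory stand-in for the lines of the input file.
lines = ["the quick brown fox", "the lazy dog", "the fox"]

# flatMap step: split each line and flatten the pieces into one list of words.
words = [word for line in lines for word in line.split(" ")]

# map step: pair every word with the number 1.
pairs = [(word, 1) for word in words]


def reduce_by_key(func, pairs):
    """Illustrative local model of reduceByKey: fold the values of each key with func."""
    result = {}
    for key, value in pairs:
        if key in result:
            result[key] = func(result[key], value)
        else:
            result[key] = value
    return result


# reduceByKey step: add up the 1s that share the same word.
counts = reduce_by_key(lambda x, y: x + y, pairs)
print(counts)
```

Each word appears in `counts` exactly once, with a value equal to the number of pairs that carried it as a key.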
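We can actually implement word count with even less code by using the countByValue() function on the first RDD: input.flatMap(x => x.split(" ")).countByValue(). Note that countByValue() is an action: it returns the counts to the driver program as a local map rather than as an RDD, so it is best suited to results small enough to fit in the driver's memory.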
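As a rough local analogy, counting by value amounts to tallying how often each distinct element occurs. The sketch below uses Python's standard `collections.Counter` for that; the `lines` list is a hypothetical stand-in for the input file, and nothing here calls Spark itself.

```python
from collections import Counter

# Hypothetical in-memory stand-in for the lines of the input file.
lines = ["the quick brown fox", "the lazy dog", "the fox"]

# Same flatMap step as before: one flat list of words.
words = [word for line in lines for word in line.split(" ")]

# Tally each distinct word. The result is an ordinary local mapping,
# which mirrors countByValue() handing its result back to the driver.
counts = Counter(words)
print(counts.most_common(2))
```

The result is the same word-to-count mapping that the map and reduceByKey() steps produce, obtained in a single call.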
combineByKey() is the most general of the per-key aggregation functions. Most of the other per-key combiners are implemented using it. Like aggregate(), combineByKey() allows the user to return values that are not the same type as our input data.
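A minimal plain-Python model can make the "different result type" point concrete. It assumes the three functions that Spark's combineByKey() accepts (createCombiner, mergeValue, and mergeCombiners) and uses them to turn integer values into (sum, count) tuples. The helper `combine_by_key`, the round-robin partitioning, and the sample `scores` data are all illustrative assumptions, not Spark's implementation.

```python
def combine_by_key(pairs, create_combiner, merge_value, merge_combiners,
                   num_partitions=2):
    """Illustrative local model of per-key combining across partitions."""
    # Round-robin split as a simple stand-in for Spark's real partitioning.
    partitions = [pairs[i::num_partitions] for i in range(num_partitions)]

    # Within each partition: create an accumulator for a key the first time
    # it is seen, and merge later values for that key into the accumulator.
    per_partition = []
    for part in partitions:
        accumulators = {}
        for key, value in part:
            if key in accumulators:
                accumulators[key] = merge_value(accumulators[key], value)
            else:
                accumulators[key] = create_combiner(value)
        per_partition.append(accumulators)

    # Across partitions: merge the accumulators that share a key.
    merged = {}
    for accumulators in per_partition:
        for key, acc in accumulators.items():
            if key in merged:
                merged[key] = merge_combiners(merged[key], acc)
            else:
                merged[key] = acc
    return merged


# Hypothetical sample data: (key, integer score) pairs.
scores = [("panda", 0), ("pink", 3), ("pirate", 3), ("panda", 1), ("pink", 4)]

sum_count = combine_by_key(
    scores,
    lambda value: (value, 1),                      # createCombiner
    lambda acc, value: (acc[0] + value, acc[1] + 1),  # mergeValue
    lambda a, b: (a[0] + b[0], a[1] + b[1]),       # mergeCombiners
)

# The inputs were ints, but each key's result is a (sum, count) tuple,
# from which a per-key average follows directly.
averages = {key: total / count for key, (total, count) in sum_count.items()}
print(averages)
```

The values going in are integers while the values coming out are tuples, which is the flexibility that simpler per-key functions such as reduceByKey() do not offer.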
To understand combineByKey(), it's useful to think of how it handles each element it processes. As combineByKey() goes through the elements in a partition, each element either has a key it hasn't seen before or has the same key as a previous element.
If the key is new, combineByKey() uses a function we provide, called createCombiner(), to create the initial value for the accumulator on that key. It's important