  signs.map { sign =>
    createExchangeForSign(sign)
  // fetch responses
  }.map { case (sign, exchange) =>
    (sign, readExchangeCallLog(mapper, exchange))
  }.filter(x => x._2 != null) // Remove empty CallLogs
}
Example 6-12. Shared connection pool and JSON parser in Java
// Use mapPartitions to reuse setup work.
JavaPairRDD<String, CallLog[]> contactsContactLists =
  validCallSigns.mapPartitionsToPair(
    new PairFlatMapFunction<Iterator<String>, String, CallLog[]>() {
      public Iterable<Tuple2<String, CallLog[]>> call(Iterator<String> input) {
        // List for our results.
        ArrayList<Tuple2<String, CallLog[]>> callsignLogs = new ArrayList<>();
        ArrayList<Tuple2<String, ContentExchange>> requests = new ArrayList<>();
        ObjectMapper mapper = createMapper();
        HttpClient client = new HttpClient();
        try {
          client.start();
          while (input.hasNext()) {
            requests.add(createRequestForSign(input.next(), client));
          }
          for (Tuple2<String, ContentExchange> signExchange : requests) {
            callsignLogs.add(fetchResultFromRequest(mapper, signExchange));
          }
        } catch (Exception e) {
        }
        return callsignLogs;
      }});
System.out.println(StringUtils.join(contactsContactLists.collect(), ","));
When operating on a per-partition basis, Spark gives our function an Iterator of the elements in that partition. To return values, we return an Iterable. In addition to mapPartitions(), Spark has a number of other per-partition operators, listed in Table 6-1.
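The Iterator-in, Iterable-out contract can be illustrated without a cluster. The following is a minimal sketch in plain Java, not Spark code: the class PerPartitionSketch, the Parser stand-in for an expensive resource, and the hand-made "partitions" are all hypothetical names introduced for illustration. It only shows that setup work placed at the top of a per-partition function runs once per partition rather than once per element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class PerPartitionSketch {
  // Counts how often the "expensive" setup runs (illustration only).
  static int setupCount = 0;

  // Hypothetical stand-in for a costly resource such as a
  // connection pool or a JSON parser.
  static class Parser {
    Parser() { setupCount++; }
    int parse(String s) { return Integer.parseInt(s.trim()); }
  }

  // Same shape as a per-partition function: we are handed an Iterator
  // over one partition's elements and return an Iterable of results.
  static Iterable<Integer> parsePartition(Iterator<String> input) {
    Parser parser = new Parser();  // set up once for the whole partition
    List<Integer> results = new ArrayList<>();
    while (input.hasNext()) {
      results.add(parser.parse(input.next()));
    }
    return results;
  }

  public static void main(String[] args) {
    // Two hand-made "partitions"; Spark would supply these iterators itself.
    List<List<String>> partitions = Arrays.asList(
        Arrays.asList("1", "2", "3"),
        Arrays.asList("4", "5"));
    List<Integer> all = new ArrayList<>();
    for (List<String> partition : partitions) {
      for (Integer value : parsePartition(partition.iterator())) {
        all.add(value);
      }
    }
    // Five elements are parsed, but the setup runs only twice.
    System.out.println(all + " parsed with " + setupCount + " setups");
  }
}
```

A per-element version would construct a Parser for every string, which is the cost a per-partition operator avoids. Note that the Iterable return type matches the Spark 1.x Java API used in this example; in Spark 2.0 and later the Java function interfaces return an Iterator instead.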
Table 6-1. Per-partition operators
| Function name | We are called with | We return | Function signature on RDD[T] |
|---|---|---|---|
| mapPartitions() | Iterator of the elements in that partition | Iterator of our return elements | f: (Iterator[T]) → Iterator[U] |
| mapPartitionsWithIndex() | Integer of partition number, and Iterator of the elements in that partition | Iterator of our return elements | f: (Int, Iterator[T]) → Iterator[U] |