  signs.map { sign =>
    createExchangeForSign(sign)
  // fetch responses
  }.map { case (sign, exchange) =>
    (sign, readExchangeCallLog(mapper, exchange))
  }.filter(x => x._2 != null) // Remove empty CallLogs
}
Example 6-12. Shared connection pool and JSON parser in Java
// Use mapPartitions to reuse setup work.
JavaPairRDD<String, CallLog[]> contactsContactLists =
  validCallSigns.mapPartitionsToPair(
    new PairFlatMapFunction<Iterator<String>, String, CallLog[]>() {
      public Iterable<Tuple2<String, CallLog[]>> call(Iterator<String> input) {
        // List for our results.
        ArrayList<Tuple2<String, CallLog[]>> callsignLogs = new ArrayList<>();
        ArrayList<Tuple2<String, ContentExchange>> requests = new ArrayList<>();
        // Created once per partition and shared by every element in it.
        ObjectMapper mapper = createMapper();
        HttpClient client = new HttpClient();
        try {
          client.start();
          // Issue all requests first, then read the responses.
          while (input.hasNext()) {
            requests.add(createRequestForSign(input.next(), client));
          }
          for (Tuple2<String, ContentExchange> signExchange : requests) {
            callsignLogs.add(fetchResultFromRequest(mapper, signExchange));
          }
        } catch (Exception e) {
          // Exceptions are swallowed here; results gathered so far are still returned.
        }
        return callsignLogs;
      }});
System.out.println(StringUtils.join(contactsContactLists.collect(), ","));
When operating on a per-partition basis, Spark gives our function an Iterator of the
elements in that partition. To return values, we return an Iterable, as in the Java
example above (the Scala signatures in Table 6-1 return an Iterator). In addition to
mapPartitions(), Spark has a number of other per-partition operators, listed in
Table 6-1.
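The per-partition contract can be illustrated without a cluster. The following is a minimal plain-Java sketch, not Spark code: partitions are simulated as in-memory lists, and the names PerPartitionSketch, expensiveSetup(), processPartition(), the local mapPartitions() helper, and the sample sign strings are all hypothetical. It assumes only that the setup work (standing in for something like creating a parser or starting a client) should run once per partition rather than once per element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class PerPartitionSketch {
    // Counts how many times the (hypothetical) expensive setup runs.
    static int setupCalls = 0;

    // Hypothetical stand-in for costly setup such as building a parser.
    static String expensiveSetup() {
        setupCalls++;
        return "log-for-";
    }

    // Per-partition function: set up once, then process every element
    // of the partition. Takes an Iterator, returns an Iterable.
    static Iterable<String> processPartition(Iterator<String> input) {
        String prefix = expensiveSetup();
        List<String> results = new ArrayList<>();
        while (input.hasNext()) {
            results.add(prefix + input.next());
        }
        return results;
    }

    // Simulates a mapPartitions-style operator over in-memory "partitions":
    // the function is applied once per partition, not once per element.
    static List<String> mapPartitions(
            List<List<String>> partitions,
            Function<Iterator<String>, Iterable<String>> f) {
        List<String> out = new ArrayList<>();
        for (List<String> partition : partitions) {
            for (String s : f.apply(partition.iterator())) {
                out.add(s);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Two simulated partitions holding three elements in total.
        List<List<String>> partitions = Arrays.asList(
            Arrays.asList("sign-a", "sign-b"),
            Arrays.asList("sign-c"));
        List<String> result =
            mapPartitions(partitions, PerPartitionSketch::processPartition);
        System.out.println(result);
        System.out.println("setup calls: " + setupCalls);
    }
}
```

With three elements spread over two simulated partitions, the setup runs twice rather than three times; the saving grows with the number of elements per partition.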
Table 6-1. Per-partition operators
Function name            | We are called with                                                          | We return                       | Function signature on RDD[T]
mapPartitions()          | Iterator of the elements in that partition                                  | Iterator of our return elements | f: (Iterator[T]) → Iterator[U]
mapPartitionsWithIndex() | Integer of partition number, and Iterator of the elements in that partition | Iterator of our return elements | f: (Int, Iterator[T]) → Iterator[U]
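To make the second signature concrete, here is a small plain-Java sketch of a function with the shape (Int, Iterator[T]) → Iterator[U]. It does not call Spark; the partitions are simulated as lists, and PartitionIndexSketch, tagWithPartition(), the local mapPartitionsWithIndex() helper, and the sample strings are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

public class PartitionIndexSketch {
    // Per-partition function that also receives the partition number
    // and prefixes each element with it.
    static Iterator<String> tagWithPartition(Integer index, Iterator<String> elems) {
        List<String> tagged = new ArrayList<>();
        while (elems.hasNext()) {
            tagged.add(index + ":" + elems.next());
        }
        return tagged.iterator();
    }

    // Simulates a mapPartitionsWithIndex-style operator: each simulated
    // partition is passed to f together with its position in the list.
    static List<String> mapPartitionsWithIndex(
            List<List<String>> partitions,
            BiFunction<Integer, Iterator<String>, Iterator<String>> f) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            Iterator<String> it = f.apply(i, partitions.get(i).iterator());
            while (it.hasNext()) {
                out.add(it.next());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
            Arrays.asList("sign-a", "sign-b"),
            Arrays.asList("sign-c"));
        // prints [0:sign-a, 0:sign-b, 1:sign-c]
        System.out.println(mapPartitionsWithIndex(
            partitions, PartitionIndexSketch::tagWithPartition));
    }
}
```

Knowing the partition number is useful when the per-partition work needs to be distinguishable, for example when labeling output or debugging how elements were distributed.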