implementing your own serialization routines for your data types (e.g., using the
java.io.Externalizable interface for Java Serialization, or using the __reduce__()
method to define custom serialization for Python's pickle library).
Working on a Per-Partition Basis
Working with data on a per-partition basis allows us to avoid redoing setup work for
each data item. Operations like opening a database connection or creating a
random-number generator are examples of setup steps that we wish to avoid doing for each
element. Spark has per-partition versions of map and foreach to help reduce the cost
of these operations by letting you run code only once for each partition of an RDD.
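A minimal sketch of the idea, using the random-number generator case mentioned above. The function name add_noise and the seed parameter are hypothetical, chosen only for illustration. The function takes an iterator over one partition's elements and yields results, which is the shape mapPartitions() expects, so the generator is created once per partition rather than once per element.

```python
import random


def add_noise(partition, seed=42):
    """Per-partition function: iterator of numbers in, iterator of numbers out."""
    # Setup step: runs once for the whole partition, not once per element.
    rng = random.Random(seed)
    for x in partition:
        # Per-element work reuses the generator created above.
        yield x + rng.random()


# With a Spark RDD of numbers (assumed to be named rdd), this would be used as:
#   noisy = rdd.mapPartitions(add_noise)
# Locally, a plain iterator stands in for one partition:
noisy = list(add_noise(iter([1, 2, 3])))
```

Because the function only relies on the iterator protocol, it can be checked on ordinary Python lists before it is run on a cluster.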
Going back to our example with call signs, there is an online database of ham radio
call signs we can query for a public list of their logged contacts. By using
partition-based operations, we can share a connection pool to this database to avoid setting up
many connections, and reuse our JSON parser. As Examples 6-10 through 6-12 show,
we use the mapPartitions() function, which gives us an iterator of the elements in
each partition of the input RDD and expects us to return an iterator of our results.
Example 6-10. Shared connection pool in Python
import json
import urllib3

def processCallSigns(signs):
    """Lookup call signs using a connection pool"""
    # Create a connection pool (once per partition)
    http = urllib3.PoolManager()
    # the URL associated with each call sign record
    urls = map(lambda x: "http://73s.com/qsos/%s.json" % x, signs)
    # issue a GET request for each URL through the shared pool
    requests = map(lambda x: (x, http.request('GET', x)), urls)
    # parse the JSON body of each response
    result = map(lambda x: (x[0], json.loads(x[1].data)), requests)
    # remove any empty results and return
    return filter(lambda x: x[1] is not None, result)

def fetchCallSigns(input):
    """Fetch call signs"""
    return input.mapPartitions(lambda callSigns: processCallSigns(callSigns))

contactsContactList = fetchCallSigns(validSigns)
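The saving from a shared pool can be checked locally without Spark or a network. The sketch below is an illustration under stated assumptions, not part of the original example: FakePool is a hypothetical stand-in for a connection pool that counts how often it is constructed, the call signs are made-up sample values, and a list of lists stands in for the partitions of an RDD.

```python
class FakePool:
    """Hypothetical stand-in for a connection pool; counts constructions."""
    created = 0

    def __init__(self):
        FakePool.created += 1

    def lookup(self, sign):
        # Pretend lookup: return the sign with its length instead of real data.
        return (sign, len(sign))


def lookup_partition(signs):
    """Iterator of call signs in, iterator of (sign, result) pairs out."""
    pool = FakePool()  # one pool for the whole partition
    return (pool.lookup(s) for s in signs)


# Two simulated partitions holding four sample call signs in total.
partitions = [["W8PAL", "KK6JKQ"], ["W6BB", "VE3UOW"]]
results = [r for part in partitions for r in lookup_partition(iter(part))]

print(FakePool.created)  # 2: one pool per partition, not one per call sign
```

Creating the pool inside a per-element map() instead would construct it four times here, once for each call sign.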
Example 6-11. Shared connection pool and JSON parser in Scala
val contactsContactLists = validSigns.distinct().mapPartitions {
  signs =>
  val mapper = createMapper()
  val client = new HttpClient()
  client.start()
  // create http request