nodes and to actually transform our RDD with our shell script. Both tasks are easy to
accomplish in Spark, as you can see in Examples 6-16 through 6-18.
Example 6-16. Driver program using pipe() to call finddistance.R in Python
# Compute the distance of each call using an external R program
from pyspark import SparkFiles

distScript = "./src/R/finddistance.R"
distScriptName = "finddistance.R"
# Add our script to the list of files for each node to download with this job
sc.addFile(distScript)

def hasDistInfo(call):
    """Verify that a call has the fields required to compute the distance"""
    requiredFields = ["mylat", "mylong", "contactlat", "contactlong"]
    return all(map(lambda f: call[f], requiredFields))

def formatCall(call):
    """Format a call so that it can be parsed by our R program"""
    return "{0},{1},{2},{3}".format(
        call["mylat"], call["mylong"],
        call["contactlat"], call["contactlong"])

pipeInputs = contactsContactList.values().flatMap(
    lambda calls: map(formatCall, filter(hasDistInfo, calls)))
distances = pipeInputs.pipe(SparkFiles.get(distScriptName))
print(distances.collect())
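The two helper functions in Example 6-16 can be tried locally, without a Spark cluster, to see exactly which lines the driver would hand to the script. The following is a minimal sketch under stated assumptions: the sample call records are invented for illustration, calls are assumed to be plain dictionaries, and `has_dist_info` uses `dict.get` so that a missing key counts as missing data instead of raising `KeyError` (a small departure from the example above).

```python
REQUIRED_FIELDS = ["mylat", "mylong", "contactlat", "contactlong"]

def has_dist_info(call):
    """Return True when every coordinate field is present and truthy."""
    return all(call.get(field) for field in REQUIRED_FIELDS)

def format_call(call):
    """Render one call as the comma-separated line sent to the script."""
    return "{0},{1},{2},{3}".format(
        call["mylat"], call["mylong"],
        call["contactlat"], call["contactlong"])

def pipe_inputs_for(calls):
    """Keep only calls with full coordinates and format each as one line."""
    return [format_call(call) for call in calls if has_dist_info(call)]

# Hypothetical sample data; the coordinates are made up for illustration.
sample_calls = [
    {"mylat": 37.5, "mylong": -122.25, "contactlat": 40.75, "contactlong": -74.0},
    {"mylat": 37.5, "mylong": -122.25, "contactlat": None, "contactlong": None},
    {"mylat": 51.5, "mylong": -0.25},
]

for line in pipe_inputs_for(sample_calls):
    print(line)
```

Only the first record survives the filter. Note that a truthiness check of this kind also rejects a coordinate that is exactly 0, so a stricter `is not None` test may be preferable for real data.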
Example 6-17. Driver program using pipe() to call finddistance.R in Scala
// Compute the distance of each call using an external R program
// Add our script to the list of files for each node to download with this job
val distScript = "./src/R/finddistance.R"
val distScriptName = "finddistance.R"
sc.addFile(distScript)
val distances = contactsContactLists.values.flatMap(x => x.map(y =>
  s"${y.contactlat},${y.contactlong},${y.mylat},${y.mylong}")).pipe(Seq(
  SparkFiles.get(distScriptName)))
println(distances.collect().toList)
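pipe() passes each element of a partition to the external process as one line on standard input, and each line the process writes to standard output becomes a string element of the resulting RDD. A rough local simulation of that contract, in Python, may help when debugging a script before running it on a cluster. This is a sketch, not Spark's implementation: `pipe_partition` is a hypothetical helper, and the stand-in command computes a plain planar distance, not whatever finddistance.R computes.

```python
import subprocess
import sys

def pipe_partition(elements, command):
    """Send each element as one stdin line; return the command's stdout lines."""
    input_text = "".join("{0}\n".format(element) for element in elements)
    result = subprocess.run(
        command, input=input_text, capture_output=True, text=True, check=True)
    return result.stdout.splitlines()

# Stand-in for the external script (an assumption, not finddistance.R):
# reads "x1,y1,x2,y2" lines and prints the straight-line distance for each.
STAND_IN_SCRIPT = """
import math
import sys
for line in sys.stdin:
    x1, y1, x2, y2 = [float(part) for part in line.split(",")]
    print(math.hypot(x2 - x1, y2 - y1))
"""
stand_in_command = [sys.executable, "-c", STAND_IN_SCRIPT]

distances = pipe_partition(["0,0,3,4", "1,1,1,1"], stand_in_command)
print(distances)
```

The results come back as strings, which is also what pipe() yields, so a driver would typically convert them to numbers afterwards.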
Example 6-18. Driver program using pipe() to call finddistance.R in Java
// Compute the distance of each call using an external R program
// Add our script to the list of files for each node to download with this job
String distScript = "./src/R/finddistance.R";
String distScriptName = "finddistance.R";
sc.addFile(distScript);
JavaRDD<String> pipeInputs = contactsContactLists.values()
  .map(new VerifyCallLogs()).flatMap(
    new FlatMapFunction<CallLog[], String>() {
      public Iterable<String> call(CallLog[] calls) {
        ArrayList<String> latLons = new ArrayList<String>();
        for (CallLog call : calls) {
          latLons.add(call.mylat + "," + call.mylong +