"," + call . contactlat + "," + call . contactlong );
}
return latLons ;
}
});
JavaRDD < String > distances = pipeInputs . pipe ( SparkFiles . get ( distScriptName ));
System . out . println ( StringUtils . join ( distances . collect (), "," ));
With SparkContext.addFile(path), we can build up a list of files for each of the
worker nodes to download with a Spark job. These files can come from the driver's
local filesystem (as we did in these examples), from HDFS or other Hadoop-supported
filesystems, or from an HTTP, HTTPS, or FTP URI. When an action is run in the job,
the files will be downloaded by each of the nodes. The files can then be found on
the worker nodes in SparkFiles.getRootDirectory(), or located with
SparkFiles.get(filename). Of course, this is only one way to make sure that pipe()
can find a script on each worker node: you could use another remote copying tool to
place the script file in a knowable location on each node.
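For reference, here is a minimal sketch in Scala of both halves of that flow; it
assumes a SparkContext named sc and an illustrative local script path:

import org.apache.spark.SparkFiles

// Driver side: register the script so every worker downloads it with the job
sc.addFile("./src/R/finddistance.R")

// Worker side (e.g., inside a task): resolve this node's local copy
val scriptPath = SparkFiles.get("finddistance.R")   // absolute path on this node
val rootDir = SparkFiles.getRootDirectory()         // directory holding all added files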
All the files added with SparkContext.addFile(path) are stored
in the same directory, so it's important to use unique names.
Once the script is available, the pipe() method on RDDs makes it easy to pipe the
elements of an RDD through the script. Perhaps a smarter version of findDistance
would accept SEPARATOR as a command-line argument. In that case, either of these
would do the job, although the first is preferred:
rdd.pipe(Seq(SparkFiles.get("finddistance.R"), ","))
rdd.pipe(SparkFiles.get("finddistance.R") + " ,")
In the first option, we are passing the command invocation as a sequence of posi‐
tional arguments (with the command itself at the zero-offset position); in the second,
we're passing it as a single command string that Spark will then break down into
positional arguments.
We can also specify shell environment variables with pipe() if we desire. Simply pass
in a map of environment variables to values as the second parameter to pipe() , and
Spark will set those values.
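As a sketch, assuming a hypothetical variant of finddistance.R that reads its
separator from a SEPARATOR environment variable rather than a command-line
argument, the call might look like this:

// Each worker-side invocation of the script will see SEPARATOR="," in its environment
val env = Map("SEPARATOR" -> ",")
val distances = rdd.pipe(Seq(SparkFiles.get("finddistance.R")), env)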
You should now at least have an understanding of how to use pipe() to process the
elements of an RDD through an external command, and of how to distribute such
command scripts to the cluster in a way that the worker nodes can find them.