","
    + call.contactlat + "," + call.contactlong);
      }
      return latLons;
    }
  });
JavaRDD<String> distances = pipeInputs.pipe(SparkFiles.get(distScriptName));
System.out.println(StringUtils.join(distances.collect(), ","));
With SparkContext.addFile(path), we can build up a list of files for each of the
worker nodes to download with a Spark job. These files can come from the driver's
local filesystem (as we did in these examples), from HDFS or other Hadoop-supported
filesystems, or from an HTTP, HTTPS, or FTP URI. When an action is run in the job,
the files will be downloaded by each of the nodes. The files can then be found on the
worker nodes in SparkFiles.getRootDirectory(), or located with
SparkFiles.get(filename). Of course, this is only one way to make sure that pipe() can
find a script on each worker node. You could use another remote copying tool to
place the script file in a known location on each node.
All the files added with SparkContext.addFile(path) are stored in the same
directory, so it's important to use unique names.
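As a small guard against such name collisions, the following Java sketch checks a list of paths for repeated base names before they are handed to addFile. The class AddFileNameCheck, the method collidingNames, and the example paths are hypothetical and not part of Spark; the sketch also assumes forward-slash separated paths or URIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AddFileNameCheck {
    /** Returns the base names that occur in more than one of the given paths. */
    public static List<String> collidingNames(List<String> paths) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String path : paths) {
            // Assumption: the base name is whatever follows the last '/'.
            String name = path.substring(path.lastIndexOf('/') + 1);
            counts.merge(name, 1, Integer::sum);
        }
        List<String> duplicates = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            if (entry.getValue() > 1) {
                duplicates.add(entry.getKey());
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        // Hypothetical paths; two of them share the base name finddistance.R.
        List<String> paths = Arrays.asList(
            "./src/R/finddistance.R",
            "hdfs://namenode/scripts/finddistance.R",
            "/opt/scripts/cleanup.sh");
        System.out.println(collidingNames(paths)); // prints [finddistance.R]
    }
}
```

An empty result means every file would land under a distinct name in the shared directory.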
Once the script is available, the pipe() method on RDDs makes it easy to pipe the
elements of an RDD through the script. Perhaps a smarter version of findDistance
would accept SEPARATOR as a command-line argument. In that case, either of these
would do the job, although the first is preferred:
• rdd.pipe(Seq(SparkFiles.get("finddistance.R"), ","))
• rdd.pipe(SparkFiles.get("finddistance.R") + " ,")
In the first option, we are passing the command invocation as a sequence of
positional arguments (with the command itself at the zero-offset position); in the
second, we're passing it as a single command string that Spark will then break down
into positional arguments.
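To see why the sequence form is the safer choice, consider this plain-Java sketch. It assumes the single-string form is split on whitespace with no shell-style quoting, which we model here with java.util.StringTokenizer. The class PipeCommandForms, its tokenize helper, and the script path containing a space are illustrative assumptions rather than Spark code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringTokenizer;

public class PipeCommandForms {
    /** Splits a command string on whitespace (assumed model of the single-string form). */
    public static List<String> tokenize(String command) {
        List<String> args = new ArrayList<>();
        StringTokenizer tokens = new StringTokenizer(command);
        while (tokens.hasMoreTokens()) {
            args.add(tokens.nextToken());
        }
        return args;
    }

    public static void main(String[] args) {
        // Hypothetical script location whose directory name contains a space.
        String script = "/tmp/spark files/finddistance.R";

        // Sequence form: we decide where each argument starts and ends.
        List<String> explicit = Arrays.asList(script, ",");

        // String form: the whole invocation is split on whitespace.
        List<String> split = tokenize(script + " ,");

        System.out.println(explicit.size()); // prints 2
        System.out.println(split.size());    // prints 3
    }
}
```

With the explicit list, the path stays intact as one argument. Under whitespace splitting it falls apart into two pieces, so the command at position zero is no longer the script.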
We can also specify shell environment variables with pipe() if we desire. Simply pass
in a map of environment variables to values as the second parameter to pipe(), and
Spark will set those values.
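The effect of the environment map can be imitated locally without a cluster. The sketch below is not Spark's implementation: it uses the standard ProcessBuilder to start a command with extra environment variables, writes each element to the command's standard input as one line, and collects the output lines. The class LocalPipeSketch, its pipe method, and the inline shell loop are hypothetical, and a POSIX sh on the PATH is assumed.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class LocalPipeSketch {
    /** Feeds elements to the command line by line and returns its output lines. */
    public static List<String> pipe(List<String> elements, List<String> command,
                                    Map<String, String> env) {
        try {
            ProcessBuilder builder = new ProcessBuilder(command);
            builder.environment().putAll(env); // extra variables visible to the command
            builder.redirectError(ProcessBuilder.Redirect.INHERIT);
            Process process = builder.start();

            // Write input on a separate thread so reading the output cannot block it.
            Thread writer = new Thread(() -> {
                try (BufferedWriter out = new BufferedWriter(new OutputStreamWriter(
                        process.getOutputStream(), StandardCharsets.UTF_8))) {
                    for (String element : elements) {
                        out.write(element);
                        out.newLine();
                    }
                } catch (IOException e) {
                    // The command may exit before consuming all of its input.
                }
            });
            writer.start();

            List<String> result = new ArrayList<>();
            try (BufferedReader in = new BufferedReader(new InputStreamReader(
                    process.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    result.add(line);
                }
            }
            writer.join();
            int status = process.waitFor();
            if (status != 0) {
                throw new IOException("command exited with status " + status);
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for command", e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for a script that reads SEPARATOR from its environment.
        List<String> command = Arrays.asList(
            "sh", "-c", "while read -r line; do echo \"$line$SEPARATOR$line\"; done");
        Map<String, String> env = Collections.singletonMap("SEPARATOR", "|");
        System.out.println(pipe(Arrays.asList("a", "b"), command, env));
    }
}
```

Here the script picks up SEPARATOR from its environment rather than from a command-line argument, which is the kind of setting the environment map is meant to carry.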
You should now at least have an understanding of how to use pipe() to process the
elements of an RDD through an external command, and of how to distribute such
command scripts to the cluster in a way that the worker nodes can find them.