Database Reference
In-Depth Information
nodes and to actually transform our RDD with our R script. Both tasks are easy to accomplish in Spark, as you can see in Examples 6-16 through 6-18.
Example 6-16. Driver program using pipe() to call finddistance.R in Python
# Compute the distance of each call using an external R program.
# Register the script with the SparkContext so each node downloads it for this job;
# distScriptName is the bare file name, distScript its path on the driver.
distScriptName = "finddistance.R"
distScript = "./src/R/" + distScriptName
sc.addFile(distScript)
def hasDistInfo(call):
    """Verify that a call has the fields required to compute the distance.

    Args:
        call: a mapping (presumably a dict parsed from the call log) keyed by
            field name.

    Returns:
        True only when each of mylat, mylong, contactlat and contactlong is
        present in ``call`` with a truthy value. A missing key yields False
        rather than raising KeyError, so incomplete records are filtered out
        instead of crashing the job.
    """
    requiredFields = ["mylat", "mylong", "contactlat", "contactlong"]
    # NOTE(review): like the original check, this relies on truthiness, so a
    # numeric 0 / 0.0 coordinate counts as missing -- confirm that is intended.
    return all(call.get(field) for field in requiredFields)
def formatCall(call):
    """Render a call as the comma-separated line our R program parses.

    The output has the form "mylat,mylong,contactlat,contactlong". Each value
    is formatted with str.format, and a missing field raises KeyError.
    """
    ordered = [call[key] for key in ("mylat", "mylong", "contactlat", "contactlong")]
    return "{0},{1},{2},{3}".format(*ordered)
# Flatten each contact list into one "mylat,mylong,contactlat,contactlong" line
# per call, keeping only calls that carry all four coordinates.
# NOTE(review): the Scala and Java examples call this RDD contactsContactLists;
# confirm which name the surrounding code actually defines.
pipeInputs = contactsContactList.values().flatMap(
    lambda calls: map(formatCall, filter(hasDistInfo, calls)))
# Transform the RDD by piping it through the R script registered with sc.addFile.
distances = pipeInputs.pipe(SparkFiles.get(distScriptName))
# print(...) with a single argument works on both Python 2 and Python 3; the
# bare `print expr` statement is a SyntaxError on Python 3.
print(distances.collect())
Example 6-17. Driver program using pipe() to call finddistance.R in Scala
// Compute the distance of each call using an external R program.
// Register the script so each node downloads it along with this job;
// distScriptName is the bare file name, distScript its path on the driver.
val distScriptName = "finddistance.R"
val distScript = s"./src/R/$distScriptName"
sc.addFile(distScript)
// Turn each call into the "mylat,mylong,contactlat,contactlong" line our R
// program parses (the same field order as formatCall in the Python version),
// then pipe those lines through the script registered with sc.addFile.
// The ${...} braces are required: "$y.contactlat" would interpolate only `y`
// (its toString) followed by the literal text ".contactlat".
val distances = contactsContactLists.values
  .flatMap(calls =>
    calls.map(call => s"${call.mylat},${call.mylong},${call.contactlat},${call.contactlong}"))
  .pipe(Seq(SparkFiles.get(distScriptName)))
println(distances.collect().toList)
Example 6-18. Driver program using pipe() to call finddistance.R in Java
// Compute the distance of each call using an external R program.
// Register the script so each node downloads it along with this job;
// distScriptName is the bare file name, distScript its path on the driver.
String distScriptName = "finddistance.R";
String distScript = "./src/R/" + distScriptName;
sc.addFile(distScript);
JavaRDD
<
String
>
pipeInputs
=
contactsContactLists
.
values
()
.
map
(
new
VerifyCallLogs
()).
flatMap
(
new
FlatMapFunction
<
CallLog
[],
String
>()
{
public
Iterable
<
String
>
call
(
CallLog
[]
calls
)
{
ArrayList
<
String
>
latLons
=
new
ArrayList
<
String
>();
for
(
CallLog
call:
calls
)
{
latLons
.
add
(
call
.
mylat
+
","
+
call
.
mylong
+