Much like with Elasticsearch, the Cassandra connector reads a job property to determine which cluster to connect to. We set spark.cassandra.connection.host to point to our Cassandra cluster, and if we have a username and password we can set them with spark.cassandra.auth.username and spark.cassandra.auth.password.
Assuming we have only a single Cassandra cluster to connect to, we can set this up when creating our SparkContext, as shown in Examples 5-40 and 5-41.
Example 5-40. Setting the Cassandra property in Scala
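import org.apache.spark.{SparkConf, SparkContext}

// Point the connector at the Cassandra cluster, then create the context.
val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "hostname")
val sc = new SparkContext(conf)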
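If the cluster requires authentication, the two credential properties mentioned earlier can be set on the same SparkConf. The following is a minimal configuration sketch, not a definitive setup: the variable names and their values are placeholders introduced here for illustration, and the master and application name are assumed to be supplied when the job is launched.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Placeholder values (assumptions): replace with your own cluster details.
val cassandraHost = "hostname"
val cassandraUser = "username"
val cassandraPassword = "password"

val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", cassandraHost)
  // Only needed when the cluster has authentication enabled.
  .set("spark.cassandra.auth.username", cassandraUser)
  .set("spark.cassandra.auth.password", cassandraPassword)

// As in Example 5-40. The master and application name are assumed to be
// provided at launch time (for example, by spark-submit).
val sc = new SparkContext(conf)
```

In practice the credentials would normally be read from outside the source code rather than hardcoded as they are in this sketch.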
Example 5-41. Setting the Cassandra property in Java
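import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// cassandraHost and sparkMaster are Strings assumed to be defined elsewhere.
SparkConf conf = new SparkConf(true)
    .set("spark.cassandra.connection.host", cassandraHost);
JavaSparkContext sc = new JavaSparkContext(
    sparkMaster, "basicquerycassandra", conf);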
The DataStax Cassandra connector uses implicits in Scala to provide some additional functions on top of the SparkContext and RDDs. Let's import the implicit conversions and try loading some data (Example 5-42).
Example 5-42. Loading the entire table as an RDD with key/value data in Scala
// Implicits that add functions to the SparkContext & RDDs.
import com.datastax.spark.connector._

// Read entire table as an RDD. Assumes your table test was created as
// CREATE TABLE test.kv(key text PRIMARY KEY, value int);
val data = sc.cassandraTable("test", "kv")
// Print some basic stats on the value field.
data.map(row => row.getInt("value")).stats()
In Java we don't have implicit conversions, so we need to explicitly convert our SparkContext and RDDs for this functionality (Example 5-43).
Example 5-43. Loading the entire table as an RDD with key/value data in Java
import com.datastax.spark.connector.CassandraRow;
import static com.datastax.spark.connector.CassandraJavaUtil.javaFunctions;

// Read entire table as an RDD. Assumes your table test was created as
// CREATE TABLE test.kv(key text PRIMARY KEY, value int);
JavaRDD<CassandraRow> data = javaFunctions(sc).cassandraTable("test", "kv");
// Print some basic stats.