System.out.println(data.mapToDouble(new DoubleFunction<CassandraRow>() {
  public double call(CassandraRow row) { return row.getInt("value"); }
}).stats());
In addition to loading an entire table, we can also query subsets of our data. We can restrict our data by adding a where clause to the cassandraTable() call; for example, sc.cassandraTable(…).where("key=?", "panda").
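The following is a minimal Scala sketch of such a filtered read. It is not taken from the connector's documentation and rests on several assumptions. The test.kv table from the earlier examples must exist as (key text PRIMARY KEY, value int). The Spark Cassandra connector must be on the classpath. A Cassandra node must be reachable at 127.0.0.1; that address, the app name, and the object and method names are placeholders. The "?" in the predicate is bound to the extra argument, and the connector adds the clause to the CQL query it sends to Cassandra instead of filtering the rows in Spark.

```scala
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

object CassandraWhereSketch {
  // Assumed setup: a local Cassandra node holding
  //   CREATE TABLE test.kv(key text PRIMARY KEY, value int);
  def localContext(): SparkContext = {
    val conf = new SparkConf()
      .setAppName("cassandra-where-sketch")                // placeholder name
      .setMaster("local[2]")
      .set("spark.cassandra.connection.host", "127.0.0.1") // assumed host
    new SparkContext(conf)
  }

  // Reads only the rows whose key is "panda". The bound value replaces "?",
  // and the predicate becomes part of the CQL query sent to Cassandra.
  def pandaValues(sc: SparkContext): Array[Int] =
    sc.cassandraTable("test", "kv")
      .where("key = ?", "panda")
      .map(row => row.getInt("value"))
      .collect()

  def main(args: Array[String]): Unit = {
    val sc = localContext()
    try pandaValues(sc).foreach(println)
    finally sc.stop()
  }
}
```

Keeping the predicate on the partition key lets Cassandra locate the matching row directly. Filtering with an RDD filter() after a full cassandraTable() read would pull the entire table into Spark first.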
The Cassandra connector supports saving to Cassandra from a variety of RDD types. We can directly save RDDs of CassandraRow objects, which is useful for copying data between tables. RDDs that aren't in row form, such as RDDs of tuples or lists, can be saved by specifying the column mapping, as Example 5-44 shows.
Example 5-44. Saving to Cassandra in Scala
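import com.datastax.spark.connector._ // adds saveToCassandra to RDDs; provides SomeColumns

val rdd = sc.parallelize(List(Seq("moremagic", 1)))
// Map the first element to the "key" column and the second to "value"
rdd.saveToCassandra("test", "kv", SomeColumns("key", "value"))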
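As noted above, RDDs of CassandraRow objects can be saved directly, which is handy for copying data between tables. The following is a minimal sketch of that case. It assumes a hypothetical table test.kv_copy that already exists with the same schema as test.kv, plus a local Cassandra node at 127.0.0.1; the object and method names are illustrative. Each CassandraRow carries its own column names, so no SomeColumns mapping is passed and every column of the row is written.

```scala
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

object CassandraCopySketch {
  // Copies every row of test.kv into test.kv_copy. Assumes kv_copy was
  // created beforehand with the same columns (key text PRIMARY KEY, value int).
  def copyKv(sc: SparkContext): Unit = {
    val rows = sc.cassandraTable("test", "kv") // an RDD of CassandraRow
    // CassandraRow knows its column names, so no column mapping is given
    rows.saveToCassandra("test", "kv_copy")
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("cassandra-copy-sketch")                 // placeholder name
      .setMaster("local[2]")
      .set("spark.cassandra.connection.host", "127.0.0.1") // assumed host
    val sc = new SparkContext(conf)
    try copyKv(sc) finally sc.stop()
  }
}
```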
This section only briefly introduced the Cassandra connector. For more information, check out the connector's GitHub page.
HBase
Spark can access HBase through its Hadoop input format, implemented in the org.apache.hadoop.hbase.mapreduce.TableInputFormat class. This input format returns key/value pairs where the key is of type org.apache.hadoop.hbase.io.ImmutableBytesWritable and the value is of type org.apache.hadoop.hbase.client.Result. The Result class includes various methods for getting values based on their column family, as described in its API documentation.
To use Spark with HBase, you can call SparkContext.newAPIHadoopRDD with the correct input format, as shown for Scala in Example 5-45.
Example 5-45. Scala example of reading from HBase
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tablename") // which table to scan

val rdd = sc.newAPIHadoopRDD(
  conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
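The RDD from Example 5-45 holds (ImmutableBytesWritable, Result) pairs, so a common next step is to turn each Result into plain values. The following is a minimal sketch under these assumptions:

- The column family cf and qualifier col are hypothetical names.
- Their cells hold UTF-8 strings.
- The object and method names are illustrative.

Result.getValue returns null when the row has no such cell, so the sketch wraps the value in an Option. The row key is copied with copyBytes(), since the writable's backing array may be a slice of a larger buffer. Converting to ordinary Scala types before any collect() also helps, because these HBase classes do not implement java.io.Serializable. The main method builds a one-cell Result in memory, so the helper can be tried without a running HBase cluster.

```scala
import org.apache.hadoop.hbase.{Cell, KeyValue}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

object HBaseResultSketch {
  // Hypothetical column family and qualifier
  val Family: Array[Byte] = Bytes.toBytes("cf")
  val Qualifier: Array[Byte] = Bytes.toBytes("col")

  // Result.getValue returns null when the row has no cf:col cell
  def cellAsString(result: Result): Option[String] =
    Option(result.getValue(Family, Qualifier)).map(b => Bytes.toString(b))

  // Converts the pairs produced by newAPIHadoopRDD into (row key, value)
  // strings; copyBytes() copies just the key's slice of the backing array
  def toStrings(rdd: RDD[(ImmutableBytesWritable, Result)]): RDD[(String, Option[String])] =
    rdd.map { case (key, result) => (Bytes.toString(key.copyBytes()), cellAsString(result)) }

  def main(args: Array[String]): Unit = {
    // An in-memory Result with a single cell, used only to exercise cellAsString
    val cell: Cell = new KeyValue(Bytes.toBytes("row1"), Family, Qualifier, Bytes.toBytes("panda"))
    val result = Result.create(Array(cell))
    println(cellAsString(result)) // Some(panda)
  }
}
```

With the rdd from Example 5-45, HBaseResultSketch.toStrings(rdd).take(10) would return the first ten rows as plain strings.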