System.out.println(data.mapToDouble(new DoubleFunction<CassandraRow>() {
  public double call(CassandraRow row) { return row.getInt("value"); }
}).stats());
In addition to loading an entire table, we can also query subsets of our data. We can
restrict our data by adding a where clause to the cassandraTable() call, for example,
sc.cassandraTable(…).where("key=?", "panda").
The Cassandra connector supports saving to Cassandra from a variety of RDD types.
We can directly save RDDs of CassandraRow objects, which is useful for copying data
between tables. We can also save RDDs that aren't in row form, such as tuples and
lists, by specifying the column mapping, as Example 5-44 shows.
Example 5-44. Saving to Cassandra in Scala
val rdd = sc.parallelize(List(Seq("moremagic", 1)))
rdd.saveToCassandra("test", "kv", SomeColumns("key", "value"))
This section only briefly introduced the Cassandra connector. For more information,
check out the connector's GitHub page.
HBase
Spark can access HBase through its Hadoop input format, implemented in the
org.apache.hadoop.hbase.mapreduce.TableInputFormat class. This input format
returns key/value pairs where the key is of type
org.apache.hadoop.hbase.io.ImmutableBytesWritable and the value is of type
org.apache.hadoop.hbase.client.Result. The Result class includes various methods for
getting values based on their column family, as described in its API documentation.
To use Spark with HBase, you can call SparkContext.newAPIHadoopRDD with the correct
input format, as shown for Scala in Example 5-45.
Example 5-45. Scala example of reading from HBase
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tablename") // which table to scan
val rdd = sc.newAPIHadoopRDD(
  conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
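Once the RDD is loaded, each element is an (ImmutableBytesWritable, Result) pair, and the Result methods mentioned earlier can be used to pull out individual cells. The following is only a minimal sketch of that step, not part of the original example: the column family "cf", the qualifier "value", the application name, and the decode helper are all hypothetical, and it assumes the cell holds a UTF-8 string and that the job is launched with spark-submit with the HBase client libraries on the classpath.

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object HBaseReadSketch {
  // Hypothetical column family and qualifier; substitute the names from your table
  val Family: Array[Byte] = Bytes.toBytes("cf")
  val Qualifier: Array[Byte] = Bytes.toBytes("value")

  // Result.getValue returns null when the cell is absent, so wrap it in an Option
  def decode(bytes: Array[Byte]): Option[String] =
    Option(bytes).map(b => Bytes.toString(b))

  def main(args: Array[String]): Unit = {
    // The master URL is expected to come from spark-submit
    val sc = new SparkContext(new SparkConf().setAppName("HBaseReadSketch"))

    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, "tablename") // which table to scan
    val rdd = sc.newAPIHadoopRDD(
      conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

    // Convert each row to (row key, optional cell value) as plain strings
    val pairs = rdd.map { case (_, result) =>
      (Bytes.toString(result.getRow), decode(result.getValue(Family, Qualifier)))
    }

    pairs.take(10).foreach(println)
    sc.stop()
  }
}
```

Converting to plain strings inside the map matters if the data will later be collected or shuffled, because ImmutableBytesWritable and Result are Hadoop types rather than Java-serializable objects.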