To optimize reading from HBase, TableInputFormat includes multiple settings, such
as limiting the scan to a specific set of columns and restricting the time range scanned.
You can find these options in the TableInputFormat API documentation and set
them on your HBaseConfiguration before passing it to Spark.
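For instance, a minimal sketch of narrowing a scan might look like the following (the table name, column, and timestamps are placeholder values):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tablename")
// Read only the given column family:qualifier pair
conf.set(TableInputFormat.SCAN_COLUMNS, "columnFamily:columnQualifier")
// Read only cells written within this time range (epoch milliseconds)
conf.set(TableInputFormat.SCAN_TIMERANGE_START, "1388534400000")
conf.set(TableInputFormat.SCAN_TIMERANGE_END, "1420070400000")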
Elasticsearch
Spark can both read data from and write data to Elasticsearch using Elasticsearch-Hadoop.
Elasticsearch is an open source, Lucene-based search system.
The Elasticsearch connector is a bit different from the other connectors we have
examined, since it ignores the path information we provide and instead depends on
configuration set on our SparkContext. The Elasticsearch OutputFormat connector
also doesn't quite have the types to use Spark's wrappers, so we instead use
saveAsHadoopDataset, which means we need to set more properties by hand. Let's
look at how to read and write some simple data to Elasticsearch in Examples 5-46 and
5-47.
The latest Elasticsearch Spark connector is even easier to use, supporting
returning Spark SQL rows. This connector is still covered, as the row
conversion doesn't yet support all of the native types in Elasticsearch.
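As a point of comparison, here is a brief sketch of that native integration, using the esRDD and saveToEs helpers from the org.elasticsearch.spark package (the index/type name is a placeholder, and es.nodes is assumed to point at a reachable cluster, defaulting to localhost):

import org.elasticsearch.spark._

// Each Scala map becomes one JSON document in the target index/type
sc.makeRDD(Seq(Map("user" -> "holden", "tweet" -> "hi")))
  .saveToEs("twitter/tweets")

// Read back as an RDD of (documentId, Map[String, AnyRef]) pairs
val docs = sc.esRDD("twitter/tweets")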
Example 5-46. Elasticsearch output in Scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions

val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("mapred.output.format.class",
  "org.elasticsearch.hadoop.mr.EsOutputFormat")
jobConf.setOutputCommitter(classOf[FileOutputCommitter])
jobConf.set(ConfigurationOptions.ES_RESOURCE_WRITE, "twitter/tweets")
jobConf.set(ConfigurationOptions.ES_NODES, "localhost")
// EsOutputFormat ignores the path, but Hadoop requires one to be set
FileOutputFormat.setOutputPath(jobConf, new Path("-"))
output.saveAsHadoopDataset(jobConf)
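The output RDD here is assumed to hold (key, MapWritable) pairs; EsOutputFormat ignores the key. A minimal sketch of building such an RDD from plain Scala maps (the toWritable helper is hypothetical, not part of the connector):

import org.apache.hadoop.io.{MapWritable, NullWritable, Text}

// Hypothetical helper: wrap a Scala map as a MapWritable document
def toWritable(m: Map[String, String]): MapWritable = {
  val mw = new MapWritable()
  m.foreach { case (k, v) => mw.put(new Text(k), new Text(v)) }
  mw
}

val output = sc.parallelize(Seq(Map("user" -> "holden", "tweet" -> "hi")))
  .map(m => (NullWritable.get(), toWritable(m)))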
Example 5-47. Elasticsearch input in Scala
import org.apache.hadoop.io.MapWritable
import org.apache.hadoop.mapred.JobConf
import org.elasticsearch.hadoop.cfg.ConfigurationOptions
import org.elasticsearch.hadoop.mr.EsInputFormat
import scala.collection.JavaConversions._

// Convert each MapWritable record into a plain Scala map
def mapWritableToInput(in: MapWritable): Map[String, String] = {
  in.map { case (k, v) => (k.toString, v.toString) }.toMap
}

val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set(ConfigurationOptions.ES_RESOURCE_READ, args(1))
jobConf.set(ConfigurationOptions.ES_NODES, args(2))
val currentTweets = sc.hadoopRDD(jobConf,
  classOf[EsInputFormat[Object, MapWritable]], classOf[Object],
  classOf[MapWritable])
// Extract only the map from each (key, value) record
val tweets = currentTweets.map { case (key, value) => mapWritableToInput(value) }
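The resulting tweets RDD contains ordinary Scala maps, so the usual RDD operations apply; for example, to keep only records with a particular field value (the "user" field name here is an assumption about the documents):

// Keep only tweets from one user
val holdensTweets = tweets.filter(_.getOrElse("user", "") == "holden")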