To optimize reading from HBase, TableInputFormat includes multiple settings, such as limiting the scan to just one set of columns and limiting the time ranges scanned. You can set these options on your HBaseConfiguration before passing it to Spark.
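For instance, the scan can be narrowed before the configuration is handed to Spark. A minimal sketch, in which the table name, column, and timestamp values are placeholder assumptions:

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val conf = HBaseConfiguration.create()
// Placeholder table name
conf.set(TableInputFormat.INPUT_TABLE, "tablename")
// Limit the scan to a single column, given as family:qualifier
conf.set(TableInputFormat.SCAN_COLUMNS, "info:status")
// Limit the time range scanned (epoch milliseconds)
conf.set(TableInputFormat.SCAN_TIMERANGE_START, "1400000000000")
conf.set(TableInputFormat.SCAN_TIMERANGE_END, "1500000000000")
```

The resulting configuration is then passed to Spark exactly as an unrestricted one would be.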
Elasticsearch
Spark can both read and write data from Elasticsearch using Elasticsearch-Hadoop. Elasticsearch is a new open source, Lucene-based search system.
The Elasticsearch connector is a bit different from the other connectors we have examined, since it ignores the path information we provide and instead depends on setting up configuration on our SparkContext. The Elasticsearch OutputFormat connector also doesn't quite have the types to use Spark's wrappers, so we instead use saveAsHadoopDataset, which means we need to set more properties by hand. Let's look at how to read/write some simple data to Elasticsearch in Examples 5-46 and 5-47.
The latest Elasticsearch Spark connector is even easier to use, supporting returning Spark SQL rows. This connector is still covered, as the row conversion doesn't yet support all of the native types in Elasticsearch.
Example 5-46. Elasticsearch output in Scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions

val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("mapred.output.format.class",
  "org.elasticsearch.hadoop.mr.EsOutputFormat")
jobConf.setOutputCommitter(classOf[FileOutputCommitter])
jobConf.set(ConfigurationOptions.ES_RESOURCE_WRITE, "twitter/tweets")
jobConf.set(ConfigurationOptions.ES_NODES, "localhost")
FileOutputFormat.setOutputPath(jobConf, new Path("-"))
output.saveAsHadoopDataset(jobConf)
Example 5-47. Elasticsearch input in Scala
import org.apache.hadoop.io.MapWritable
import org.apache.hadoop.mapred.JobConf
import org.elasticsearch.hadoop.cfg.ConfigurationOptions
import org.elasticsearch.hadoop.mr.EsInputFormat
import scala.collection.JavaConversions._

def mapWritableToInput(in: MapWritable): Map[String, String] = {
  in.map { case (k, v) => (k.toString, v.toString) }.toMap
}

val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set(ConfigurationOptions.ES_RESOURCE_READ, args(1))
jobConf.set(ConfigurationOptions.ES_NODES, args(2))
val currentTweets = sc.hadoopRDD(jobConf,
  classOf[EsInputFormat[Object, MapWritable]],
  classOf[Object], classOf[MapWritable])
// Extract only the map
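The comment above points at the final step: keeping just the MapWritable values and converting them with the mapWritableToInput helper defined in Example 5-47. A minimal sketch of that extraction (the variable name tweets is an assumption):

```scala
// Drop the keys and convert each MapWritable value
// into a plain Scala Map[String, String]
val tweets = currentTweets.map { case (key, value) => mapWritableToInput(value) }
```

This fragment continues the example and needs the live SparkContext and currentTweets RDD from above.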