Database Reference
In-Depth Information
@Override
public
int
run
(
String
[]
args
)
throws
Exception
{
if
(
args
.
length
!=
1
) {
System
.
err
.
println
(
"Usage: HBaseTemperatureImporter <input>"
);
return
-
1
;
}
Job job
=
new
Job
(
getConf
(),
getClass
().
getSimpleName
());
job
.
setJarByClass
(
getClass
());
FileInputFormat
.
addInputPath
(
job
,
new
Path
(
args
[
0
]));
job
.
getConfiguration
().
set
(
TableOutputFormat
.
OUTPUT_TABLE
,
"observations"
);
job
.
setMapperClass
(
HBaseTemperatureMapper
.
class
);
job
.
setNumReduceTasks
(
0
);
job
.
setOutputFormatClass
(
TableOutputFormat
.
class
);
return
job
.
waitForCompletion
(
true
) ?
0
:
1
;
}
public static
void
main
(
String
[]
args
)
throws
Exception
{
int
exitCode
=
ToolRunner
.
run
(
HBaseConfiguration
.
create
(),
new
HBaseTemperatureImporter
(),
args
);
System
.
exit
(
exitCode
);
}
}
HBaseTemperatureImporter
has a nested class named
HBaseTemperat-
er class implements
Tool
and does the setup to launch the map-only job.
HBaseTem-
peratureMapper
takes the same input as
MaxTemperatureMapper
and does the
valid temperatures. But rather than writing valid temperatures to the output context, as
MaxTemperatureMapper
does, it creates a
Put
object to add those temperatures to
the
observations
HBase table, in the
data:airtemp
column. (We are using static
constants for
data
and
airtemp
, imported from the
HBaseTemperatureQuery
class described later.)
The row key for each observation is created in the
makeObservationRowKey()
method on
RowKeyConverter
from the station ID and observation time:
public class
RowKeyConverter
{
private static final
int
STATION_ID_LENGTH
=
12
;
/**
* @return A row key whose format is: <station_id>