Database Reference
In-Depth Information
Example 16-3. A LoadFunc UDF to load tuple fields as column ranges
public class
CutLoadFunc
extends
LoadFunc
{
/** Logger for this class, obtained from {@code LogFactory}. */
private static final Log LOG = LogFactory.getLog(CutLoadFunc.class);

/** Column ranges produced by {@code Range.parse} in the constructor; one tuple field per range. */
private final List<Range> ranges;

/** Factory used by {@code getNext()} to allocate each output tuple. */
private final TupleFactory tupleFactory = TupleFactory.getInstance();

/** Record reader stored by {@code prepareToRead} and consumed by {@code getNext()}. */
private RecordReader reader;
/**
 * Creates a loader whose column ranges come from parsing the given pattern.
 *
 * @param cutPattern textual description of the ranges, handed unchanged to
 *     {@code Range.parse}; the accepted syntax is defined by that method
 */
public CutLoadFunc(String cutPattern) {
  this.ranges = Range.parse(cutPattern);
}
/**
 * Registers the load location as the input path(s) of the job.
 *
 * @param location input location string, passed as-is to
 *     {@code FileInputFormat.setInputPaths}
 * @param job the job whose input paths are set
 * @throws IOException if {@code FileInputFormat.setInputPaths} fails
 */
@Override
public void setLocation(String location, Job job) throws IOException {
  FileInputFormat.setInputPaths(job, location);
}
/**
 * Supplies the input format used to read the data.
 *
 * @return a newly created {@code TextInputFormat} on every call
 */
@Override
public InputFormat getInputFormat() {
  return new TextInputFormat();
}
/**
 * Remembers the record reader so that {@code getNext()} can pull records from it.
 *
 * @param reader the record reader to read from; stored in the {@code reader} field
 * @param split the split being read; not used by this implementation
 */
@Override
public void prepareToRead(RecordReader reader, PigSplit split) {
  this.reader = reader;
}
@Override
public
Tuple
getNext
()
throws
IOException
{
try
{
if
(!
reader
.
nextKeyValue
()) {
return null
;
}
Text value
= (
Text
)
reader
.
getCurrentValue
();
String line
=
value
.
toString
();
Tuple tuple
=
tupleFactory
.
newTuple
(
ranges
.
size
());
for
(
int
i
=
0
;
i
<
ranges
.
size
();
i
++) {
Range range
=
ranges
.
get
(
i
);
if
(
range
.
getEnd
() >
line
.
length
()) {
LOG
.
warn
(
String
.
format
(
"Range end (%s) is longer than line length (%s)"
,
range
.
getEnd
(),
line
.
length
()));
continue
;
}