Example 5-17. Loading CSV in full in Java
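// Imports assumed by this snippet (opencsv 2.x package name; newer opencsv releases use com.opencsv)
import java.io.StringReader;
import au.com.bytecode.opencsv.CSVReader;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import scala.Tuple2;

// Parse the full contents of one file (the second element of the
// (filename, contents) pair) into a list of CSV records.
public static class ParseLine
    implements FlatMapFunction<Tuple2<String, String>, String[]> {
  // Spark 1.x signature; in Spark 2.x call() returns an Iterator,
  // e.g. return reader.readAll().iterator();
  public Iterable<String[]> call(Tuple2<String, String> file) throws Exception {
    CSVReader reader = new CSVReader(new StringReader(file._2()));
    return reader.readAll();
  }
}

// sc is a JavaSparkContext; wholeTextFiles() yields (filename, contents) pairs.
JavaPairRDD<String, String> csvData = sc.wholeTextFiles(inputFile);
JavaRDD<String[]> keyedRDD = csvData.flatMap(new ParseLine());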
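For comparison, here is a minimal Python sketch of the same idea: each (filename, contents) pair is parsed as a whole, so quoted fields that contain embedded newlines survive, and the per-file record lists are flattened together as flatMap would do. The function name parse_whole_file and the sample file data are hypothetical, and a plain list stands in for the output of sc.wholeTextFiles(inputFile); on a cluster you would pass the function to flatMap on that RDD instead.

```python
import csv
import io


def parse_whole_file(file_pair):
    """Parse one (filename, contents) pair into a list of CSV rows.

    The file name is ignored; the full contents go to a CSV reader, so a
    quoted field spanning several lines is still read as one field.
    """
    _filename, contents = file_pair
    return list(csv.reader(io.StringIO(contents)))


# Hypothetical stand-in for sc.wholeTextFiles(inputFile): (path, contents) pairs.
files = [
    ("file:/tmp/a.csv", 'holden,panda\n"sparky, jr","cat\nand dog"\n'),
    ("file:/tmp/b.csv", "karau,dog\n"),
]

# Equivalent of csvData.flatMap(parse_whole_file): concatenate every file's rows.
rows = [row for pair in files for row in parse_whole_file(pair)]
for row in rows:
    print(row)
```

The second record shows why the whole file is parsed at once: splitting the text into lines first would break the quoted field "cat\nand dog" into two invalid records.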
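If there are only a few input files and you need to use the wholeTextFiles() method, you may want to repartition your input so that Spark can effectively parallelize later operations. Each file is read as a single record, so a handful of files yields only a handful of non-empty partitions.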
Saving CSV
As with JSON data, writing out CSV/TSV data is quite simple, and we can again benefit from reusing the output encoding object across records. Since CSV does not include the field name with each record, we need a consistent mapping from fields to column positions. One easy way to do this is to write a function that places each field at a fixed position in an array. In Python, if we are outputting dictionaries, the CSV writer can do this for us based on the order in which we provide the fieldnames when constructing the writer.
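The two mapping approaches described above can be compared with a small sketch using only Python's standard csv module. The FIELDS list, the helper to_row, and the sample records are hypothetical; the point is that both a hand-written positional function and DictWriter with fieldnames produce the same column order, even when a dictionary's keys were inserted in a different order.

```python
import csv
import io

# Hypothetical fixed column order for the output.
FIELDS = ["name", "favoriteAnimal"]


def to_row(record):
    """Place each field at a fixed position so every CSV line has the same layout."""
    return [record[field] for field in FIELDS]


records = [
    {"favoriteAnimal": "panda", "name": "holden"},  # keys in a different order
    {"name": "sparky", "favoriteAnimal": "dog"},
]

# Option 1: convert each record to a positional list, then use csv.writer.
buf1 = io.StringIO()
csv.writer(buf1).writerows(to_row(r) for r in records)

# Option 2: let DictWriter map fields to positions using the fieldnames order.
buf2 = io.StringIO()
csv.DictWriter(buf2, fieldnames=FIELDS).writerows(records)

print(repr(buf1.getvalue()))
print(repr(buf2.getvalue()))
```

Both buffers hold the same text; note that the csv module terminates lines with "\r\n" by default.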
The CSV libraries we are using write to files or writers, so we can use StringWriter (Java/Scala) or StringIO (Python) to capture the output as a string and put the result in our RDD, as you can see in Examples 5-18 and 5-19.
Example 5-18. Writing CSV in Python
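import csv
import io  # Python 3; on Python 2 this example used StringIO.StringIO

def writeRecords(records):
    """Write out CSV lines"""
    output = io.StringIO()
    # fieldnames fixes the column order of every output line
    writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
    for record in records:
        writer.writerow(record)
    # One string per partition, containing all of that partition's CSV lines
    return [output.getvalue()]

pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)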
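To see what Example 5-18 produces without a cluster, the sketch below mimics mapPartitions with plain Python lists: the writer function runs once per partition and the returned lists are concatenated. The name write_records, the two-partition layout, and the sample records are hypothetical stand-ins for pandaLovers; the function body follows Example 5-18 using io.StringIO.

```python
import csv
import io


def write_records(records):
    """Write one partition's records as CSV and return them as a single string."""
    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]


# Hypothetical stand-in for an RDD split into two partitions.
partitions = [
    [{"name": "holden", "favoriteAnimal": "panda"}],
    [{"name": "sparky", "favoriteAnimal": "panda"},
     {"name": "karau", "favoriteAnimal": "panda"}],
]

# mapPartitions calls the function once per partition and concatenates the
# returned iterables; each resulting element is one chunk of CSV text.
result = [chunk for part in partitions for chunk in write_records(part)]
print(result)
```

One writer is created per partition rather than per record, which is the reuse of the encoding object mentioned earlier. Each element already ends with the writer's "\r\n" terminator, and saveAsTextFile writes a newline after every element, so you may see an extra blank line per partition; passing lineterminator="\n" to DictWriter and stripping the final newline is one way to avoid that.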
Example 5-19. Writing CSV in Scala
pandaLovers.map(person => List(person.name, person.favoriteAnimal).toArray)
.mapPartitions{people =>
  val stringWriter = new StringWriter();