Example 5-17. Loading CSV in full in Java
public static class ParseLine
    implements FlatMapFunction<Tuple2<String, String>, String[]> {
  public Iterable<String[]> call(Tuple2<String, String> file) throws Exception {
    // file._2() holds the entire contents of one input file
    CSVReader reader = new CSVReader(new StringReader(file._2()));
    // readAll() parses the whole file, so records can safely span line breaks
    return reader.readAll();
  }
}

JavaPairRDD<String, String> csvData = sc.wholeTextFiles(inputFile);
JavaRDD<String[]> keyedRDD = csvData.flatMap(new ParseLine());
If there are only a few input files and you need to use the wholeTextFiles() method, you may want to repartition your input so that Spark can effectively parallelize your future operations.
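For instance, a minimal sketch of this (the target count of 16 partitions is an arbitrary illustration, not a recommendation):

// wholeTextFiles() yields roughly one partition per input file, so with only a
// few files most of the cluster would sit idle during later stages.
JavaRDD<String[]> repartitionedRDD = keyedRDD.repartition(16); // pick a count suited to your cluster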
Saving CSV
As with JSON data, writing out CSV/TSV data is quite simple, and we can benefit from reusing the output encoding object. Since CSV does not include the field name with each record, we need to create a mapping to get consistent output. One easy way to do this is to write a function that converts the fields to given positions in an array. In Python, if we are outputting dictionaries, the CSV writer can do this for us based on the order in which we provide the fieldnames when constructing the writer.
The CSV libraries we are using output to files/writers, so we can use StringWriter/StringIO to put the result in our RDD, as you can see in Examples 5-18 and 5-19.
Example 5-18. Writing CSV in Python
import csv
import StringIO  # Python 2; on Python 3 use io.StringIO instead

def writeRecords(records):
    """Write out CSV lines"""
    output = StringIO.StringIO()
    writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]

pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)
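Note that we use mapPartitions() so the DictWriter is constructed once per partition rather than once per record, and each partition is emitted as a single CSV string.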
Example 5-19. Writing CSV in Scala
pandaLovers.map(person => List(person.name, person.favoriteAnimal).toArray)
  .mapPartitions { people =>
    val stringWriter = new StringWriter()
    val csvWriter = new CSVWriter(stringWriter)
    csvWriter.writeAll(people.toList.asJava) // asJava from scala.collection.JavaConverters._
    Iterator(stringWriter.toString)
  }.saveAsTextFile(outputFile)
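There is no Java version of the save path in this section; a minimal sketch along the same lines, assuming the Spark 2.x Java lambda API (where the function passed to mapPartitions() returns an Iterator; in 1.x it returns an Iterable) and a hypothetical Person class with getName()/getFavoriteAnimal() accessors:

JavaRDD<String> csvOutput = pandaLovers.mapPartitions(people -> {
  // Serialize one whole partition into a single CSV string, as in Example 5-19
  StringWriter stringWriter = new StringWriter();
  CSVWriter csvWriter = new CSVWriter(stringWriter);
  while (people.hasNext()) {
    Person person = people.next(); // Person is a hypothetical record class
    csvWriter.writeNext(new String[]{person.getName(), person.getFavoriteAnimal()});
  }
  csvWriter.close();
  return Collections.singletonList(stringWriter.toString()).iterator();
});
csvOutput.saveAsTextFile(outputFile);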