This example assumes that you've already created a SparkContext named sc. First,
we create a Spark RDD of AlignmentRecords using a pushdown predicate to remove
low-quality reads and a projection to materialize only the sequence field in each read:
// Load reads from 'inputPath' into an RDD for analysis
val adamRecords: RDD[AlignmentRecord] = sc.adamLoad(args.inputPath,
  // Filter out all low-quality reads that failed vendor quality checks
  predicate = Some(classOf[HighQualityReadsPredicate]),
  // Only materialize the 'sequence' from each record
  projection = Some(Projection(AlignmentRecordField.sequence)))
Since Parquet is a column-oriented storage format, it can rapidly materialize only the
sequence column and quickly skip over the unwanted fields. Next, we walk over each
sequence using a sliding window of length k = 21, emit a count of 1L, and then
reduceByKey using the k-mer subsequence as the key to get the total counts for the input
file:
// The length of k-mers we want to count
val kmerLength = 21

// Process the reads into an RDD of tuples with k-mers and counts
val kmers: RDD[(String, Long)] = adamRecords.flatMap(read => {
  read.getSequence
    .toString
    .sliding(kmerLength)
    .map(k => (k, 1L))
}).reduceByKey { case (a, b) => a + b }

// Print the k-mers as a text file to the 'outputPath'
kmers.map { case (kmer, count) => s"$kmer, $count" }
  .saveAsTextFile(args.outputPath)
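The sliding call above is what generates every overlapping k-mer from a read's sequence. As a minimal plain-Scala sketch of its behavior on a hypothetical toy sequence (not part of the job above):

// A toy 8-base sequence; sliding(3) yields every overlapping 3-mer, in order
val toyKmers = "ACGTACGT".sliding(3).toList
// toyKmers == List("ACG", "CGT", "GTA", "TAC", "ACG", "CGT")
// Counting these by key as the job does would give ACG -> 2, CGT -> 2, GTA -> 1, TAC -> 1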
When run on sample NA21144, chromosome 11 in the 1000 Genomes project [166], this
job outputs the following:
AAAAAAAAAAAAAAAAAAAAAA, 124069
TTTTTTTTTTTTTTTTTTTTTT, 120590
ACACACACACACACACACACAC, 41528
GTGTGTGTGTGTGTGTGTGTGT, 40905
CACACACACACACACACACACA, 40795
TGTGTGTGTGTGTGTGTGTGTG, 40329
TAATCCCAGCACTTTGGGAGGC, 32122
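The counts above are listed from most to least frequent, whereas the job itself writes the pairs unsorted. One way to rank k-mers by descending count before saving, as a rough sketch against the kmers RDD defined earlier (not part of the original job):

// Order k-mers so the most frequent come first, then write them out
val rankedKmers: RDD[(String, Long)] =
  kmers.sortBy({ case (_, count) => count }, ascending = false)
rankedKmers.map { case (kmer, count) => s"$kmer, $count" }
  .saveAsTextFile(args.outputPath)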