This example assumes that you've already created a SparkContext named sc. First, we create a Spark RDD of AlignmentRecords using a pushdown predicate to remove low-quality reads and a projection to materialize only the sequence field in each read:
// Load reads from 'inputPath' into an RDD for analysis
val adamRecords: RDD[AlignmentRecord] = sc.adamLoad(args.inputPath,
  // Filter out all low-quality reads that failed vendor quality checks
  predicate = Some(classOf[HighQualityReadsPredicate]),
  // Only materialize the 'sequence' from each record
  projection = Some(Projection(AlignmentRecordField.sequence)))
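To make clear what the pushdown saves, here is a minimal sketch (not from the original) of the logical equivalent expressed as ordinary Spark transformations after a full, unfiltered load. The filter assumes the predicate consults AlignmentRecord's failedVendorQualityChecks flag; with pushdown, Parquet applies both the row filter and the column projection at the storage layer, so the discarded rows and fields are never deserialized:

// Sketch only (not the book's code): same result, no storage-level pushdown
val fullLoad: RDD[AlignmentRecord] = sc.adamLoad(args.inputPath)
val sequences: RDD[String] = fullLoad
  // assumption: the vendor-quality predicate checks this flag
  .filter(read => !read.getFailedVendorQualityChecks)
  // projection done in Spark, after every field was already read
  .map(read => read.getSequence.toString)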
Since Parquet is a column-oriented storage format, it can rapidly materialize only the sequence column and quickly skip over the unwanted fields. Next, we walk over each sequence using a sliding window of length k = 21, emit a count of 1L, and then reduceByKey using the k-mer subsequence as the key to get the total counts for the input file:
// The length of k-mers we want to count
val kmerLength = 21

// Process the reads into an RDD of tuples with k-mers and counts
val kmers: RDD[(String, Long)] = adamRecords.flatMap(read => {
  read.getSequence.toString.sliding(kmerLength).map(k => (k, 1L))
}).reduceByKey { case (a, b) => a + b }
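To see what sliding produces, here is a quick plain-Scala illustration (not ADAM-specific): sliding(k) over a string yields every length-k substring in order, so a read of length n contributes n - k + 1 k-mers:

// A window of width 3 sliding across a 5-character sequence
"ACGTA".sliding(3).toList
// => List("ACG", "CGT", "GTA")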
// Print the k-mers as a text file to the 'outputPath'
kmers.map { case (kmer, count) => s"$kmer, $count" }
  .saveAsTextFile(args.outputPath)
This job outputs the following:
AAAAAAAAAAAAAAAAAAAAAA, 124069
TTTTTTTTTTTTTTTTTTTTTT, 120590
ACACACACACACACACACACAC, 41528
GTGTGTGTGTGTGTGTGTGTGT, 40905
CACACACACACACACACACACA, 40795
TGTGTGTGTGTGTGTGTGTGTG, 40329
TAATCCCAGCACTTTGGGAGGC, 32122
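Note that the listing above is ordered by descending count, while saveAsTextFile writes records in partition order with no global sort. As a minimal sketch (not part of the original snippet), the RDD's standard sortBy transformation could produce that ordering explicitly:

// Sketch: sort by descending count so the most frequent k-mers come first
kmers.sortBy(_._2, ascending = false)
  .map { case (kmer, count) => s"$kmer, $count" }
  .saveAsTextFile(args.outputPath)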