Next, we apply a filter to remove any bad records:
scala> val filtered = records.filter(rec => (rec(1) != "9999"
  && rec(2).matches("[01459]")))
filtered: org.apache.spark.rdd.RDD[Array[String]] = FilteredRDD[3] at filter at <console>:16
The filter() method on RDD takes a predicate, a function that returns a Boolean. This one keeps only records that have neither a missing temperature (indicated by 9999) nor a bad quality reading.
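For readability, the predicate could equally be factored out as a named function. A minimal sketch, assuming (as in the parsing step earlier) that rec(1) holds the temperature and rec(2) the quality code:

// Keep records with a valid temperature and an acceptable quality code
def isGoodRecord(rec: Array[String]): Boolean =
  rec(1) != "9999" && rec(2).matches("[01459]")

val filteredAlt = records.filter(isGoodRecord)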
To find the maximum temperatures for each year, we need to perform a grouping operation on the year field so we can process all the temperature values for each year. Spark provides a reduceByKey() method to do this, but it needs an RDD of key-value pairs, represented by a Scala Tuple2. So, first we need to transform our RDD into the correct form using another map:
scala> val tuples = filtered.map(rec => (rec(0).toInt, rec(1).toInt))
tuples: org.apache.spark.rdd.RDD[(Int, Int)] = MappedRDD[4] at map at <console>:18
Then we can perform the aggregation. The reduceByKey() method's argument is a
function that takes a pair of values and combines them into a single value; in this case, we
use Java's Math.max function:
scala> val maxTemps = tuples.reduceByKey((a, b) => Math.max(a, b))
maxTemps: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[7] at reduceByKey at <console>:21
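As an aside, the same result could be computed with groupByKey() followed by taking the maximum of each group; a sketch for comparison only:

// Shuffles every value across the network, then finds each group's maximum
val maxTempsAlt = tuples.groupByKey().mapValues(temps => temps.max)

reduceByKey() is preferable here because it combines values for each key locally on each partition before any data is shuffled, whereas groupByKey() ships every value.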
We can display the contents of maxTemps by invoking the foreach() method and
passing println() to print each element to the console:
scala> maxTemps.foreach(println(_))
(1950,22)
(1949,111)
The foreach() method is the same as the equivalent on standard Scala collections, like List, and applies a function (one that has some side effect) to each element in the RDD. It is this operation that causes Spark to run a job to compute the values in the RDD, so they can be passed to the println() method.
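This is because transformations such as filter(), map(), and reduceByKey() are lazy: they only record how an RDD is to be computed, and no work happens until an action is invoked. Another common action is collect(), which returns the RDD's contents to the driver program as a local array. A sketch, suitable only when the result is small enough to fit in the driver's memory:

// Triggers the job, like foreach(), but materializes the results locally
val results: Array[(Int, Int)] = maxTemps.collect()
results.foreach(println)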
Alternatively, we can save the RDD to the filesystem with:
scala> maxTemps.saveAsTextFile("output")
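Note that saveAsTextFile() writes a directory named output containing one text file per partition (part-00000, part-00001, and so on), rather than a single file. The saved results can be read back as an RDD of strings; a sketch:

// Each line of each part file becomes one element of the new RDD
val reloaded = sc.textFile("output")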