• Sujit Pal wrote Scalding versions of the “Impatient” series.
• Costin Leau integrated Scalding support into Spring-Hadoop.
Books about Scala and Functional Programming
For more information about Scala, DSLs, and functional programming in general, check out these topics:
Example 4 in Scalding: Replicated Joins
Next, let's modify the Scalding code to create an app similar to the Cascading version in “Example 4: Replicated Joins” on page 22. We'll show how simple it is to extend pipe assemblies in Scalding.
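As a rough picture of what the stop-word filtering in this example amounts to, the sketch below uses plain Scala collections rather than Scalding. It assumes the stop-word list is small enough to hold in memory, which is the premise of a replicated join: every token is looked up against the small side, tokens with no match carry a null, and only those are kept and counted. The object name `StopWordJoinSketch`, the helper `wordCount`, and the sample data are hypothetical. The tokenizing regex and `scrub` are copied from the Example4 listing.

```scala
object StopWordJoinSketch {
  // Same normalization as the scrub helper in the Example4 listing.
  def scrub(token: String): String = token.trim.toLowerCase

  // Hypothetical helper: counts the tokens in docs that are not stop words.
  def wordCount(docs: Seq[String], stopWords: Set[String]): Map[String, Int] = {
    docs
      .flatMap(text => text.split("[ \\[\\]\\(\\),.]").toSeq)
      .map(scrub)
      .filter(_.length > 0)
      // Left join against the small side: each token is paired with its
      // matching stop word, or with null when there is no match.
      .map(token => (token, stopWords.find(_ == token).orNull))
      // Keep only the tokens that did not match a stop word.
      .filter { case (_, stop) => stop == null }
      .groupBy { case (token, _) => token }
      .map { case (token, hits) => token -> hits.size }
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample input, for illustration only.
    val docs = Seq(
      "A rain shadow is a dry area",
      "Rain on the lee side (of a mountain)."
    )
    val stopWords = Set("a", "is", "the", "of")
    wordCount(docs, stopWords).toSeq.sortBy(_._1).foreach {
      case (token, count) => println(s"$token\t$count")
    }
  }
}
```

In Scalding the same idea is expressed with `leftJoinWithTiny` followed by a filter on the null `'stop` field. The in-memory `Set` here only stands in for the small pipe that gets replicated to each task.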
Starting from the “Impatient” source code directory that you cloned in Git, change into the part8 subdirectory. Look at the code in scripts/scala/Example4.scala:

    import com.twitter.scalding._

    class Example4(args : Args) extends Job(args) {
      val stopPipe = Tsv(args("stop"), ('stop), skipHeader = true)
        .read

      Tsv(args("doc"), ('doc_id, 'text), skipHeader = true)
        .read
        .flatMap('text -> 'token) { text : String => text.split("[ \\[\\]\\(\\),.]") }
        .mapTo('token -> 'token) { token : String => scrub(token) }
        .filter('token) { token : String => token.length > 0 }
        .leftJoinWithTiny('token -> 'stop, stopPipe)
        .filter('stop) { stop : String => stop == null }
        .groupBy('token) { _.size('count) }
        .write(Tsv(args("wc"), writeHeader = true))

      def scrub(token : String) : String = {
        token
          .trim
          .toLowerCase
      }