Figure 3-4. Document frequency branch
Notice that the results are named lhs_join, again preparing for the subsequent join.
Now we have all the components needed to calculate TF-IDF weights. To finish the calculations in parallel, we'll use two different kinds of joins to bring the Cascading branches together:
```java
// join to bring together all the components for calculating TF-IDF
// the D side of the join is smaller, so it goes on the RHS
Pipe idfPipe = new HashJoin( dfPipe, lhs_join, dPipe, rhs_join );

// the IDF side of the join is smaller, so it goes on the RHS
Pipe tfidfPipe = new CoGroup( tfPipe, tf_token, idfPipe, df_token );
```
We used HashJoin previously for a replicated join. In this case we know that the document count will not be a large amount of data, so it fits on the RHS. The other join, CoGroup, handles the more general case where the RHS cannot be kept entirely in memory. In those cases a threshold can be adjusted for "spill," where RHS tuples get moved to disk.
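To make the trade-off concrete, the core idea behind a replicated join can be sketched in plain Java, independent of the Cascading API: the smaller right-hand side is loaded into an in-memory map, and the larger left-hand side is streamed against it. All names here (ReplicatedJoinSketch, its fields, the sample data) are invented for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: the replicated (map-side hash) join idea behind HashJoin.
// The small RHS lives entirely in memory; the large LHS streams past it.
public class ReplicatedJoinSketch {
  // joins each (key, value) LHS row against the in-memory RHS map
  public static List<String[]> join( List<String[]> lhs, Map<String, String> rhs ) {
    List<String[]> results = new ArrayList<>();
    for ( String[] row : lhs ) {
      String match = rhs.get( row[ 0 ] );  // constant-time lookup per LHS tuple
      if ( match != null )
        results.add( new String[] { row[ 0 ], row[ 1 ], match } );
    }
    return results;
  }

  public static void main( String[] args ) {
    Map<String, String> rhs = new HashMap<>();  // small side, held in memory
    rhs.put( "corpus", "n_docs=5" );

    List<String[]> lhs = new ArrayList<>();     // large side, streamed
    lhs.add( new String[] { "corpus", "df_count=2" } );

    for ( String[] row : join( lhs, rhs ) )
      System.out.println( String.join( " ", row ) );
  }
}
```

Because every LHS tuple costs only one hash lookup, no sorting or shuffling of the large side is required, which is exactly why HashJoin is cheap when one side is small and why CoGroup must instead spill to disk when neither side fits in memory.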
Then we calculate TF-IDF weights using an ExpressionFunction in Cascading:
```java
// calculate the TF-IDF weights, per token, per document
Fields tfidf = new Fields( "tfidf" );
String expression =
  "(double) tf_count * Math.log( (double) n_docs / ( 1.0 + df_count ) )";
ExpressionFunction tfidfExpression =
  new ExpressionFunction( tfidf, expression, Double.class );
Fields tfidfArguments = new Fields( "tf_count", "df_count", "n_docs" );
tfidfPipe =
  new Each( tfidfPipe, tfidfArguments, tfidfExpression, Fields.ALL );

fieldSelector = new Fields( "tf_token", "doc_id", "tfidf" );
tfidfPipe = new Retain( tfidfPipe, fieldSelector );
tfidfPipe = new Rename( tfidfPipe, tf_token, token );
```
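The expression string is ordinary Java arithmetic, evaluated once per tuple against the tf_count, df_count, and n_docs argument fields. A standalone check of the same formula (the sample counts below are made up for illustration, not taken from the example corpus) shows what each output tuple receives:

```java
// Standalone check of the TF-IDF formula used in the expression string above;
// the sample counts are invented for illustration.
public class TfIdfCheck {
  // same arithmetic as the Janino expression:
  // (double) tf_count * Math.log( (double) n_docs / ( 1.0 + df_count ) )
  public static double tfidf( int tfCount, int dfCount, int nDocs ) {
    return (double) tfCount * Math.log( (double) nDocs / ( 1.0 + dfCount ) );
  }

  public static void main( String[] args ) {
    // a token appearing 3 times in one document, in 4 of 100 documents
    System.out.println( tfidf( 3, 4, 100 ) );  // 3 * ln(20) ≈ 8.987
  }
}
```

Note the 1.0 + df_count in the denominator: it smooths the inverse document frequency so that a token appearing in every document still yields a finite (and near-zero) weight rather than log(1) exactly or a division artifact.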