Databases Reference
In-Depth Information
Note that as long as the bolt keeps information in memory by user, it's very important
that when you parallelize it you use fieldsGrouping by user in the first degree, otherwise
different copies of the user history will get out of synch.
ProductCategoriesCounterBolt
The ProductCategoriesCounterBolt class is in charge of keeping track of all the product-
category relationships. It receives the product-category pairs emitted by the UsersHis-
toryBolt and updates the counters.
The information about the number of occurrences of each pair is stored on the Redis
server. A local cache for reads and a write buffer are used for performance reasons. The
information is sent to Redis in a background thread.
This bolt also emits a tuple with the updated counter for the input pair to feed the next
bolt in the topology, the NewsNotifierBolt, which is in charge of broadcasting the news
to the final users for real-time updates.
public
class
ProductCategoriesCounterBolt
extends
BaseRichBolt
{
...
@Override
public
void
execute
(
Tuple
input
)
{
String
product
=
input
.
getString
(
0
);
String
categ
=
input
.
getString
(
1
);
int
total
=
count
(
product
,
categ
);
collector
.
emit
(
new
Values
(
product
,
categ
,
total
));
}
...
private
int
count
(
String
product
,
String
categ
)
{
int
count
=
getProductCategoryCount
(
categ
,
product
);
count
++;
storeProductCategoryCount
(
categ
,
product
,
count
);
return
count
;
}
...
}
Persistence in this bolt is hidden in the
getProductCategoryCount
and
storeProductCa
tegoryCount
methods. Let's take a look inside them:
package
storm
.
analytics
;
...
public
class
ProductCategoriesCounterBolt
extends
BaseRichBolt
{
// ITEM:CATEGORY -> COUNT
HashMap
<
String
,
Integer
>
counter
=
new
HashMap
<
String
,
Integer
>();
// ITEM:CATEGORY -> COUNT
HashMap
<
String
,
Integer
>
pendingToSave
=
new
HashMap
<
String
,
Integer
>();
...
public
int
getProductCategoryCount
(
String
categ
,
String
product
)
{
Integer
count
=
counter
.
get
(
buildLocalKey
(
categ
,
product
));
if
(
count
==
null
)
{