Databases Reference
In-Depth Information
Note that as long as the bolt keeps information in memory by user, it's very important
that when you parallelize it you use fieldsGrouping by user in the first degree, otherwise
different copies of the user history will get out of synch.
ProductCategoriesCounterBolt
The ProductCategoriesCounterBolt class is in charge of keeping track of all the product-
category relationships. It receives the product-category pairs emitted by the UsersHis-
toryBolt and updates the counters.
The information about the number of occurrences of each pair is stored on the Redis
server. A local cache for reads and a write buffer are used for performance reasons. The
information is sent to Redis in a background thread.
This bolt also emits a tuple with the updated counter for the input pair to feed the next
bolt in the topology, the NewsNotifierBolt, which is in charge of broadcasting the news
to the final users for real-time updates.
public class ProductCategoriesCounterBolt extends BaseRichBolt {
...
@Override
public void execute ( Tuple input ) {
String product = input . getString ( 0 );
String categ = input . getString ( 1 );
int total = count ( product , categ );
collector . emit ( new Values ( product , categ , total ));
}
...
private int count ( String product , String categ ) {
int count = getProductCategoryCount ( categ , product );
count ++;
storeProductCategoryCount ( categ , product , count );
return count ;
}
...
}
Persistence in this bolt is hidden in the getProductCategoryCount and storeProductCa
tegoryCount methods. Let's take a look inside them:
package storm . analytics ;
...
public class ProductCategoriesCounterBolt extends BaseRichBolt {
// ITEM:CATEGORY -> COUNT
HashMap < String , Integer > counter = new HashMap < String , Integer >();
// ITEM:CATEGORY -> COUNT
HashMap < String , Integer > pendingToSave = new HashMap < String , Integer >();
...
public int getProductCategoryCount ( String categ , String product ) {
Integer count = counter . get ( buildLocalKey ( categ , product ));
if ( count == null ) {
 
Search WWH ::




Custom Search