Databases Reference
In-Depth Information
• The user that was navigating.
• The type of page that the user browsed.
• Additional page information that depends on the type. The “PRODUCT” page
type has an entry for the product ID being browsed.
The spout emits a tuple containing this information by calling collector.emit(new Val
ues(user, entry)) . The content of this tuple is the input to the next bolt in the topology:
The GetCategoryBolt.
GetCategoryBolt
This is a very simple bolt. Its sole responsibility is to deserialize the content of the tuple
emitted by the previous spout. If the entry is about a product page, then it loads the
product information from the Redis server by using the ProductsReader helper class.
Then, for each tuple in the input, it emits a new tuple with further product specific
information:
• The user
• The product
• The category of the product
package storm . analytics ;
public class GetCategoryBolt extends BaseBasicBolt {
private ProductsReader reader ;
...
@Override
public void execute ( Tuple input , BasicOutputCollector collector ) {
NavigationEntry entry = ( NavigationEntry ) input . getValue ( 1 );
if ( "PRODUCT" . equals ( entry . getPageType ())){
try {
String product = ( String ) entry . getOtherData (). get ( "product" );
// Call the items API to get item information
Product itm = reader . readItem ( product );
if ( itm == null )
return ;
String categ = itm . getCategory ();
collector . emit ( new Values ( entry . getUserId (), product , categ ));
} catch ( Exception ex ) {
System . err . println ( "Error processing PRODUCT tuple" + ex );
ex . printStackTrace ();
}
}
}
 
Search WWH ::




Custom Search