• The user that was navigating.
• The type of page that the user browsed.
• Additional page information that depends on the type. The “PRODUCT” page
type has an entry for the product ID being browsed.
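The three pieces of information listed above could be carried by a small value class. The following is only a sketch: the getter names match those used by the bolt later in this section, but the class name NavigationEntrySketch, the constructor, the sample user, and the sample product ID are assumptions made for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the NavigationEntry class used in this chapter.
// Only the three getter names come from the text; everything else is assumed.
class NavigationEntrySketch {
    private final String userId;
    private final String pageType;
    private final Map<String, Object> otherData;

    NavigationEntrySketch(String userId, String pageType, Map<String, Object> otherData) {
        this.userId = userId;
        this.pageType = pageType;
        // Defensive copy so the entry cannot change after it is created.
        this.otherData = Collections.unmodifiableMap(new HashMap<>(otherData));
    }

    String getUserId() { return userId; }

    String getPageType() { return pageType; }

    // Type-dependent details; a "PRODUCT" page stores its ID under "product".
    Map<String, Object> getOtherData() { return otherData; }

    public static void main(String[] args) {
        Map<String, Object> data = new HashMap<>();
        data.put("product", "15"); // made-up product ID
        NavigationEntrySketch entry = new NavigationEntrySketch("user-1", "PRODUCT", data);
        System.out.println(entry.getUserId() + " browsed a " + entry.getPageType()
                + " page for product " + entry.getOtherData().get("product"));
    }
}
```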
The spout emits a tuple containing this information by calling collector.emit(new Values(user, entry)). The content of this tuple is the input to the next bolt in the topology: the GetCategoryBolt.
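The values in an emitted tuple are positional, which is why the bolt in the next section reads the entry with input.getValue(1). The sketch below illustrates that layout with a plain list standing in for the Storm tuple; the class name TupleLayoutSketch, the index constants, and the sample values are assumptions, not part of the original code.

```java
import java.util.Arrays;
import java.util.List;

// Illustration only: a plain List plays the role of the emitted tuple.
class TupleLayoutSketch {
    static final int USER_INDEX = 0;   // first value passed to emit
    static final int ENTRY_INDEX = 1;  // second value passed to emit

    // Stand-in for new Values(user, entry): an ordered list of values.
    static List<Object> emitted(String user, Object entry) {
        return Arrays.<Object>asList(user, entry);
    }

    public static void main(String[] args) {
        List<Object> tuple = emitted("user-1", "entry for a PRODUCT page");
        System.out.println("user  = " + tuple.get(USER_INDEX));
        System.out.println("entry = " + tuple.get(ENTRY_INDEX));
    }
}
```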
GetCategoryBolt
This is a very simple bolt. Its sole responsibility is to deserialize the content of the tuple emitted by the spout. If the entry refers to a product page, the bolt loads the product information from the Redis server by using the ProductsReader helper class. Then, for each such input tuple whose product is found, it emits a new tuple with further product-specific information:
• The user
• The product
• The category of the product
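The decision logic described above can be tried out without Storm or Redis. The following is a minimal sketch under stated assumptions: an in-memory map replaces the Redis-backed ProductsReader, a list of lists replaces the output collector, and the names CategoryLookupSketch, addProduct, process, and getEmitted are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the bolt's logic only; it does not use the Storm or Redis APIs.
class CategoryLookupSketch {
    // Assumed in-memory replacement for the product lookup: product ID -> category.
    private final Map<String, String> categoryByProduct = new HashMap<>();
    // Assumed replacement for the collector: every emitted (user, product, category).
    private final List<List<Object>> emitted = new ArrayList<>();

    void addProduct(String productId, String category) {
        categoryByProduct.put(productId, category);
    }

    // Emits one tuple per PRODUCT entry whose product is known; ignores the rest.
    void process(String user, String pageType, Map<String, Object> otherData) {
        if (!"PRODUCT".equals(pageType)) {
            return; // not a product page
        }
        String product = (String) otherData.get("product");
        String category = categoryByProduct.get(product);
        if (category == null) {
            return; // unknown product: nothing to emit
        }
        emitted.add(Arrays.<Object>asList(user, product, category));
    }

    List<List<Object>> getEmitted() {
        return emitted;
    }

    public static void main(String[] args) {
        CategoryLookupSketch sketch = new CategoryLookupSketch();
        sketch.addProduct("15", "Books"); // made-up sample data

        Map<String, Object> productPage = new HashMap<>();
        productPage.put("product", "15");
        sketch.process("user-1", "PRODUCT", productPage);
        sketch.process("user-1", "HOME", new HashMap<String, Object>());

        System.out.println(sketch.getEmitted());
    }
}
```

Keeping the lookup behind a single method makes it easy to swap the map for a real data source later without touching the emit logic.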
package storm.analytics;

public class GetCategoryBolt extends BaseBasicBolt {
    private ProductsReader reader;
    ...
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Field 1 of the spout's tuple holds the navigation entry (field 0 is the user)
        NavigationEntry entry = (NavigationEntry) input.getValue(1);
        if ("PRODUCT".equals(entry.getPageType())) {
            try {
                String product = (String) entry.getOtherData().get("product");
                // Call the items API to get item information
                Product itm = reader.readItem(product);
                // Unknown product: skip this tuple without emitting anything
                if (itm == null)
                    return;
                String categ = itm.getCategory();
                collector.emit(new Values(entry.getUserId(), product, categ));
            } catch (Exception ex) {
                System.err.println("Error processing PRODUCT tuple: " + ex);
                ex.printStackTrace();
            }
        }
    }