UsersNavigationSpout
The UsersNavigationSpout is in charge of feeding the topology with navigation entries.
Each navigation entry is a reference to a product page viewed by one user. The web
application stores these entries in the Redis server. We'll go into more detail on that in
a moment.
To read entries from the Redis server, you'll be using Jedis (https://github.com/xetorthio/jedis), a blazingly small and simple Redis client for Java.
Only the relevant part of the code is shown in the following code block.
package storm.analytics;

public class UsersNavigationSpout extends BaseRichSpout {
    Jedis jedis;
    ...
    @Override
    public void nextTuple() {
        String content = jedis.rpop("navigation");
        if (content == null || "nil".equals(content)) {
            try { Thread.sleep(300); } catch (InterruptedException e) {}
        } else {
            JSONObject obj = (JSONObject) JSONValue.parse(content);
            String user = obj.get("user").toString();
            String product = obj.get("product").toString();
            String type = obj.get("type").toString();
            HashMap<String, String> map = new HashMap<String, String>();
            map.put("product", product);
            NavigationEntry entry = new NavigationEntry(user, type, map);
            collector.emit(new Values(user, entry));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("user", "otherdata"));
    }
}
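As an aside, the pop-or-back-off pattern in nextTuple() can be tried without a Redis server or a Storm cluster. What follows is only a minimal sketch under stated assumptions, not the spout itself: an in-memory ArrayDeque stands in for the "navigation" list, the producer is assumed to add entries on the left (as a Redis LPUSH would), and the class NavigationPollSketch with its push, pollOnce, and emitted methods is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class NavigationPollSketch {
    // Stand-in for the Redis "navigation" list: head = left end, tail = right end.
    private final Deque<String> navigation = new ArrayDeque<>();
    // Stand-in for the collector: records what would have been emitted.
    private final List<String> emitted = new ArrayList<>();
    private final long idleSleepMillis;

    public NavigationPollSketch(long idleSleepMillis) {
        this.idleSleepMillis = idleSleepMillis;
    }

    // Assumed producer side: add on the left, as a Redis LPUSH would.
    public void push(String entry) {
        navigation.addFirst(entry);
    }

    // Same shape as nextTuple(): pop from the right, back off when the list is empty.
    // Returns true if an entry was emitted.
    public boolean pollOnce() {
        String content = navigation.pollLast(); // null when the list is empty
        if (content == null) {
            try {
                Thread.sleep(idleSleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag set
            }
            return false;
        }
        emitted.add(content);
        return true;
    }

    public List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        NavigationPollSketch spout = new NavigationPollSketch(300);
        spout.push("entry-1");
        spout.push("entry-2");
        spout.pollOnce();
        spout.pollOnce();
        spout.pollOnce(); // list is empty now: sleeps, emits nothing
        System.out.println(spout.emitted()); // prints [entry-1, entry-2]
    }
}
```

With entries added on the left and removed from the right, the list behaves as a first-in, first-out queue, so entries come out in the order they were pushed.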
First, the spout calls jedis.rpop("navigation") to remove and return the right-most
element in the “navigation” list on the Redis server. If the list is empty, the spout sleeps
for 0.3 seconds so as not to hammer the server with a busy-wait loop. If an entry is found,
the spout parses the content (which is JSON) and maps it to a NavigationEntry object, which
is just a POJO containing the entry information: