UsersNavigationSpout
The UsersNavigationSpout is in charge of feeding the topology with navigation entries.
Each navigation entry is a reference to a product page viewed by one user. The web
application stores these entries in the Redis server. We'll go into more detail on that in
a moment.
To read entries from the Redis server, you'll be using https://github.com/xetorthio/jedis,
a blazingly small and simple Redis client for Java.
Only the relevant part of the code is shown in the following code block.
package storm.analytics;

public class UsersNavigationSpout extends BaseRichSpout {
    Jedis jedis;
    ...

    @Override
    public void nextTuple() {
        // Remove and return the right-most element of the "navigation" list.
        String content = jedis.rpop("navigation");
        if (content == null || "nil".equals(content)) {
            // Nothing to read: back off briefly instead of busy-waiting.
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                // Restore the interrupt flag rather than silently swallowing it.
                Thread.currentThread().interrupt();
            }
        } else {
            // Each entry is a JSON object with "user", "product" and "type" keys.
            JSONObject obj = (JSONObject) JSONValue.parse(content);
            String user = obj.get("user").toString();
            String product = obj.get("product").toString();
            String type = obj.get("type").toString();

            HashMap<String, String> map = new HashMap<String, String>();
            map.put("product", product);

            NavigationEntry entry = new NavigationEntry(user, type, map);
            collector.emit(new Values(user, entry));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Two fields per tuple: the user ID and the NavigationEntry.
        declarer.declare(new Fields("user", "otherdata"));
    }
}
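The pop-or-sleep logic in nextTuple() can be tried without a Redis server. The sketch below is not part of the topology: it replaces the Redis list with an in-memory deque, and the class name NavigationQueueSketch and its methods lpush, rpop, and pollOnce are hypothetical names chosen for illustration. It also assumes the web application adds entries at the left end of the list (as LPUSH would), which is what makes popping from the right end yield the oldest entry first.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class NavigationQueueSketch {
    // In-memory stand-in for the Redis "navigation" list.
    // Head of the deque = left end of the list, tail = right end.
    private final Deque<String> navigation = new ArrayDeque<>();

    // Mimics LPUSH: insert at the left end (assumed producer behaviour).
    public void lpush(String entry) {
        navigation.addFirst(entry);
    }

    // Mimics RPOP: remove and return the right-most element, or null if empty.
    public String rpop() {
        return navigation.pollLast();
    }

    // One polling step shaped like nextTuple(): return an entry if there is
    // one, otherwise sleep briefly and return null.
    public String pollOnce(long sleepMillis) {
        String content = rpop();
        if (content == null) {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the calling thread.
                Thread.currentThread().interrupt();
            }
        }
        return content;
    }

    public static void main(String[] args) {
        NavigationQueueSketch queue = new NavigationQueueSketch();
        queue.lpush("{\"user\":\"1\",\"product\":\"10\",\"type\":\"PRODUCT\"}");
        queue.lpush("{\"user\":\"2\",\"product\":\"20\",\"type\":\"PRODUCT\"}");

        // Entries come back oldest first; the third call finds the list empty.
        System.out.println(queue.pollOnce(300));
        System.out.println(queue.pollOnce(300));
        System.out.println(queue.pollOnce(300));
    }
}
```

With entries pushed on the left and popped from the right, the list behaves as a first-in, first-out queue, so the spout sees page views in the order they were recorded. The JSON strings in main are made-up sample data.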
First, the spout calls jedis.rpop("navigation") to remove and return the right-most
element of the “navigation” list on the Redis server. If the list is empty, the spout sleeps
for 0.3 seconds so that it does not hammer the server in a busy-wait loop. If an entry is
found, the spout parses the content (which is JSON) and maps it to a NavigationEntry
object, which is just a POJO containing the entry information:
 