Databases Reference
In-Depth Information
this . jedis = new Jedis ( "localhost" );
}
// Running totals keyed by hashtag; execute() adds 1 for every tuple that
// arrives from the "hashtag-splitter" component.
HashMap < String , Long > hashtags = new HashMap < String , Long >();
// Running totals keyed by user; execute() adds 1 for every tuple that
// arrives from the "users-splitter" component.
HashMap < String , Long > users = new HashMap < String , Long >();
// Running totals keyed by "<user>:<hashtag>"; execute() adds the tuple's
// "count" field for every tuple that arrives from "user-hashtag-merger".
HashMap < String , Long > usersHashtags = new HashMap < String , Long >();
/**
 * Adds {@code count} to the running total stored under {@code key} in
 * {@code map}. A key with no stored value (or a stored {@code null}) is
 * treated as having a total of zero.
 *
 * @param map   the tally to update in place
 * @param key   the entry whose total is increased
 * @param count the amount to add; may be negative
 */
private void count(HashMap<String, Long> map, String key, int count) {
    Long previous = map.get(key);
    // Unbox only when a value is present so a missing key starts from zero.
    long base = (previous == null) ? 0L : previous;
    map.put(key, base + count);
}
/**
 * Routes an incoming tuple to the matching tally based on the component
 * that emitted it:
 * <ul>
 *   <li>"users-splitter": adds 1 to the total for the tuple's "user" field;</li>
 *   <li>"hashtag-splitter": adds 1 to the total for the tuple's "hashtag" field;</li>
 *   <li>"user-hashtag-merger": adds the tuple's "count" field to the total
 *       stored under "user:hashtag".</li>
 * </ul>
 * Tuples from any other component are ignored.
 *
 * @param tuple the tuple to account for
 */
@Override
public void execute(Tuple tuple) {
    final String source = tuple.getSourceComponent();

    if ("users-splitter".equals(source)) {
        count(users, tuple.getStringByField("user"), 1);
        return;
    }

    if ("hashtag-splitter".equals(source)) {
        count(hashtags, tuple.getStringByField("hashtag"), 1);
        return;
    }

    if ("user-hashtag-merger".equals(source)) {
        // Fields are read in the same order as before: hashtag, user, count.
        String tag = tuple.getStringByField("hashtag");
        String userName = tuple.getStringByField("user");
        Integer occurrences = tuple.getIntegerByField("count");
        count(usersHashtags, userName + ":" + tag, occurrences);
    }
}
@Override
public void finishBatch () {
String lastCommitedTransaction = jedis . get ( LAST_COMMITED_TRANSACTION_FIELD );
String currentTransaction = "" + id . getTransactionId ();
if ( currentTransaction . equals ( lastCommitedTransaction ))
return ;
Transaction multi = jedis . multi ();
multi . set ( LAST_COMMITED_TRANSACTION_FIELD , currentTransaction );
Set < String > keys = hashtags . keySet ();
for ( String hashtag : keys ) {
Long count = hashtags . get ( hashtag );
multi . hincrBy ( "hashtags" , hashtag , count );
}
keys = users . keySet ();
for ( String user : keys ) {
Long count = users . get ( user );
 
Search WWH ::




Custom Search