        String tweet = input.getStringByField("tweet");
        String tweetId = input.getStringByField("tweet_id");
        StringTokenizer strTok = new StringTokenizer(tweet, " ");
        TransactionAttempt tx = (TransactionAttempt) input.getValueByField("txid");
        HashSet<String> words = new HashSet<String>();
        while (strTok.hasMoreTokens()) {
            String word = strTok.nextToken();
            // Emit each distinct hashtag of this tweet only once
            if (word.startsWith("#") && !words.contains(word)) {
                collector.emit("hashtags", new Values(tx, tweetId, word));
                words.add(word);
            }
        }
    }

    @Override
    public void cleanup() {
    }
}
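To look at the splitting logic in isolation, here is a minimal Storm-free sketch. `HashtagExtraction` and `extractHashtags` are hypothetical names, not part of Storm or the original topology; instead of emitting tuples, the method collects the distinct hashtags into a `LinkedHashSet`, which keeps first-seen order and makes the bolt's `words.contains(word)` check implicit.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.StringTokenizer;

// Hypothetical, Storm-free sketch of the hashtag-splitting logic above.
class HashtagExtraction {

    // Returns each distinct hashtag in the tweet once, in order of first appearance.
    static Set<String> extractHashtags(String tweet) {
        Set<String> hashtags = new LinkedHashSet<>();
        StringTokenizer tokens = new StringTokenizer(tweet, " ");
        while (tokens.hasMoreTokens()) {
            String word = tokens.nextToken();
            // Set.add ignores duplicates, so a hashtag repeated in one tweet is kept once,
            // mirroring the words.contains(word) check in the bolt.
            if (word.startsWith("#")) {
                hashtags.add(word);
            }
        }
        return hashtags;
    }
}
```

For example, `extractHashtags("#storm is fun #storm #java")` yields `#storm` and `#java`, each exactly once, just as the bolt emits one tuple per distinct hashtag.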
Now let's see what happens in UserHashtagJoinBolt. The first important thing to notice
is that it is a BaseBatchBolt. This means that its execute method operates on the
received tuples without emitting any new ones. Once the batch is finished, Storm
calls the finishBatch method.
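The lifecycle described above (accumulate in execute, emit in finishBatch) can be modeled without Storm. In this minimal sketch, `HashtagBatchCounter` is a hypothetical class, not a Storm type, and a plain `List` stands in for the output collector; it models a single batch and simply counts hashtags.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Storm-free model of the batch-bolt lifecycle: execute() only
// accumulates state, and results are emitted once, from finishBatch().
class HashtagBatchCounter {
    private final Map<String, Integer> counts = new HashMap<>();
    // Stands in for the output collector; nothing is added until finishBatch().
    private final List<String> emitted = new ArrayList<>();

    // Called once per tuple in the batch: update state, emit nothing.
    void execute(String hashtag) {
        counts.merge(hashtag, 1, Integer::sum);
    }

    // Called once after the whole batch has been received: emit the aggregated results.
    void finishBatch() {
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            emitted.add(entry.getKey() + "=" + entry.getValue());
        }
    }

    List<String> emitted() {
        return emitted;
    }
}
```

Calling `execute` three times leaves `emitted()` empty; only after `finishBatch()` do the aggregated counts appear, which is the same split of responsibilities UserHashtagJoinBolt relies on.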
    public void execute(Tuple tuple) {
        // Tuples arrive on two streams; record each one according to its source
        String source = tuple.getSourceStreamId();
        String tweetId = tuple.getStringByField("tweet_id");
        if ("hashtags".equals(source)) {
            String hashtag = tuple.getStringByField("hashtag");
            add(tweetHashtags, tweetId, hashtag);
        } else if ("users".equals(source)) {
            String user = tuple.getStringByField("user");
            add(userTweets, user, tweetId);
        }
    }
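The `add` helper called in execute is not defined in this excerpt. A minimal sketch of what it could look like, assuming `tweetHashtags` and `userTweets` are maps from a key to a set of values (consistent with finishBatch reading them back as sets); `JoinState` is a hypothetical wrapper used only to make the sketch self-contained.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical version of the add(...) helper; the real definition is not shown.
// Each map is treated as a multimap: key -> set of values.
class JoinState {
    final Map<String, Set<String>> tweetHashtags = new HashMap<>(); // tweet_id -> hashtags
    final Map<String, Set<String>> userTweets = new HashMap<>();    // user -> tweet_ids

    static void add(Map<String, Set<String>> bag, String key, String value) {
        // Create the set the first time a key is seen, then add the value to it.
        bag.computeIfAbsent(key, k -> new HashSet<>()).add(value);
    }
}
```

Using sets means that if the same (user, tweet) pair arrives twice within a batch, it is stored only once.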
Because you need to associate all the hashtags of a tweet with the users mentioned in that
tweet, and count how many times each hashtag appears, you need to join the two streams
coming from the previous bolts. The execute method accumulates both streams for the
entire batch; once the batch is finished, Storm calls finishBatch, which performs the join.
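The join itself can be sketched as a pure function over the two accumulated maps. This is a hypothetical, Storm-free illustration (`UserHashtagJoin` and `join` are made-up names) that assumes the maps hold sets, as in the bolt; it returns, for each user, how many of that user's tweets in the batch carry each hashtag.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, Storm-free sketch of the per-batch join and count.
class UserHashtagJoin {

    static Map<String, Map<String, Integer>> join(
            Map<String, Set<String>> userTweets,      // user -> tweet ids
            Map<String, Set<String>> tweetHashtags) { // tweet id -> hashtags
        Map<String, Map<String, Integer>> result = new HashMap<>();
        for (Map.Entry<String, Set<String>> entry : userTweets.entrySet()) {
            Map<String, Integer> counter = new HashMap<>();
            for (String tweetId : entry.getValue()) {
                // A tweet may have no hashtags, so fall back to an empty set.
                Set<String> hashtags = tweetHashtags.getOrDefault(tweetId, Collections.emptySet());
                for (String hashtag : hashtags) {
                    counter.merge(hashtag, 1, Integer::sum);
                }
            }
            result.put(entry.getKey(), counter);
        }
        return result;
    }
}
```

If `@alice` is mentioned in tweets `t1` (`#storm`, `#java`) and `t2` (`#storm`), the result maps `@alice` to `#storm=2` and `#java=1`; a user whose tweets have no hashtags gets an empty counter.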
    @Override
    public void finishBatch() {
        // For each mentioned user, count the hashtags across that user's tweets
        for (String user : userTweets.keySet()) {
            Set<String> tweets = getUserTweets(user);
            HashMap<String, Integer> hashtagsCounter = new HashMap<String, Integer>();
            for (String tweet : tweets) {
                Set<String> hashtags = getTweetHashtags(tweet);
                if (hashtags != null) {
                    for (String hashtag : hashtags) {
                        Integer count = hashtagsCounter.get(hashtag);