Databases Reference
In-Depth Information
}
/**
 * No-op: the body is empty, so neither argument is read and no state is set up here.
 *
 * @param stormConf configuration map; unused by this implementation
 * @param context topology context; unused by this implementation
 */
@Override
public void prepare ( Map stormConf , TopologyContext context ) {
}
/**
 * Emits one tuple on the "users" stream for every distinct @-mention found in the tweet.
 *
 * <p>Reads the "tweet", "tweet_id" and "txid" fields of {@code input}, splits the tweet text on
 * whitespace, and for each token that starts with '@' and has at least one character after it
 * emits {@code (txid, tweet_id, token)}. A mention repeated within the same tweet is emitted only
 * once, in order of first appearance. A tweet whose text is {@code null} produces no output.
 *
 * @param input tuple providing the "tweet", "tweet_id" and "txid" fields
 * @param collector collector used to emit on the "users" stream
 */
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
    String tweet = input.getStringByField("tweet");
    String tweetId = input.getStringByField("tweet_id");
    TransactionAttempt tx = (TransactionAttempt) input.getValueByField("txid");
    if (tweet == null) {
        // No text means no mentions; StringTokenizer would throw NullPointerException on null.
        return;
    }
    // Use the default delimiter set (space, tab, newline, carriage return, form feed) so that a
    // mention next to a line break or tab is still isolated as its own token.
    StringTokenizer strTok = new StringTokenizer(tweet);
    HashSet<String> users = new HashSet<String>();
    while (strTok.hasMoreTokens()) {
        String user = strTok.nextToken();
        // A bare "@" names nobody, so require at least one character after it.
        // add() returns false when the mention was already seen in this tweet.
        if (user.length() > 1 && user.startsWith("@") && users.add(user)) {
            collector.emit("users", new Values(tx, tweetId, user));
        }
    }
}
/**
 * No-op: the body is empty, so this implementation releases nothing.
 */
@Override
public void cleanup () {
}
}
As mentioned earlier in this chapter, UserSplitterBolt receives tuples, parses the text
of the tweet, and emits the words preceded by @, that is, the Twitter users mentioned in it.
HashtagSplitterBolt works in a very similar way.
public class HashtagSplitterBolt implements IBasicBolt {
private static final long serialVersionUID = 1L ;
/**
 * Declares the "hashtags" output stream with the fields "txid", "tweet_id" and "hashtag".
 *
 * @param declarer declarer on which the stream is registered
 */
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // Field layout of every tuple sent on the "hashtags" stream.
    final Fields hashtagTuple = new Fields("txid", "tweet_id", "hashtag");
    declarer.declareStream("hashtags", hashtagTuple);
}
/**
 * Always returns {@code null}.
 *
 * <p>NOTE(review): presumably null means "no component-specific configuration" to the
 * framework; confirm against the interface contract this overrides.
 *
 * @return {@code null} in every case
 */
@Override
public Map < String , Object > getComponentConfiguration () {
return null ;
}
/**
 * No-op: the body is empty, so neither argument is read and no state is set up here.
 *
 * @param stormConf configuration map; unused by this implementation
 * @param context topology context; unused by this implementation
 */
@Override
public void prepare ( Map stormConf , TopologyContext context ) {
}
@Override
public void execute ( Tuple input , BasicOutputCollector collector ) {
 
Search WWH ::




Custom Search