ferent bolts, UserSplitterBolt and HashtagSplitterBolt, will receive tuples from the
spout. UserSplitterBolt will parse the tweet and look for users (words preceded by
@) and will emit these words in a custom stream called users. The HashtagSplitterBolt
will also parse the tweet, looking for words preceded by #, and will emit these
words in a custom stream called hashtags. A third bolt, the UserHashtagJoinBolt, will
receive both streams and count how many times a hashtag has appeared in a tweet
where a user was named. In order to count and emit the result, this bolt will be a
BaseBatchBolt (more on that later).
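The parsing step that both splitter bolts perform can be sketched in plain Java, independent of Storm. This is only an illustration: the class TweetTokenSplitter and its extract method are hypothetical names, not part of Storm or of the original example, and the sketch assumes that words are separated by whitespace and does not strip trailing punctuation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Storm or of the original example.
final class TweetTokenSplitter {

    // Returns the words in the tweet that start with the given prefix,
    // with the prefix itself removed. Words are split on whitespace only.
    static List<String> extract(String tweet, char prefix) {
        List<String> found = new ArrayList<>();
        for (String word : tweet.trim().split("\\s+")) {
            // Skip a bare "@" or "#" that has nothing after it.
            if (word.length() > 1 && word.charAt(0) == prefix) {
                found.add(word.substring(1));
            }
        }
        return found;
    }

    public static void main(String[] args) {
        String tweet = "@alice and @bob are learning #storm";
        System.out.println("users: " + extract(tweet, '@'));
        System.out.println("hashtags: " + extract(tweet, '#'));
    }
}
```

In a bolt, each extracted word would presumably be emitted on the users or hashtags stream together with the tweet identifier, so that a downstream bolt can match users and hashtags that came from the same tweet.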
Finally, a last bolt, called RedisCommitterBolt, will receive the three streams: the ones
generated by UserSplitterBolt, HashtagSplitterBolt, and UserHashtagJoinBolt. It will
count everything and, once it has finished processing the batch of tuples, it will send
everything to Redis in one transaction. This bolt is a special kind of bolt known as a
committer bolt, explained later in this chapter.
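The idea of accumulating counts for a batch and writing them out in a single step can be sketched without Redis or Storm. Everything in the sketch is an assumption made for illustration: BatchCommitterSketch is a hypothetical class, an in-memory map stands in for Redis, and the check against the last committed transaction id is one common way to keep a replayed batch from being counted twice, not necessarily what the chapter's RedisCommitterBolt does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a Redis-backed committer.
final class BatchCommitterSketch {
    // Stands in for the counters that would live in Redis.
    private final Map<String, Long> store = new HashMap<>();
    // Counts accumulated for the batch currently being processed.
    private final Map<String, Long> pending = new HashMap<>();
    // Assumption: the id of the last applied transaction is kept with the data.
    private long lastCommittedTxId = -1;

    // Called once per incoming tuple of the current batch.
    void count(String key) {
        pending.merge(key, 1L, Long::sum);
    }

    // Applies all pending counts in one step, unless this transaction id
    // was already committed (for example, because the batch was replayed).
    // Returns true if the counts were applied.
    boolean commit(long txId) {
        boolean applied = false;
        if (txId > lastCommittedTxId) {
            pending.forEach((key, value) -> store.merge(key, value, Long::sum));
            lastCommittedTxId = txId;
            applied = true;
        }
        pending.clear();
        return applied;
    }

    long get(String key) {
        return store.getOrDefault(key, 0L);
    }
}
```

With a real Redis client, the body of commit would presumably be wrapped in a Redis transaction so that the counters and the transaction id change together.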
To build this topology, use TransactionalTopologyBuilder, as in the following
code block:
TransactionalTopologyBuilder builder =
    new TransactionalTopologyBuilder("test", "spout", new TweetsTransactionalSpout());
builder.setBolt("users-splitter", new UserSplitterBolt(), 4).shuffleGrouping("spout");
builder.setBolt("hashtag-splitter", new HashtagSplitterBolt(), 4).shuffleGrouping("spout");
builder.setBolt("user-hashtag-merger", new UserHashtagJoinBolt(), 4)
    .fieldsGrouping("users-splitter", "users", new Fields("tweet_id"))
    .fieldsGrouping("hashtag-splitter", "hashtags", new Fields("tweet_id"));
builder.setBolt("redis-committer", new RedisCommitterBolt())
    .globalGrouping("users-splitter", "users")
    .globalGrouping("hashtag-splitter", "hashtags")
    .globalGrouping("user-hashtag-merger");
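Because both streams are grouped on tweet_id, user and hashtag tuples that come from the same tweet reach the same UserHashtagJoinBolt task, which is what makes a per-tweet join possible. A minimal sketch of that join in plain Java follows. UserHashtagJoinSketch and its methods are hypothetical names, and the "user:hashtag" key format is an assumption chosen for the example; a real bolt would receive tuples and emit results through Storm's APIs instead.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the join logic; plain Java, no Storm types.
final class UserHashtagJoinSketch {
    private final Map<String, Set<String>> usersByTweet = new HashMap<>();
    private final Map<String, Set<String>> hashtagsByTweet = new HashMap<>();

    // Called for each tuple that arrives on the "users" stream.
    void addUser(String tweetId, String user) {
        usersByTweet.computeIfAbsent(tweetId, k -> new HashSet<>()).add(user);
    }

    // Called for each tuple that arrives on the "hashtags" stream.
    void addHashtag(String tweetId, String hashtag) {
        hashtagsByTweet.computeIfAbsent(tweetId, k -> new HashSet<>()).add(hashtag);
    }

    // Called once the batch is complete. For every "user:hashtag" pair,
    // counts the tweets in which both the user and the hashtag appeared.
    Map<String, Integer> finishBatch() {
        Map<String, Integer> counts = new HashMap<>();
        for (Map.Entry<String, Set<String>> entry : usersByTweet.entrySet()) {
            Set<String> hashtags = hashtagsByTweet.get(entry.getKey());
            if (hashtags == null) {
                continue; // this tweet named users but had no hashtags
            }
            for (String user : entry.getValue()) {
                for (String hashtag : hashtags) {
                    counts.merge(user + ":" + hashtag, 1, Integer::sum);
                }
            }
        }
        return counts;
    }
}
```

The counts can only be produced once every tuple of the batch has been seen, which is why the join needs a hook that runs when the batch ends.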
Let's see how you can implement the spout in a transactional topology.
The Spout
The spout in a transactional topology is completely different from a standard spout.
public class TweetsTransactionalSpout extends
    BaseTransactionalSpout<TransactionMetadata> {
As you can see in the class definition, TweetsTransactionalSpout extends
BaseTransactionalSpout with a generic type. The type you set there is something known as the
transaction metadata. It will be used later to emit batches of tuples from the source.
In this example, TransactionMetadata is defined as:
public class TransactionMetadata implements Serializable {
    private static final long serialVersionUID = 1L;
 