ferent bolts, UserSplitterBolt and HashtagSplitterBolt, will receive tuples from the spout. UserSplitterBolt will parse the tweet, look for users (words preceded by @), and emit these words in a custom stream called users. HashtagSplitterBolt will also parse the tweet, looking for words preceded by #, and will emit these words in a custom stream called hashtags. A third bolt, UserHashtagJoinBolt, will receive both streams and count how many times a hashtag has appeared in a tweet where a user was named. In order to count and emit the result, this bolt will be a BaseBatchBolt (more on that later).
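The split-and-join flow described above can be modeled without Storm in plain Java. This is a minimal in-memory sketch, not the book's bolt code: it assumes tweets arrive as (tweet id, text) pairs and that a "word" is any whitespace-separated token, and the names TweetJoinSketch, extractPrefixed, and countHashtagsWithUsers are hypothetical. It reads the join literally: for each hashtag, count its occurrences in tweets that also name at least one user.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of UserSplitterBolt + HashtagSplitterBolt + UserHashtagJoinBolt.
public class TweetJoinSketch {

    // Return the words starting with the given prefix ('@' for users, '#' for hashtags),
    // with the prefix stripped. Assumption: words are whitespace-separated tokens.
    static List<String> extractPrefixed(String text, char prefix) {
        List<String> result = new ArrayList<>();
        for (String word : text.split("\\s+")) {
            if (word.length() > 1 && word.charAt(0) == prefix) {
                result.add(word.substring(1));
            }
        }
        return result;
    }

    // Split every tweet into a "users" and a "hashtags" view keyed by tweet id, then join
    // the two views on tweet id and count hashtag occurrences in tweets that name a user.
    static Map<String, Integer> countHashtagsWithUsers(Map<Long, String> tweets) {
        Map<Long, List<String>> users = new HashMap<>();    // what the users stream would carry
        Map<Long, List<String>> hashtags = new HashMap<>(); // what the hashtags stream would carry
        for (Map.Entry<Long, String> tweet : tweets.entrySet()) {
            users.put(tweet.getKey(), extractPrefixed(tweet.getValue(), '@'));
            hashtags.put(tweet.getKey(), extractPrefixed(tweet.getValue(), '#'));
        }
        Map<String, Integer> counts = new TreeMap<>(); // sorted for readable output
        for (Long tweetId : tweets.keySet()) {
            if (users.get(tweetId).isEmpty()) {
                continue; // no user named in this tweet, so its hashtags are not counted
            }
            for (String tag : hashtags.get(tweetId)) {
                counts.merge(tag, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<Long, String> tweets = new HashMap<>();
        tweets.put(1L, "hello @alice #storm");
        tweets.put(2L, "just #storm");
        tweets.put(3L, "@bob likes #storm and #redis");
        System.out.println(countHashtagsWithUsers(tweets)); // {redis=1, storm=2}
    }
}
```

Tweet 2 names no user, so its #storm is not counted. In the real topology the join happens inside UserHashtagJoinBolt tasks rather than in one loop, which is why both streams must be grouped by tweet_id.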
Finally, a fourth bolt, called RedisCommitterBolt, will receive all three streams: the ones generated by UserSplitterBolt, HashtagSplitterBolt, and UserHashtagJoinBolt. It will count everything and, once it has finished processing the batch of tuples, send everything to Redis in one transaction. This bolt is a special kind of bolt known as a committer bolt, explained later in this chapter.
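The committer idea (count in memory while the batch arrives, then write everything in a single step) can be sketched in plain Java. This is a simplified model, not RedisCommitterBolt itself: the in-memory store map stands in for Redis, and the names CommitterSketch, beginBatch, execute, and finishBatch are hypothetical, only loosely echoing a batch bolt's lifecycle. It also assumes, as Storm's transactional topologies guarantee, that batches commit in transaction-id order, so remembering the last committed id is enough to ignore a replayed batch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a committer: nothing reaches the store until the batch is complete.
public class CommitterSketch {
    // Stand-in for Redis (hypothetical): key -> counter.
    private final Map<String, Long> store = new HashMap<>();
    // Id of the last transaction whose results reached the store (-1 means none yet).
    private long lastCommittedTxId = -1;

    // Per-batch state, reset at the start of every transaction.
    private long currentTxId;
    private final Map<String, Long> pending = new HashMap<>();

    void beginBatch(long txId) {
        currentTxId = txId;
        pending.clear();
    }

    // Called once per incoming tuple: only count in memory, never touch the store here.
    void execute(String key) {
        pending.merge(key, 1L, Long::sum);
    }

    // Called once the whole batch has arrived: apply every count in a single step.
    void finishBatch() {
        if (currentTxId <= lastCommittedTxId) {
            return; // this transaction was already committed (a replay), so skip it
        }
        // Against a real Redis server this block would be one MULTI ... EXEC transaction,
        // ideally also storing currentTxId inside that same transaction.
        pending.forEach((k, v) -> store.merge(k, v, Long::sum));
        lastCommittedTxId = currentTxId;
    }

    Map<String, Long> snapshot() {
        return new HashMap<>(store);
    }

    public static void main(String[] args) {
        CommitterSketch committer = new CommitterSketch();
        committer.beginBatch(1);
        committer.execute("#storm");
        committer.execute("#storm");
        committer.finishBatch();
        committer.beginBatch(1);   // replay of transaction 1
        committer.execute("#storm");
        committer.finishBatch();   // ignored: transaction 1 is already committed
        System.out.println(committer.snapshot()); // {#storm=2}
    }
}
```

Deferring every write to finishBatch is what lets the whole batch land atomically; the stored transaction id is what keeps a replayed batch from being counted twice.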
To build this topology, use TransactionalTopologyBuilder, as in the following code block:
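```java
import backtype.storm.transactional.TransactionalTopologyBuilder;
import backtype.storm.tuple.Fields;

// "test" is the transactional id of the topology; "spout" is the id of the spout
TransactionalTopologyBuilder builder =
    new TransactionalTopologyBuilder("test", "spout", new TweetsTransactionalSpout());

// Both splitters read tweets from the spout; tuples are distributed randomly over 4 tasks each
builder.setBolt("users-splitter", new UserSplitterBolt(), 4).shuffleGrouping("spout");
builder.setBolt("hashtag-splitter", new HashtagSplitterBolt(), 4).shuffleGrouping("spout");

// The join bolt receives the "users" and "hashtags" streams grouped by tweet_id,
// so the users and hashtags of the same tweet reach the same task
builder.setBolt("user-hashtag-merger", new UserHashtagJoinBolt(), 4)
    .fieldsGrouping("users-splitter", "users", new Fields("tweet_id"))
    .fieldsGrouping("hashtag-splitter", "hashtags", new Fields("tweet_id"));

// The committer receives all three streams on a single task (global grouping)
builder.setBolt("redis-committer", new RedisCommitterBolt())
    .globalGrouping("users-splitter", "users")
    .globalGrouping("hashtag-splitter", "hashtags")
    .globalGrouping("user-hashtag-merger");
```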
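The groupings in this topology decide which task receives each tuple. Grouping both streams by tweet_id is what lets UserHashtagJoinBolt see the users and the hashtags of one tweet together, while global grouping funnels every tuple of a stream into a single committer task (Storm picks the task with the lowest id). The sketch below is a simplified model of that routing, assuming plain hash-modulo partitioning; the class GroupingSketch and its methods are hypothetical and do not reproduce Storm's internal hashing code.

```java
import java.util.List;

// Hypothetical model of how fields and global groupings route tuples to tasks.
public class GroupingSketch {
    // Fields grouping (simplified): hash the grouping values and take the result modulo
    // the number of tasks, so equal values always map to the same task index.
    static int fieldsGroupingTask(List<?> groupingValues, int numTasks) {
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    // Global grouping: every tuple goes to a single task, the one with the lowest id.
    static int globalGroupingTask(List<Integer> taskIds) {
        int lowest = Integer.MAX_VALUE;
        for (int id : taskIds) {
            lowest = Math.min(lowest, id);
        }
        return lowest;
    }

    public static void main(String[] args) {
        int numTasks = 4; // parallelism hint of "user-hashtag-merger"
        long tweetId = 42L;
        // A "users" tuple and a "hashtags" tuple for the same tweet carry the same tweet_id...
        int userTupleTask = fieldsGroupingTask(List.of(tweetId), numTasks);
        int hashtagTupleTask = fieldsGroupingTask(List.of(tweetId), numTasks);
        // ...so both are routed to the same merger task.
        System.out.println(userTupleTask == hashtagTupleTask); // true
        System.out.println(globalGroupingTask(List.of(7, 3, 5))); // 3
    }
}
```

A shuffle grouping on the join bolt would break this guarantee: the two halves of a tweet could land on different tasks and never be matched.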
Let's see how you can implement the spout in a transactional topology.
The Spout
The spout in a transactional topology is completely different from a standard spout.
```java
public class TweetsTransactionalSpout extends BaseTransactionalSpout<TransactionMetadata> {
    // ... (class body omitted here)
}
```
As you can see in the class definition, TweetsTransactionalSpout extends BaseTransactionalSpout with a generic type. The type you set there is known as the transaction metadata. It will be used later to emit batches of tuples from the source.
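Why the metadata matters can be shown with a small model: if the metadata fully describes which tuples make up a batch, the same metadata can later be used to re-emit exactly the same batch when a transaction is replayed. This is a minimal sketch under stated assumptions: BatchRange(from, quantity) is a hypothetical metadata type over an in-memory list of tweets, and neither it nor BatchReplaySketch is the TransactionMetadata class defined in this example.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: metadata that pins down a batch makes that batch replayable.
public class BatchReplaySketch {
    // Hypothetical metadata: the batch starts at offset 'from' and holds up to 'quantity' items.
    // Serializable so that it can be stored between attempts and reused on a replay.
    static final class BatchRange implements Serializable {
        private static final long serialVersionUID = 1L;
        final long from;
        final int quantity;

        BatchRange(long from, int quantity) {
            this.from = from;
            this.quantity = quantity;
        }
    }

    // Emit the batch described by the metadata: same metadata in, same tuples out.
    static List<String> emitBatch(List<String> source, BatchRange meta) {
        int start = (int) Math.min(meta.from, source.size());
        int end = (int) Math.min(meta.from + meta.quantity, source.size());
        return new ArrayList<>(source.subList(start, end));
    }

    public static void main(String[] args) {
        List<String> source = List.of("t0", "t1", "t2", "t3", "t4");
        BatchRange meta = new BatchRange(1, 3);
        System.out.println(emitBatch(source, meta)); // [t1, t2, t3]
        // Replaying the transaction reuses the same metadata, so it emits the same tuples.
        System.out.println(emitBatch(source, meta).equals(emitBatch(source, meta))); // true
    }
}
```

The design choice is that the metadata records a position in the source rather than the tuples themselves, which keeps it small while still making each batch reproducible, provided the source can be re-read from that position.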
In this example, TransactionMetadata is defined as:
```java
import java.io.Serializable;

public class TransactionMetadata implements Serializable {
    private static final long serialVersionUID = 1L;
```