        String tweet = input.getStringByField("tweet");
        String tweetId = input.getStringByField("tweet_id");
        StringTokenizer strTok = new StringTokenizer(tweet, " ");
        TransactionAttempt tx = (TransactionAttempt) input.getValueByField("txid");
        // Tracks the hashtags already emitted for this tweet
        HashSet<String> words = new HashSet<String>();
        while (strTok.hasMoreTokens()) {
            String word = strTok.nextToken();
            // Emit each distinct hashtag of the tweet only once
            if (word.startsWith("#") && !words.contains(word)) {
                collector.emit("hashtags", new Values(tx, tweetId, word));
                words.add(word);
            }
        }
    }

    @Override
    public void cleanup() {
    }
}
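The splitting loop above does not depend on Storm itself, so it can be pulled into a plain helper and tried in isolation. The following is a minimal sketch, not the book's code: the class name HashtagExtractor and the method extractHashtags are hypothetical, and the helper returns the hashtags as a list instead of emitting tuples on the "hashtags" stream.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

// Hypothetical, Storm-free version of the loop in HashtagSplitterBolt.execute
class HashtagExtractor {
    // Returns each distinct hashtag of the tweet in order of first appearance.
    // Like the bolt, it splits on single spaces and keeps tokens starting with '#'.
    static List<String> extractHashtags(String tweet) {
        List<String> result = new ArrayList<>();
        Set<String> seen = new HashSet<>();
        StringTokenizer strTok = new StringTokenizer(tweet, " ");
        while (strTok.hasMoreTokens()) {
            String word = strTok.nextToken();
            // Set.add returns false if the hashtag was already seen in this tweet
            if (word.startsWith("#") && seen.add(word)) {
                result.add(word);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(extractHashtags("#storm is fun #storm #java")); // prints [#storm, #java]
    }
}
```

Folding the contains/add pair into a single Set.add call is a small idiom change; the behavior (one emission per distinct hashtag per tweet) stays the same.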
Now let's see what happens in UserHashtagJoinBolt. The first important thing to notice is that it is a BaseBatchBolt. This means that its execute method operates on the received tuples without emitting any new tuples. When the whole batch has been processed, Storm calls the finishBatch method.
    public void execute(Tuple tuple) {
        String source = tuple.getSourceStreamId();
        String tweetId = tuple.getStringByField("tweet_id");
        if ("hashtags".equals(source)) {
            // "hashtags" stream: record which hashtags each tweet contains
            String hashtag = tuple.getStringByField("hashtag");
            add(tweetHashtags, tweetId, hashtag);
        } else if ("users".equals(source)) {
            // "users" stream: record which tweets mention each user
            String user = tuple.getStringByField("user");
            add(userTweets, user, tweetId);
        }
    }
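The execute method above relies on an add helper and on two fields, tweetHashtags and userTweets, whose definitions are not part of this excerpt. The sketch below is an assumption about their shape: both fields are taken to be Map<String, Set<String>>, add is taken to be a multimap insert, JoinBuffers is a hypothetical name, and the Storm Tuple is replaced by plain strings so the routing can be run without a cluster.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the per-batch state of UserHashtagJoinBolt
class JoinBuffers {
    // tweet_id -> hashtags found in that tweet ("hashtags" stream)
    final Map<String, Set<String>> tweetHashtags = new HashMap<>();
    // user -> ids of the tweets that mention the user ("users" stream)
    final Map<String, Set<String>> userTweets = new HashMap<>();

    // Assumed behavior of add(...): create the value set on first use, then insert
    static void add(Map<String, Set<String>> bag, String key, String value) {
        bag.computeIfAbsent(key, k -> new HashSet<>()).add(value);
    }

    // Mirrors execute(): the source stream id decides which map is updated.
    // Tuples from any other stream are ignored.
    void execute(String sourceStreamId, String tweetId, String value) {
        if ("hashtags".equals(sourceStreamId)) {
            add(tweetHashtags, tweetId, value);
        } else if ("users".equals(sourceStreamId)) {
            add(userTweets, value, tweetId);
        }
    }

    public static void main(String[] args) {
        JoinBuffers buffers = new JoinBuffers();
        buffers.execute("hashtags", "t1", "#storm");
        buffers.execute("users", "t1", "@alice");
        System.out.println(buffers.tweetHashtags);
        System.out.println(buffers.userTweets);
    }
}
```

Note that the two maps are keyed differently (by tweet and by user); that asymmetry is what lets finishBatch walk from a user to their tweets and from each tweet to its hashtags.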
Since you need to associate all the hashtags of a tweet with the users mentioned in that tweet, and count how many times each hashtag appears for each user, you need to join the two streams coming from the previous bolts. The execute method accumulates both streams for the entire batch; once the batch finishes, Storm calls the finishBatch method, where the join is performed.
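Stripped of Storm types, the join in finishBatch is a nested loop: for each user, look up the tweets that mention them, then look up each tweet's hashtags and count them. The following is a minimal sketch under stated assumptions: userTweets maps a user to tweet ids, tweetHashtags maps a tweet id to its hashtags, the names HashtagJoin and join are hypothetical, and the per-user counts are returned in a nested map rather than emitted as tuples.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, Storm-free sketch of the user/hashtag join
class HashtagJoin {
    // For every user, counts how often each hashtag appears across the
    // tweets that mention that user.
    static Map<String, Map<String, Integer>> join(Map<String, Set<String>> userTweets,
                                                  Map<String, Set<String>> tweetHashtags) {
        Map<String, Map<String, Integer>> result = new HashMap<>();
        for (Map.Entry<String, Set<String>> entry : userTweets.entrySet()) {
            Map<String, Integer> hashtagsCounter = new HashMap<>();
            for (String tweet : entry.getValue()) {
                Set<String> hashtags = tweetHashtags.get(tweet);
                // A tweet may mention a user without containing any hashtag
                if (hashtags != null) {
                    for (String hashtag : hashtags) {
                        // Start at 1 on first sight, otherwise increment
                        hashtagsCounter.merge(hashtag, 1, Integer::sum);
                    }
                }
            }
            result.put(entry.getKey(), hashtagsCounter);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> userTweets = new HashMap<>();
        userTweets.put("@alice", Set.of("t1", "t2"));
        Map<String, Set<String>> tweetHashtags = new HashMap<>();
        tweetHashtags.put("t1", Set.of("#storm"));
        tweetHashtags.put("t2", Set.of("#storm", "#java"));
        System.out.println(join(userTweets, tweetHashtags).get("@alice").get("#storm")); // prints 2
    }
}
```

Map.merge replaces the explicit get/null-check/put sequence used in the bolt; Set.of requires Java 9 or later.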
    @Override
    public void finishBatch() {
        for (String user : userTweets.keySet()) {
            Set<String> tweets = getUserTweets(user);
            HashMap<String, Integer> hashtagsCounter = new HashMap<String, Integer>();
            for (String tweet : tweets) {
                Set<String> hashtags = getTweetHashtags(tweet);
                if (hashtags != null) {
                    for (String hashtag : hashtags) {
                        Integer count = hashtagsCounter.get(hashtag);