    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Read the tweet text, its ID, and the current transaction attempt from the tuple
        String tweet = input.getStringByField("tweet");
        String tweetId = input.getStringByField("tweet_id");
        StringTokenizer strTok = new StringTokenizer(tweet, " ");
        TransactionAttempt tx = (TransactionAttempt) input.getValueByField("txid");
        // Users already emitted for this tweet
        HashSet<String> users = new HashSet<String>();

        while (strTok.hasMoreTokens()) {
            String user = strTok.nextToken();

            // Ensure this is an actual user, and that it's not repeated in the tweet
            if (user.startsWith("@") && !users.contains(user)) {
                collector.emit("users", new Values(tx, tweetId, user));
                users.add(user);
            }
        }
    }

    @Override
    public void cleanup() {
    }
}
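The loop in `execute` can be tried outside Storm. The sketch below is a hypothetical, dependency-free stand-in; `UserSplitterSketch` and `emitUsers` are illustrative names, not part of Storm or of the topology.

Instead of emitting tuples, the sketch returns the `(tweet_id, user)` pairs that would be emitted. It also leaves out the `TransactionAttempt`. It shows that `users` is created inside `execute`, so a user is emitted at most once per tweet, but again for every new tweet that mentions them.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical, Storm-free sketch of UserSplitterBolt.execute: rather than calling
// collector.emit("users", new Values(tx, tweetId, user)), it collects the
// (tweetId, user) pairs that would be emitted.
public class UserSplitterSketch {

    public static List<List<String>> emitUsers(String tweetId, String tweet) {
        List<List<String>> emitted = new ArrayList<>();
        // A fresh set per call, like the local `users` set in execute():
        // duplicates are suppressed within a single tweet only.
        HashSet<String> users = new HashSet<>();
        StringTokenizer strTok = new StringTokenizer(tweet, " ");
        while (strTok.hasMoreTokens()) {
            String user = strTok.nextToken();
            // Set.add returns false if the element is already present, so this
            // combines the original contains() check and add() call.
            if (user.startsWith("@") && users.add(user)) {
                emitted.add(List.of(tweetId, user));
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // @ana is emitted once for tweet 1 despite appearing twice,
        // and again for tweet 2.
        System.out.println(emitUsers("1", "hi @ana and @bob @ana again"));
        System.out.println(emitUsers("2", "@ana says hi"));
    }
}
```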
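As mentioned earlier in this chapter, UserSplitterBolt receives tuples and parses the text of the tweet. It then emits the words preceded by @, that is, the Twitter users mentioned in the tweet. Each user is emitted only once per tweet, on the users stream, together with the transaction attempt and the tweet ID. HashtagSplitterBolt works in a very similar way, but emits hashtags on the hashtags stream instead.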
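Assume the hashtag bolt differs from the user bolt mainly in which prefix it looks for. Under that assumption, the shared splitting step can be sketched as one hypothetical helper parameterized by the prefix. `PrefixSplitter` and `split` are illustrative names, not part of the topology.

The helper tokenizes on spaces only, like `new StringTokenizer(tweet, " ")` in the bolt. As a result, trailing punctuation stays attached to the word: in the example, `#java,` is returned with its comma.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

// Hypothetical helper: returns the distinct space-separated tokens of `text`
// that start with `prefix`, in the order they first appear.
public class PrefixSplitter {

    public static List<String> split(String text, String prefix) {
        // LinkedHashSet drops repeated tokens while keeping first-seen order.
        Set<String> found = new LinkedHashSet<>();
        StringTokenizer tok = new StringTokenizer(text, " ");
        while (tok.hasMoreTokens()) {
            String word = tok.nextToken();
            if (word.startsWith(prefix)) {
                found.add(word);
            }
        }
        return new ArrayList<>(found);
    }

    public static void main(String[] args) {
        String tweet = "@ana loves #storm and #java, says @bob #storm";
        System.out.println(split(tweet, "@")); // [@ana, @bob]
        System.out.println(split(tweet, "#")); // [#storm, #java,]
    }
}
```

Stripping punctuation or normalizing case would change which tokens count as the same user or hashtag, so that choice is left out of this sketch.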
public class HashtagSplitterBolt implements IBasicBolt {
    private static final long serialVersionUID = 1L;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("hashtags", new Fields("txid", "tweet_id", "hashtag"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {