To use this grouping in the example, replace the word-normalizer grouping with the following:
    builder.setBolt("word-normalizer", new WordNormalizer())
        .customGrouping("word-reader", new ModuleGrouping());
Direct Grouping
This is a special grouping where the source decides which task of the target component
receives the tuple. As in the previous example, the source decides which bolt receives the
tuple based on the first letter of the word. To use direct grouping, call the emitDirect
method instead of emit in the WordNormalizer bolt.
    public void execute(Tuple input) {
        ...
        for (String word : words) {
            if (!word.isEmpty()) {
                ...
                collector.emitDirect(getWordCountIndex(word), new Values(word));
            }
        }
        // Acknowledge the tuple
        collector.ack(input);
    }

    public Integer getWordCountIndex(String word) {
        word = word.trim().toUpperCase();
        if (word.isEmpty())
            return 0;
        else
            return word.charAt(0) % numCounterTasks;
    }
Work out the number of target tasks in the prepare method:
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
        // getComponentTasks returns the list of task IDs; keep only how many there are
        this.numCounterTasks = context.getComponentTasks("word-counter").size();
    }
And in the topology definition, specify that the stream will be grouped directly:
    builder.setBolt("word-counter", new WordCounter(), 2)
        .directGrouping("word-normalizer");
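One detail of the direct grouping example is worth spelling out. In Storm, the first argument of emitDirect is a task ID, and getComponentTasks returns the list of task IDs assigned to a component. Those IDs need not start at zero, so an index computed from the first letter may have to be translated into an actual task ID before emitting. The following is a minimal, Storm-free sketch of that translation. The class name DirectRoutingSketch, the helper chooseTaskId, and the task IDs 4 and 7 are hypothetical and only serve the illustration.

```java
import java.util.Arrays;
import java.util.List;

// Storm-free sketch: maps a word to one of the consumer's task IDs.
final class DirectRoutingSketch {

    // Index in [0, taskCount) derived from the first letter of the word,
    // mirroring getWordCountIndex from the example.
    static int wordCountIndex(String word, int taskCount) {
        String normalized = word.trim().toUpperCase();
        if (normalized.isEmpty()) {
            return 0;
        }
        return normalized.charAt(0) % taskCount;
    }

    // Translates the index into a task ID taken from the list of IDs
    // (in a real bolt, the list returned by getComponentTasks).
    static int chooseTaskId(String word, List<Integer> counterTaskIds) {
        return counterTaskIds.get(wordCountIndex(word, counterTaskIds.size()));
    }

    public static void main(String[] args) {
        // Hypothetical task IDs of the "word-counter" bolt.
        List<Integer> counterTaskIds = Arrays.asList(4, 7);
        for (String word : new String[] {"apple", "banana", "  "}) {
            System.out.println("'" + word.trim() + "' -> task "
                    + chooseTaskId(word, counterTaskIds));
        }
    }
}
```

In a bolt, the value returned by chooseTaskId would be passed as the first argument of emitDirect. Words that start with the same letter always map to the same task, which is what allows the counter to keep a consistent count per word.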
Global Grouping
Global Grouping sends tuples generated by all instances of the source to a single target
instance (specifically, the task with the lowest ID).
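The selection rule can be illustrated without Storm itself. The sketch below only models the choice described above, picking the lowest ID from a list of target task IDs. The class name GlobalGroupingSketch, the method chooseTarget, and the sample IDs are hypothetical and not part of the Storm API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Storm-free sketch of the global grouping rule: every tuple goes to
// the target task with the lowest ID, whichever source task emitted it.
final class GlobalGroupingSketch {

    static int chooseTarget(List<Integer> targetTaskIds) {
        return Collections.min(targetTaskIds);
    }

    public static void main(String[] args) {
        // Hypothetical task IDs of the target bolt.
        List<Integer> targetTaskIds = Arrays.asList(9, 3, 5);
        System.out.println("All tuples go to task " + chooseTarget(targetTaskIds));
    }
}
```

Because every tuple ends up in one task, this grouping suits steps that need a complete view of the stream, at the cost of that task becoming a potential bottleneck.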