Java Reference
In-Depth Information
There are many ways for Spring Integration to correlate incoming messages. To determine how
many messages to read until it can stop, it uses SequenceSizeCompletionStrategy , which reads a well
known header value (aggregators are often used after a splitter . Thus, the default header value is
provided by the splitter , though there's nothing stopping you from creating the header parameters
yourself) to calculate how many it should look for and to note the index of the message relative to the
expected total count (i.e., 3/22).
For correlation when you might not have a size but know that you're expecting messages
that share a common header value within a known time, Spring Integration provides the
HeaderAttributeCorrelationStrategy . In this way, it knows that all messages with that value are from
the same group, in the same way that your last name identifies you as being part of a larger group.
Let's revisit the last example. Suppose that the file was split and subsequently processed. You now
want to reunite the customers and do some cleanup with everyone at the same time. In this example,
you use the default completion-strategy and correlation-strategy . The only custom logic is a POJO
with an @Aggregator annotation on a method expecting a collection of Message objects. It could, of
course, be a collection of Customer objects because they are what you're expecting as output from the
previous splitter . You return on the reply channel a Message that has the entire collection as its payload:
<beans:bean id="customAggregator" class="com.apress.springenterpriserecipes.
springintegration.MessagePayloadAggregator"/>
<channel id="messagePayloadAggregatorChannel"/>
<channel id="summaryChannel"/>
<aggregator input-channel="messagePayloadAggregatorChannel"
ref="customAggregator"
output-channel="summaryChannel" />
The Java code is even simpler:
package com.apress.springenterpriserecipes.springintegration;
import org.springframework.integration.annotation.Aggregator;
import org.springframework.integration.core.Message;
import org.springframework.integration.message.MessageBuilder;
import java.util.List;
public class MessagePayloadAggregator {
@Aggregator
public Message<?> joinMessages(
List<Message<Customer>> customers
) {
if (customers.size() > 0) {
return MessageBuilder.withPayload(customers).copyHeadersIfAbsent(
customers.get(0).getHeaders()).build();
}
return null;
}
}
Search WWH ::




Custom Search