Splitting and aggregating messages
7.4.1 Extension points of the CorrelatingMessageHandler
The CorrelatingMessageHandler processes a group of messages in two phases: message in and message out. When a message comes in, it's correlated and stored. On the way out, a message group is released, processed, and finally marked as completed. Let's look into the details of each of those steps (correlate, store, release, process, complete), as shown in figure 7.6, and introduce collaborators as we go along.
2 In Spring Integration 2.1, this has been refactored so that AbstractCorrelatingMessageHandler is a common base class of the Aggregator and Resequencer handler classes. Many of the details discussed here changed as well, but the default strategies are the same. See the latest reference manual for details.
Figure 7.6 The CorrelatingMessageHandler: handle message, get correlation key (CorrelationStrategy), add message to group (MessageGroupStore), can release group? (ReleaseStrategy), process message group (MessageGroupProcessor), and if [group complete] remove group.
Under the hood
When a message hits the CorrelatingMessageHandler, the first thing the handler needs to do is figure out which MessageGroup the message belongs to. The message group is identified by its correlation key (not to be confused with the correlation ID). The correlation key is obtained from the CorrelationStrategy, which defaults to a HeaderAttributeCorrelationStrategy. The default strategy uses the correlation ID from the message headers as the key, but you're free to plug in a different strategy.
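To make the correlation step concrete, here is a minimal, self-contained sketch in plain Java that does not use the framework. Msg and Correlator are hypothetical stand-ins for a message and for the CorrelationStrategy role, and the header names "correlationId" and "customerId" are assumptions for illustration. The first strategy mimics the default (the key is the correlation ID header); the second groups by a different header entirely.

```java
import java.util.Map;

// Hypothetical stand-in for a message: a payload plus headers (not the framework type).
record Msg(Object payload, Map<String, Object> headers) {}

// Hypothetical stand-in for the CorrelationStrategy role: message in, correlation key out.
interface Correlator {
    Object correlationKey(Msg message);
}

final class Correlations {
    // Mimics the default: the correlation key is the "correlationId" header.
    static final Correlator BY_CORRELATION_ID = m -> m.headers().get("correlationId");

    // Custom strategy (assumption): group by a "customerId" header instead,
    // so messages with different correlation IDs can still share a group.
    static final Correlator BY_CUSTOMER = m -> m.headers().get("customerId");
}
```

With the custom strategy, two messages produced by different splits land in the same group as long as they carry the same customer ID.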
After the correlation key is found, the message can be stored with its group. For this, the handler uses a MessageGroupStore, which defaults to an in-memory implementation. The storage used by the CorrelatingMessageHandler can be entirely customized. A JdbcMessageStore is available in the framework, but it stands to reason that a NoSQL store is a better fit in many cases.3 The store holds all incomplete message groups, so it's important to consider memory consumption and performance when groups are large or when many aggregates are incomplete at the same time.
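The memory concern is easy to see in a toy version of an in-memory store. The following is a hypothetical, simplified sketch (InMemoryGroupStore is not a framework class): it maps each correlation key to a list of messages, and nothing is freed until a group is explicitly removed, so every incomplete group stays on the heap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory group store: correlation key -> messages of that group.
// Like an in-memory default, all data lives on the heap until a group is removed.
final class InMemoryGroupStore {
    private final Map<Object, List<Object>> groups = new HashMap<>();

    // Adds a message to its group and returns a snapshot of the group so far.
    List<Object> add(Object key, Object message) {
        List<Object> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(message);
        return List.copyOf(group);
    }

    // Called once a group is complete; incomplete groups are never removed here.
    void remove(Object key) {
        groups.remove(key);
    }

    int groupCount() {
        return groups.size();
    }

    // Total number of messages held, a rough proxy for memory use.
    int messageCount() {
        return groups.values().stream().mapToInt(List::size).sum();
    }
}
```

A persistent store would keep the same add and remove contract but move the data off the heap.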
After the message is stored, its group is considered for release. For example, a complete aggregation group is released, but a partially complete group containing only the first few elements of a sequence may be released as well. The release strategy says nothing about the completeness of the group. Its only responsibility is to decide whether the message processor may process this particular group now.
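The difference between "may be released" and "is complete" can be illustrated with two toy release rules. This is a sketch under assumptions: Releaser is a hypothetical stand-in for the ReleaseStrategy role, a group is reduced to the list of sequence numbers seen so far, and the "every three messages" rule is invented purely to show a release of a partial group.

```java
import java.util.List;

// Hypothetical stand-in for the ReleaseStrategy role: it answers only
// "may the processor look at this group now?", not "is the group complete?".
interface Releaser {
    boolean canRelease(List<Integer> sequenceNumbersSoFar, int sequenceSize);
}

final class Releasers {
    // Releases only once every message of the sequence has arrived.
    static final Releaser WHEN_COMPLETE = (seen, size) -> seen.size() == size;

    // Invented alternative: release whenever the group holds a multiple of
    // three messages, so partial groups also reach the processor.
    static final Releaser EVERY_THREE = (seen, size) -> !seen.isEmpty() && seen.size() % 3 == 0;
}
```

For a five-message sequence with three messages received, the first rule refuses and the second releases. Neither rule marks the group as complete.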
Once a group is released, it's handed to the MessageGroupProcessor of the CorrelatingMessageHandler. This is where the actual operations on the messages are performed. The processor is handed a template to send messages with and is expected to make all decisions relevant to sending output messages. It's also responsible for marking the messages it has processed in the message group.
The marked messages can then be recognized as processed if the same group reaches the processor again later. Note, however, that nothing in the processor's contract prevents it from reprocessing marked messages.
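Marking matters when a group is released more than once. Here is a minimal sketch, assuming messages can be identified by a String ID: MarkingProcessor is hypothetical, and the Consumer stands in for the template the real processor uses to send messages. This processor chooses to honor its marks, which, as noted above, the contract doesn't require.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical group processor that forwards each message at most once.
// It "marks" what it has handled so a later release of the same group
// only sends the messages that are new since the previous release.
final class MarkingProcessor {
    private final Set<String> marked = new HashSet<>();

    // 'output' plays the role of the template used to send messages.
    void process(List<String> group, Consumer<String> output) {
        for (String message : group) {
            if (marked.add(message)) { // add() returns false if already marked
                output.accept(message);
            }
        }
    }
}
```

Releasing the group [m1, m2] and later [m1, m2, m3] produces m1, m2, m3 on the output, each exactly once.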
In the next few paragraphs, you'll see how the aggregator and the resequencer implement these strategies. In both the aggregator and the resequencer, correlation and storage are the same (and trivial), so we go into the details of release and processing only.
7.4.2 How do Resequencer and Aggregator do it?
The aggregator, as discussed earlier, takes a group as a whole and forges a new message out of it. We look at its release and processor strategies in detail in the next few paragraphs.
The release strategy of an aggregator should release the group only when all of its messages have arrived (or when it times out). The SequenceSizeReleaseStrategy implementation handles this behavior. For this common case, MessageGroup has an isComplete() method, whose default implementation compares the sequence size header to the size of the group. This is convenient if you're implementing a custom release strategy but are still interested in the default completeness of the group.
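The default completeness check boils down to a size comparison. The sketch below is a simplified illustration, not the framework's code: Msg is a hypothetical stand-in for a message, the header name "sequenceSize" is assumed, and edge cases the real implementation may treat differently (such as a missing header) simply count as incomplete here.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a message: payload plus headers.
record Msg(Object payload, Map<String, Object> headers) {}

final class Completeness {
    // Simplified completeness check: the group is complete when its size
    // equals the sequenceSize header carried by its messages.
    static boolean isComplete(List<Msg> group) {
        if (group.isEmpty()) {
            return false;
        }
        Object size = group.get(0).headers().get("sequenceSize");
        return size instanceof Integer n && group.size() == n;
    }
}
```

A custom release strategy could call a check like this and then add its own conditions on top.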
The message group processor of an aggregator should turn all the messages of a group into a single aggregated message and send it off to the output channel. The most commonly used implementation is the MethodInvokingMessageGroupProcessor, which wraps a method. The method should have the following signature (in pointcut expression syntax: any return type, any method name, a single List parameter):
* *(List)
As with other implicit conversions to and from messages in the framework, Spring Integration automatically unwraps the payloads when the list elements the method expects aren't messages. The return value is wrapped in a message if needed and sent to the output channel of the aggregator.
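The following sketch imitates that conversion outside the framework. WordAggregator is an ordinary class whose join method matches * *(List) and never sees a message. PojoInvoker.aggregate is a hypothetical helper, not the MethodInvokingMessageGroupProcessor API: it unwraps the payloads, calls the method, and wraps a non-message result in a new message. Msg is again a stand-in type.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a message: payload plus headers.
record Msg(Object payload, Map<String, Object> headers) {}

// A POJO aggregator: its method matches "* *(List)" and works on payloads only.
final class WordAggregator {
    String join(List<String> words) {
        return String.join(" ", words);
    }
}

final class PojoInvoker {
    // Hypothetical helper imitating the implicit conversion: unwrap payloads,
    // invoke the method, and wrap a non-message result in a new message.
    @SuppressWarnings("unchecked")
    static <T> Msg aggregate(List<Msg> group, Function<List<T>, ?> method) {
        List<T> payloads = group.stream().map(m -> (T) m.payload()).toList();
        Object result = method.apply(payloads);
        return result instanceof Msg msg ? msg : new Msg(result, Map.of());
    }
}
```

Keeping the aggregation logic in a plain method like join makes it testable without any messaging infrastructure.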
The resequencer works along the same lines as the aggregator, with two main differences. First, messages from an incomplete group may already be released. Second, the processor is expected to send out the same messages that came in rather than a new aggregate.
By default, the resequencer also uses the SequenceSizeReleaseStrategy. In the case of a resequencer, the releasePartialSequences flag can be set. This flag allows the release strategy to release the parts of an incomplete sequence that are already in the right order, which makes for a smoother message flow.
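Partial release comes down to one question: starting from the next sequence number we're waiting for, how long is the unbroken run of arrived messages? A minimal sketch, where releasableRun is a hypothetical helper rather than a framework method:

```java
import java.util.Set;

// Sketch of partial release for a resequencer: given the sequence numbers
// that have arrived and the next number expected, count how many messages
// can go out in order right now. A result > 0 means part of the group may be released.
final class PartialRelease {
    static int releasableRun(Set<Integer> arrived, int nextExpected) {
        int count = 0;
        while (arrived.contains(nextExpected + count)) {
            count++;
        }
        return count;
    }
}
```

If messages 1, 2, and 4 have arrived and 1 is expected next, two messages can be released. Message 4 waits for 3, and after that release the same group can be released again.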
The message group processor of a resequencer takes all the messages in the group, orders them, and then sends all the messages that form a sequence to the output channel. The main customization is supplying a different comparator for the ordering, so that the ordering doesn't have to rely on sequence numbers.
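Swapping the comparator is the whole customization. In this sketch, Msg is a hypothetical stand-in for a message, "sequenceNumber" is the assumed default ordering header, and the "timestamp" header is an invented example of an alternative ordering key. Note that reorder only sorts; the messages themselves are passed through unchanged.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a message: payload plus headers.
record Msg(Object payload, Map<String, Object> headers) {}

final class Resequencing {
    // Default-style ordering: by the sequenceNumber header.
    static final Comparator<Msg> BY_SEQUENCE =
        Comparator.comparingInt(m -> (Integer) m.headers().get("sequenceNumber"));

    // Invented alternative: order by a "timestamp" header, no sequence numbers needed.
    static final Comparator<Msg> BY_TIMESTAMP =
        Comparator.comparingLong(m -> (Long) m.headers().get("timestamp"));

    // Returns the same messages in the requested order, leaving the input list untouched.
    static List<Msg> reorder(List<Msg> group, Comparator<Msg> order) {
        List<Msg> sorted = new ArrayList<>(group);
        sorted.sort(order);
        return sorted;
    }
}
```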
In summary, one central component, CorrelatingMessageHandler, uses several strategies to delegate its work. CorrelationStrategy is used to find the correlation key of the message group, and MessageGroupStore is used to store the message group. To decide when to release the group for processing, a ReleaseStrategy is used. A MessageGroupProcessor finally deals with the messages. Implementations of these strategies together form the different correlating endpoints.
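Putting the four roles together gives a compact model of the whole flow: correlate, store, release, process, complete. This is a hypothetical simplification, not the framework's class. The roles are passed in as plain java.util.function types, the store is a HashMap, and the "complete" step simply removes the group after a release. That suits an aggregator-style processor that consumes the whole group, but a resequencer releasing partial sequences would need to keep the group around.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for a message: payload plus headers.
record Msg(Object payload, Map<String, Object> headers) {}

// Simplified model of a correlating handler with pluggable roles mirroring
// CorrelationStrategy, MessageGroupStore, ReleaseStrategy, and MessageGroupProcessor.
final class CorrelatingHandler {
    private final Function<Msg, Object> correlation;               // correlate
    private final Map<Object, List<Msg>> store = new HashMap<>();  // store (in memory)
    private final Predicate<List<Msg>> release;                    // release?
    private final Function<List<Msg>, Msg> processor;              // process
    private final Consumer<Msg> output;                            // output channel

    CorrelatingHandler(Function<Msg, Object> correlation, Predicate<List<Msg>> release,
                       Function<List<Msg>, Msg> processor, Consumer<Msg> output) {
        this.correlation = correlation;
        this.release = release;
        this.processor = processor;
        this.output = output;
    }

    void handle(Msg message) {
        Object key = correlation.apply(message);
        List<Msg> group = store.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(message);
        if (release.test(group)) {
            output.accept(processor.apply(List.copyOf(group)));
            store.remove(key); // complete: this aggregator-style processor consumed the group
        }
    }

    int pendingGroups() {
        return store.size();
    }
}
```

Wiring in a correlation-ID key, a sequence-size release check, and a processor that concatenates payloads turns this model into a tiny aggregator.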
7.5 Summary
In this chapter you learned to deal with splitters, aggregators, and resequencers. You also saw examples of some nontrivial aggregator use cases and finally looked at the design that’s at the core of Spring Integration. Let’s review what you learned about splitters first.
One message goes in; many messages come out.
The output can be based on the payload but also on other criteria, such as headers.
The splitter sets a correlation ID, sequence size, and sequence number for each message.
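A splitter's header bookkeeping can be sketched in a few lines. SimpleSplitter and Msg are hypothetical stand-ins, and the header names and 1-based numbering are assumptions for illustration. Every part shares the same correlation ID (here the ID of the input message) and records its position and the total count, which is exactly what a default aggregator or resequencer needs downstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a message: payload plus headers.
record Msg(Object payload, Map<String, Object> headers) {}

final class SimpleSplitter {
    // One message in, many out: each part gets the same correlationId,
    // its 1-based sequenceNumber, and the total sequenceSize.
    static List<Msg> split(String inputId, List<?> parts) {
        List<Msg> out = new ArrayList<>();
        for (int i = 0; i < parts.size(); i++) {
            out.add(new Msg(parts.get(i), Map.of(
                "correlationId", inputId,
                "sequenceNumber", i + 1,
                "sequenceSize", parts.size())));
        }
        return out;
    }
}
```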
The chapter also discussed endpoints that group messages together before sending reply messages. Correlation, the basis for both aggregating and resequencing, was examined in detail. The following points are relevant to remember:
CorrelationStrategy finds the correlation key, which is based on a message and doesn't have to be the correlation ID (for example, as set by a splitter).
MessageGroupProcessor determines what happens in reaction to the release of a group.
ReleaseStrategy determines when a group is released. A group can be released multiple times.
MessageGroupStore stores the messages until they are processed.
The aggregator uses a processor that aggregates the messages. By default, its correlation and release strategies complement the splitter.
The resequencer processes messages by reordering them. Its correlation and release strategies are similar to those of the aggregator with the exception of releasing partial sequences. When partial sequences are released, multiple releases for the same group may happen.
Now that you've read this chapter, you should have a clear idea of how Spring Integration can help you when you need to split up some work or aggregate the results of some operations back together. Aggregation is a particularly complex use case that often differs subtly from the examples found in this book or online. In some cases it pays to write a custom solution. Here it's especially important to weigh the pros and cons of extending the framework versus inventing your own.
This concludes part 2 of the book. You now know how the core of Spring Integration works. We reviewed all the main components in the core and showed you several examples of messaging applications using the components. But this is only the foundation. The interesting part comes when you start integrating with remote systems and look beyond the walls of the JVM. In the next chapter, you'll work with XML, because it's the ubiquitous language of system integration. Chapter 8 shows you concepts from this and previous chapters reused in the context of XML payloads, such as the XPath splitter and the XPath router. From there, we'll look at many different integration possibilities. Read on!