Chapter 4. Application design
4.5 Messaging application design
4.5.2 Message consumers
This section contains design guidelines and best practices applied to message consumers. It discusses the synchronous as well as the asynchronous
implementation approach. A behavior that needs to be supported by both types is the ability to select specific messages from a queue, for example, to get just the corresponding reply for a request.
Selective consumer
The selective consumer enables selective reading of specific messages in a queue. This ability is required because it is often not possible to create a specific queue for each message type, either because this would involve too many queues or because dynamic queues are preferred and not known in advance. Messaging addresses this issue by providing a mechanism that allows message producers to set message header properties that can be used from message consumers as selection criteria for specific messages.
Selective consumers can be used to implement filtering, dispatching, and Message ID not found Message ID found
Get request
Check log
Message ID found Message ID not found
Get reply from log Process request
Rollback processing Log record
Consider, for example, a car purchase process. An employee may buy cars without telling someone until a specific price limit; above that limit there is notification to his manager needed. The notification process can be implemented using messaging by copying the purchase price as a property into the message header, thus allowing two different consumers to select messages based on the price. One consumer would have to consume the messages below the critical price, the other one above this price, providing notification functionality.
Example 4-1 shows a code sample of a producer that sets the price as a header property in the message.
Example 4-1 Message producer
public class JmsSelectionSupportedProducer {
public void sendMessage(String msg, String price, String conFactoryName, String destName) throws NamingException, JMSException { ...
//Create the message, set the property and send the message TextMessage message = session.createTextMessage(msg); message.setIntProperty(“price“, price);
producer.send(message); ...
} }
Example 4-2 shows a code sample of a selective consumer. The consumer only reads messages whose value of the price attribute in the message header is higher than 10000.
Example 4-2 Selective consumer
public class JmsSelectiveConsumer {
public String receiveMessage(String conFactoryName, String destName) throws NamingException, JMSException {
String messageSelector = “price > 10000”;
//Use the session, destination and selector to create the consumer consumer = session.createConsumer(destination, messageSelector);
//Reveive the message
TextMessage message = consumer.receive(); ...
} }
The selection criteria should be chosen in a way that guarantees all possible variants are served. If this is not the case messages that do not expire may stay in the queue without ever being read by a consumer.
Polling consumer
The polling consumer acts in a synchronous manner since the receiver thread is blocked until a message is available. Normally, it is the application or service that controls polling consumers by telling them when to start polling.
A pseudo-synchronous request-reply scenario is a sample where polling consumers are applied. The service consumer triggers the receiver functionality immediately after sending the request. The receiver thread is then blocked until either the reply message arrives or a receiver timeout occurs.
Design considerations that should be made during the design of polling
consumers involve the definition of receiver timeouts as well as definitions about exception paths to take if expected messages do not arrive. Additional design is needed for consumers that may just read specific messages, for example, messages with a specific correlation ID.
Example 4-3 shows a code sample of a simple polling consumer. The consumer could be used as a receiver on the service consumer side to support a
pseudo-synchronous request-reply scenario. Note the selector, which enables the consumer to listen for a message with a specific correlation ID.
Example 4-3 Polling consumer
public class JmsPollingConsumer {
public String receiveMessage(String conFactoryName, String correlationId, String destName, int timeout) throws NamingException, JMSException {
Session session = null; Connection connection = null; MessageConsumer consumer = null; String msg = null;
String messageSelector = “JMSCorrelationID=’ID:“ + correlationId + “‘“;
try {
//Get the specified connection factory and queue Context jndiConext = new InitialContext(); ConnectionFactory factory = (ConnectionFactory)
jndiContext.lookup(conFactoryName); Destination destination = (Destination)jndiContext.lookup(destName);
//Create the connection and session
Session.AUTO_ACKNOWLEDGE);
//Use the session, destination and selector to create the consumer consumer = session.createConsumer(destination, messageSelector);
//Reveive the message
TextMessage message = consumer.receive(timeout); msg = (String)msg.getText(); } finally { if (consumer != null) { consumer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } return msg; } }
Event-driven consumer
An event-driven consumer acts in an asynchronous manner since it
automatically consumes messages as they become available. Unlike the polling consumer, the event-driven consumer does not have a blocked receiver thread. Instead, a new thread is started as soon as a message is available.
Using event-driven consumers instead of polling consumers allows systems to behave more responsively because messages are processed as they arrive and not as the application decides.
Event-driven design provides the following benefits:
Eases implementation of applications and services involving unpredictable and asynchronous occurrences
Eases assembling of existing applications and services Supports component and service reuse
The design of event-driven consumers should include message throttling. Because event-driven consumers are triggered by the arrival of messages, the design needs to make sure that the system is not overloaded. A simple way to achieve throttling is to limit the number of threads that can be created
dynamically to process incoming messages.
Example 4-4 shows a code sample of a simple event-driven consumer.
Event-driven consumers are supported by message driven beans (MDBs). MDBs provide an onMessage method, which is called by the infrastructure for each incoming message.
Example 4-4 Event-driven consumer
public class JmsEventDrivenConsumer implements MessageDrivenBean, MessageListener { ...
public void onMessage(Message msg) {
//This method is triggered by each incomming message }
}
Handling of poison messages
A poison message is a message that an event-driven consumer is not able to process. It might be corrupt or just in an unexpected format. If an event-driven consumer discovers a poison message it has several options for dealing with it: Roll back the message.
By triggering rollback on an event-driven consumer the message is put back on the queue where it came from. This approach only works if the consumer runs within a transaction.
Move the message to another queue.
Moving the message to another queue enables special processing for that message. This approach is useful if the consumer does not run within a transaction.
Discard the message.
The message is deleted, and thus lost without being processed.
The rollback approach needs special consideration, as it can lead to possible loops. Consider the case of the rollback of a corrupt message back into the queue where it came from. The same message stays corrupt and will therefore be rolled back again and again and again. A useful approach for resolving this issue (also provided by MDBs) is the definition of a
maximum retries
property that defines how many times a consumer tries to read the same message beforethe listener port automatically stops listening. Unfortunately, this stops all processing. In fact, there are two more properties needed, and provided by JMS: The
redelivery count
message propertyThis property defines how many times a message has already been read from the queue.
The
backout threshold
destination propertyWhen the same message has been read a number of times equal to the backout threshold, the infrastructure moves the message to a default queue for messages that could not be delivered. The queue is called the dead letter queue.
A rollback behavior that supports multiple (but not unlimited) processing retries without stopping the listener can be achieved by setting the maximum retries higher than the backout threshold. This allows the consumer to try to process a message for the number of times specified by the backout threshold without reaching the maximum retries number. Each time the redelivery fails the redelivery count is incremented. When the redelivery count equals the backout threshold, the message is moved to the dead letter queue. Maximum retries is never reached and the listener port is not stopped.