5. Implementation
5.2. Platform-Specific Prototypes
5.2.1. Centralized Implementation: Apache ActiveMQ
Apache ActiveMQ3is an industry-strength open-source Java Message Service (JMS) MOM, which also supports other messaging protocols such as Advanced Message Queuing Protocol (AMQP), Simple Text Oriented Messaging Protocol (STOMP), and OpenWire [390]. Topics are used for many-to-many communication while queues implement point-to-point communication. Content- based routing is supported by adding attributes to notifications and defining filters on these attributes; explicit advertisement and unadvertisements, however, are not supported.
Using a plugin architecture, ActiveMQ allows to extend the MOM with custom functionality with- out the need to change existing code. A plugin has access to the state of the broker instance it is running on to intercept and modify any notification being processed by the broker. Furthermore, 3 https://activemq.apache.org
events indicating changes in the lifecycle of other participants (i.e., subscribers, publishers, or brokers) can be intercepted and modified as well. ActiveMQ is using an interceptor stack and container model to provide full access to its internal routing while isolating the business logic of a plugin and easing parallelization at the same time. Thus, multiple plugins can be deployed together without causing side-effects as illustrated in Figure 5.17 for ExpectationController plugin, ASIAController plugin and the Latency plugin described in [138].
ASIA Controller Plugin Expectation Controller Plugin Latency Plugin
…
ActiveMQ Broker Instance
P S
Publisher Subscriber
Figure 5.17.: Plugin-support on Apache ActiveMQ.
Due to this modular plugin architecture, we provide ExpectationController and ASIAController as two separate plugins on ActiveMQ.
Access to the broker state is offered by ActiveMQ through the API of the BrokerPluginSupport class that can be extended by each plugin. Selected parts of the API are shown in Figure A.6 in the appendix.
ActiveMQ is widely used as a centralized MOM. While clusters and networks of brokers can be supported for availability reasons, ActiveMQ itself does not support any higher-level routing strategies other than forwarding [302]: brokers subscribe on behalf of their local subscribers to all their neighbors while publications are flooded to all brokers. Within the business logic of ActiveMQ there is no distinction between edge and inner brokers as shown in Figure 5.18 for forwarding notifications. This is relevant for the interaction between the broker and our extensions ExpectationController and ASIAController.
ActiveMQ broker Internal routing P S Publisher Subscriber Notification ActiveMQ broker sent postProcessDispatch preProcessDispatch
Figure 5.18.: Forwarding of notifications between multiple brokers using the same methods.
ExpectationController Plugin
We provide ExpectationController as a plugin on ActiveMQ by extending BrokerPlugin- Support and implementing the ExpectationController interface. The integration of the ExpectationControllerwith methods of ActiveMQ’s API is shown in Figure 5.19.
The ResourceMonitor has to be triggered by changes of the broker state. Thus we add triggers to the methods of the ActiveMQ broker that are called whenever subscribers, publishers, or brokers join the system (addConsumer, addPublisher, addBroker) or leave (removeConsumer, etc.) as well as whenever topics or queues are added (addDestination) or removed.
send addProducer addConsumer addDestination ExpectationControllerPlugin preprocessDispatch Trigger update ResourceMonitor … DispatchPolicy
routing adaptation, rateController, latency minimization
ReactionCoordinator
Figure 5.19.: ExpectationController: utilized joinpoints in ActiveMQ plugin API.
Notifications about expectations and capabilities are received and forwarded as MapMessages by a separate JMS subscriber that is subscribed on two topics provided on the broker: expectations.controland capabilities.control. The client handlers publish on these top- ics to register, update or revoke expectations and capabilities. The MOM publishes feedback on the topics expectations.feedback and capabilities.feedback that client handlers are sub- scribed to. We decided to use separate topics for the communication about expectations and capabilities to minimize the number of processed notifications we have to parse.
Runtime adaptation is realized by transparently changing the subscriptions of subscribers for routing adaptation and by using a customized DispatchPolicy to deploy rateController in- stances for those subscribers that require a reduction of the sampling rate.
On ActiveMQ, adjusting the DispatchPolicy is a powerful tool to transparently and efficiently intervene with the processing of notifications. At least one DispatchPolicy instance is assigned to a new destination. ActiveMQ hands over each notification that is processed for this destina- tion to the activeDispatchPolicy together with a set of subscribers for this destination. The DispatchPolicythen decides for each subscriber if the notification should be dispatched. We inject the rateController mechanism into the dispatching process of ActiveMQ by substitut- ing the generic DispatchPolicy with the RateControllerDispatchPolicy shown in Listing A.1. This customized policy maintains a list of rateControllers registered for dedicated subscribers and types of notifications. If there is no rateController for a subscriber, the original notification is forwarded without alterations. The injection is done when a new destination is registered as shown in Listing A.2, lines 11-14. The listings are shown in the appendix.
ASIAController Plugin
We provide ASIAController as a plugin on ActiveMQ in the same way by extending Broker- PluginSupportand implementing the ASIAController interface.
As for the ExpectationController plugin, changes to the population known to the broker as well as incoming notifications trigger changes to the Repository. The Repository updates the local values for different aggregators and triggers the aggregators. Figure 5.20 shows which methods of ActiveMQ’s API trigger the six aggregators we have implemented for ASIA.
Incoming aggregated results sent by upstream brokers on the aggregation tree are detected in the sent method of the plugin. They result in updates of their respective aggregates. The same method is used to forward updates about aggregated metrics downstream the aggregation tree.
P Subscriber send Publisher addProducer addConsumer
S
- publicationRate - publisherCount - publisherCount - advertisement Count - subscriberCount - subscriptionRate addDestination ASIAController Plugin ASIA aggregators Incoming: update , Outgoing: notify downstream participants using piggybacking or additional notificationstcrec tcnew
Incoming: update
local values
Figure 5.20.: ASIA aggregators: utilized joinpoints in ActiveMQ plugin API.
Aggregate update information is piggybacked onto existing JMS messages by adding attributes with a common prefix that describe the aggregated metric, its upper and lower bounds as well as the imprecision used to compute the interval. These additional attributes are stripped at the receiving broker. Subscribers and publishers are notified with separate MapMessages.
Extending Publishers and Subscribers
In addition to platform-specific implementations for brokers and client handlers, developers need to extend the code of existing publishers and subscribers to
1. access ExpectationHandlerClient and CapabilityHandlerClient;
2. define expectations and capabilities and register them at the MOM; and 3. react to feedback sent from the MOM.
Reacting to feedback is based on the semantics of the application and thus requires application- specific extensions that can differ in extent and complexity. Accessing the client handlers and managing the lifecycle of expectations and capabilities, on the contrary, is not.
Listing 5.3 shows a minimalistic example of a JMS subscriber that uses expectations to manage its QoI requirements. We omit most of the additional boilerplate code used to manage the lifecycle of the subscriber and the JMS connection itself. We assume that the subscriber subscribes to a predefined topic with a given filter expression; the approach is the same for queues.
The additional code necessary to integrate our ExpectationHandlerClient into the business logic of a subscriber consists of general bootstrap code to initialize the client handler (line 15 - 16) and load a list of predefined expectations saved in a separate file. Alternatively, the subscriber can define expectations on the fly after initializing the client handler. For every expectation to be activated for a given JMS topic and filter expression, the subscriber needs only a single line of code (line 17). The subscriber identifies itself at the MOM using the clientID attribute of the JMS connection. This identifier is valid for all subscriptions of the connection.
We want to point out the ratio between the eight lines of code necessary to establish a connection to a typical JMS MOM (lines 21-28) and the additional three lines of code necessary to include expectations (lines 15-17).
In our example, every update about the state of a registered and active expectation triggers the subscriber to print a list of all the feedback received so far using the method provided by the client handler (line 35).
Listing 5.3: Minimalistic JMS subscriber using expectations.
1 public class SubscriberExpectationAware implements ExpectationAwareSubscriber,
MessageListener {
2
3 private Connection connection;
4 private Session session;
5 private Topic topic;
6 private MessageConsumer subscriber;
7 private ExpectationHandlerClientActiveMQ eH;
8
9 (...)
10
11 public SubscriberExpectationAware(String url, String clientID, String topicName, String
filter, String xID, String configFile) throws JMSException {
12
13 this.setupJMS(url,clientID,topicName,filter); //establish JMS connection 14
15 eH = new ExpectationHandlerClientActiveMQ(this, configFile, true);
16 eH.addLocalExpectationsAuto();//load all expectations in this file
17 eH.registerExpectation(xID, new EventType(topic,filter));
18 } 19
20 private void setupJMS(String url, String clientID,filter) {
21 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
22 connection = factory.createConnection();
23 connection.setClientID(clientID);
24 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
25 topic = session.createTopic(topicName) 26 subscriber = session.createConsumer(topic,filter) 27 subscriber.setMessageListener(this); 28 connection.start(); 29 } 30 31 (...) 32
33 @Override //handle feedback
34 public void onUpdate(FeedbackExpectation news) {
35 System.out.println(this.expectationHandler.summarizeFeedback());
36 }
37
38 (...)
Listing 5.4 shows a minimalistic example of a JMS publisher that uses capabilities to announce its support for QoI properties. As for the subscriber example, we omit most of the additional boilerplate code used to manage the lifecycle of the publisher itself. Again we assume that the publisher publishes to a single predefined topic; the approach is the same for queues or multiple topics/queues.
As in our subscriber example, we need only two additional lines of code to bootstrap the client handler and load a list of predefined capabilities (line 15 + 16) while a single line of code is required to register a capability profile for a given JMS topic (line 17).
In our example, receiving an adaptation advice from the MOM (line 32) triggers publisher- specific code for adapting all capabilities required to change. Having completed the adaptation, the publisher updates the current value for the respective capability and notifies the MOM using the client handler (line 37).
Listing 5.4: Minimalistic JMS publisher using capabilities.
1 public class PublisherCapabilityAware implements CapabilityAwarePublisher {
2
3 private Connection connection;
4 private Session session;
5 private Topic topic;
6 private MessageProducer publisher
7 private CapabilityHandlerClientActiveMQ cH;
8
9 (...)
10
11 public PublisherCapabilityAware(String url, String clientID, String topicName, String
filter, String cID, String configFile) throws JMSException {
12
13 this.setupJMS(url,clientID,topicName);
14
15 cH = new CapabilityHandlerClientActiveMQ(this, configFile, true);
16 cH.addLocalCapabilitiesAuto();
17 cH.registerCapabilityProfile(cID, new EventType(topic));
18 }
19
20 private void setupJMS(String url, String clientID) {
21 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
22 connection = factory.createConnection(); 23 connection.setClientID(clientID);
24 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
25 topic = session.createTopic(topicName) 26 publisher = session.createProducer(topic,filter) 27 publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 28 connection.start(); 29 } 30 31 @Override
32 public void onUpdate(AdaptationAdvice news) {
33 for (RequirementForCovering req : news.getRequirements().values()) {
34
35 (...)
36
37 this.capabilityHandler.updateCapabilityCurrentValue(news.getAssociatedGUID(), req.
getAbbrev(), req.getRequiredValue());
38 }
39 }
40