• No results found

5. Implementation

5.3. Example Applications

We use our platform-specific prototypes to support QoI at runtime within different distributed open-source applications written in Java. In the remainder of this chapter, we illustrate the integration of our approach into the reactive MCCAT middleware to enhance Ganglia monitoring

and MySQL master-slave replication in public Cloud environments. Furthermore, we describe the extensions made to the benchmarking applications FINCoS and jms2009-PS.

McCAT: Enhance MySQL Replication and Ganglia in Public Clouds

Multi-cloud Cost-Aware Transport (McCAT) is an approach we propose to balance the network usage of distributed legacy applications with QoI requirements of a tenant in multi-Cloud de- ployments without having to change the application’s implementation [173].

Many distributed applications that are used in public Cloud deployments were originally de- veloped for use in environments where network traffic is sufficiently available and free of charge. Prominent examples are monitoring systems (e.g., Ganglia), data-parallel processing frameworks (e.g., Hadoop), and replicated databases (e.g., MySQL) deployed in clusters.

In today’s public Clouds, network traffic has become a major cost factor for data-intensive ap- plications: the Cloud provider charges a tenant for incoming and outgoing network traffic when operating the tenant’s applications in one or multiple data centers. Consequently, the tenant needs to have means to balance the network usage of its applications with the QoI of the ex- changed information, e.g., in terms of latency, completeness, or precision.

We propose MCCAT to make distributed applications network- and cost-aware while enabling

tenants to specify bandwidth budgets and QoI requirements. Instead of changing the implemen- tation of the application, MCCAT transparently intercepts data exchanged between components of

the distributed application using application-specific interceptors as shown in Figure 5.24.

Application Publisher Broker Subscriber Application

Broker

McCAT

Data Centre A Data Centre C

Transformation Filtering Aggregation Caching Multicast

McCAT adjusts inter-data centre traffic using

Compression

Remove

unused data of Reduce volume relevant data Avoid processingredundant data

Application Publisher Subscriber Application

Data Centre B

Broker

Figure 5.24.: MCCAT: make distributed legacy applications network- and cost-aware.

MCCAT tries to balance the network usage of the application with the QoI requirements of the

tenant by removing unused data, reducing the volume of relevant data and avoiding to send redundant data. These objectives are achieved by MCCAT dynamically:

1. transforming meaningful data from application-specific formats into generic notifications; 2. filtering out data that is not required by any receiving component;

3. compressing and aggregating relevant data before sending; 4. caching data and processing it with application-level multicast.

For example, in a Ganglia system deployed across multiple data centers, network traffic between data centers can be significantly reduced by aggregating multiple updates of monitored metrics before transmission if their sampling frequency is higher than the received rate required at the moment (e.g., sampling frequency of one update every five seconds vs. required granularity of five minutes’ average), or by filtering metrics that are not being displayed to user.

Similarly, a master-slave replication deployment of MySQL across multiple Amazon EC2 regions can save bandwidth by multicasting the same replication update to multiple slaves, processing updates for selected tables only, or by delaying updates for a given amount of time to clear out redundant updates: if multiple updates to the same row of a table occur within a given amount of time, only the last update is processed.

Architecture Overview

MCCAT is realized as a distributed application consisting of MCCAT interceptors that act as pub-

lishers or subscribers in a DEBS and communicate via MCCAT brokers as shown in Figure 5.24.

Expectations are used to capture the QoI requirements of the tenant while publishers use capa- bilities. Adaptation advices sent by MCCAT brokers adjust the publication behavior.

MCCAT Interceptors: in MCCAT, there are two types of interceptors: senders and receivers. They

have different tasks but are quite similar in their design as shown in Figure 5.25.

Sender interceptor Application Parser Config Transform Filter Data Publisher Cache Notification

(a)Sender interceptor.

Receiver interceptor Application Parser Config Transform Filter Data Cache Subscriber Notification (b)Receiver interceptor.

Figure 5.25.: Design of the two MCCAT interceptor types.

Sender interceptors, as shown in Figure 5.25a, act as publishers in MCCAT. Each interceptor

uses application-specific components to mimic a receiving component (e.g., a MySQL slave) and intercept data from a sending component of the application. The MCCAT interceptor then ex-

as the binary format of MySQL replication updates or the XML schema of Ganglia monitoring updates, into generic notifications used by MCCAT. These notifications are filtered, to remove

currently irrelevant data, and cached, to purge redundant data.

A platform-specific publisher component sends notifications to all subscribers via MCCAT brokers.

Each publisher subscribes to ASIA aggregations about the number of subscribers interested in the data it publishes; notifications are published only if there is at least one matching subscriber ac- tive. Adaptation advices received from the system adjust the behavior of the interceptor, either by using a ReactionManagerClient to adapt the sending component of the application (cf., Sec- tion 5.1.3), or by adapting the publisher within the interceptor.

Sender interceptors use an application-specific parser and a set of rules to analyze the configura- tion of the sending component. They extract the information necessary to intercept the sending component, identify relevant data items, and generate appropriate advertisements with capabil- ities. This way, MCCAT can leverage application-specific knowledge while respecting application

semantics when filtering and caching.

Receiver interceptors, as shown in Figure 5.25b, act as subscribers in MCCAT. Each receiver

interceptor is associated with a receiving component of the application and mimics a sending component (e.g., a MySQL master). Internally, a platform-specific subscriber receives notifica- tions from MCCAT sender interceptors that it caches and filters. Before processing the update to

the receiving component of the application, the interceptor transforms the data contained in the notification back into the application-specific wire-format.

Like sender interceptors, receiver interceptors parse the configuration of their associated compo- nents for clues about the data these components expect to receive. Based on this knowledge and the tenant’s specification, subscriptions and expectations are generated and registered.

MCCAT Brokers: realize the communication between sending and receiving components of the

application. Each broker is extended by an ExpectationController to negotiate expectations with capabilities as well as an ASIAController to provide aggregated feedback to subscribers. Brokers are network- and cost-aware, i.e., they know which connections the cloud provider charges for and what the current utilization of these connections is. They apply lossy and lossless aggregation mechanisms to the notifications they process to reduce traffic whenever it becomes necessary to remain within the bandwidth budget set by the tenant.

Prototypes

We provide two prototypes of MCCAT based on the ASIA extension of REDS [172]. They provide

support for the Ganglia5monitoring system [298] and MySQL master-slave replication6.

MCCAT for Ganglia: the prototype of MCCAT intercepts the communication between gmond and gmetad agents. In a Ganglia system, gmond agents monitor the state of the host (virtual) machine

they are deployed on using different metrics. They update interested gmetad agents on remote machines with a configurable update frequency when multicast is available. In environments, such as public Clouds, where multicast is not available, gmetad agents query gmond agents at a given frequency following a multi-step protocol (request, acknowledgement, response, acknowl- edgement) over a given TCP socket. Gmond agents send an application-specific XML reply. Each interaction is stateless, i.e., gmond agents always send the full set of metrics they have measured even if the state of certain metrics has not changed since the last update.

5 http://ganglia.sourceforge.net/

6 https://dev.mysql.com/doc/refman/5.1/en/ha-vm-aws-deploy.html

In McCAT4Ganglia, we provide two kinds of interceptors: gmondInterceptor (sender intercep- tor) and gmetadInterceptor (receiver interceptor). They rely on existing Ganglia configuration files. We exploit the syntax of these configurations to store additional information for MCCAT,

such as explicit subscriptions (i.e., subscribe to a set of metrics, all metrics of a given host or a given cluster only), bandwidth budget, expectations to use, capability profiles to build on, or the types of aggregations to use for specific metrics. The information is added in a format that is ig- nored by an out-of-the-box Ganglia deployment but recognized by the application-specific parsers of MCCAT. Listings 5.7 and 5.8 show examples for a gmond as well as a gmetad configuration

file.

Listing 5.7: Extended gmond configuration for MCCAT to use aggregation and capability profiles.

1 # Lines starting with # are ignored by Ganglia. McCat ignores only those lines

2 # where # is followed by at least one space. All other lines starting with # are

3 # considered to be additional configuration parameters.

4 5 (...)

6

7 collection_group {

8

9 # sampling frequency in seconds for all metrics in this group

10 collect_every = 5

11

12 metric {

13 name = "cpu_num"

14 title = "CPU Count"

15 #aggr = AVG 16 #attributeToAggregate = value 17 #CPGUID=cp1 18 } 19 20 metric { 21 name = "cpu_speed"

22 title = "CPU Speed"

23 #aggregation = AVG 24 #attributeToAggregate = value 25 #CPGUID=cp1 26 } 27 28 (...) 29 }

Listing 5.8: Extended gmetad configuration for MCCAT to use subscriptions and expectations.

1 # Sebastian Frischbier:

2 # We make use of the syntax here to code additional information for McCAT into

3 # a valid gmond configuration file that can be used without McCAT as well.

4

5 # The first hostname is the host actually queried by gmetad.

6 # Only if this host fails to reply, the other hosts following that are queried

7 # For using the McCAT4Ganglia adapter, this very first host should be localhost.

8 # Otherwise, the adapter will show a warning message.

9 # You can check the correct settings from the adapter’s output on the terminal

10 # Unused configurations can be marked with a # at the start of each line

11

12 data_source "ASIA1" 1 localhost:8649

14

15 # Experimental syntax for fine-granular subscriptions (metric & host level) => sub:c=[ cluster]:h=[host]:m=[metric]:expectation=[GUID]

16 # if c,h,m or f are not explicitly set, wildcard * is assumed

17 # if no explicit subscriptions are used or --normalsyntax is set, the gmetadInterceptor for this gmetad subscribes to all available metrics from all hosts and clusters

18

19 # Example: subscription with expectation: cpu_idle metrics from all hosts of ASIA1 that conform to expectation ec1

20

21 #sub:c=ASIA1:m=cpu_idle:expectation=ec1 22

23 RRAs "RRA:LAST:0.0:1:100000" "RRA:AVERAGE:0.5:1:5856" "RRA:AVERAGE:0.5:4:20160" "RRA:AVERAGE

:0.5:40:52704" 24 gridname "ASIADC1" 25 all_trusted on 26 setuid_username ganglia 27 xml_port 8657 28 case_sensitive_hostnames 0

MCCAT for MySQL Master-Slave Statement-based Replication: the prototype of MCCAT inter-

cepts the communication between a MySQL replication master database and multiple MySQL replication slave databases. In MySQL, slave databases can register at a master to be informed about any change to rows in a specific database, e.g., INSERT, UPDATE, DELETE, DROP. When- ever such an update happens at the master, all slaves are notified by the master and receive a binary BinLogEvent containing the query causing the replication update; this statement-based replication is the default configuration for MySQL. The master and all slaves log each query to a BinLog record that can be used for rollback operations or to update slaves with outdated states.

In McCAT4MySQL, we provide two kinds of interceptors: mysqlSlaveInterceptor (receiver interceptor) and mysqlMasterInterceptor (sender interceptor). The interceptors parse the configurations of their database instances to learn the TCP ports used for replication as well as the current state of the BinLog record.

The masterInterceptor registers at the MySQL master on behalf of the replication slaves. Bin- Log events received from the master are buffered here until a sequence of interrelated events is complete and then forwarded by the publisher.

At the edge broker, BinLog events referring to the same table in a replicated database are cached. The size of the cache in terms of the number of BinLog events to add is determined by MCCAT.

Redundant replication actions currently kept in the cache are eliminated, e.g., only the latest update of subsequent updates for the same tuple is kept. Varying the size of the cache enables MCCAT to trade-off the latency and consistency of updates with traffic savings: the larger the

cache, the higher the probability for redundant updates to be observed and eliminated. The latency of updates, however, increases with an increasing cache size while the consistency of the slave databases is deteriorating. The bounds defined by the tenant for these generic properties define effective upper bounds for the cache size.

JMS-2009-PS: Benchmark for JMS

The jms2009-PS [368, 369] benchmark extends the official Standard Performance Evaluation Corporation (SPEC) benchmark7 SPECjms2007 for JMS. The workload of the SPECjms2007 benchmark is based upon a Supply Chain Management (SCM) scenario where different types and instances of participants (i.e., different distribution centers, supermarkets, headquarters etc.) interact with each other using a MOM. The benchmark is described in detail in Section 6.3.

Drivers' In te rac -on' No#fica#ons) SPEC' Reports' Expecta-ons' ClientHandler' Controller' Configura-on'' single'run' Satellite) Instruc-ons' Message=oriented' Middleware'' Loc a- on'Ins tanc e' No#fica#ons) EventHandler' Capabili-es' ClientHandler' Driver' Agent' (e.g.,)DC1)) Results' run.proper#es) [topology].proper#es) Default.env) [pla=orm].proper#es) 'PlaBorm'' configura-on' Detailed'Traces' Store) Store) Expecta-ons' ClientHandler' EventHandler' Capabili-es' ClientHandler' Driver'

Figure 5.26.: jms2009-PS architecture and custom extensions (blue).

Figure 5.26 shows the design of SPECjms2007 and jms2009-PS: an agent encapsulates all pub- lishers (drivers) and subscribers (event handlers) that represent a physical location (e.g., DC1)

in the scenario. A satellite manages all agents that represent at specific type of participant (i.e., headquarters, distribution centers etc.). A controller coordinates all satellites. The controller manages the lifecycle of its satellites and analyzes the results of a benchmark run.

Each driver and event handler is already designed as a dedicated JMS producer or JMS consumer in SPECjms2007. Hence, we have extended them as described in Section 5.2.1: each driver maintains its capabilities using a dedicated instance of CapabilityHandlerClient while subscribers do the same for expectations using their dedicated instance of ExpectationHandlerClient. This way, the additional load introduced by expectations and capabilities is tied to the benchmark’s workload and scales with it.

FINCoS Benchmarking Tool

FINCoS [305] is an open-source benchmarking tool recommended by the SPEC Research Group8 to evaluate Complex Event Processing (CEP) engines and EBS infrastructures in terms of per- formance and correctness. FINCoS allows for defining and running complex custom workloads against different platforms without having to individually tailor or convert the same workload 7 http://spec.org/jms2007/

for each targeted platform. In addition, FINCoS takes care of measuring and logging Key Perfor- mance Indicators (KPIs) such as latency or throughput of the notifications processed by the sys- tem under test. FINCoS is written in Java and designed as a distributed application where agents act as subscribers or publishers and thus simulate a heterogeneous population of an EBS.

Daemon'Service' Drivers' Drivers' No#fica#ons) Traces' Synthe#c) Performance'' monitor' FINCoS' Analy9cs' Expecta9ons' ClientHandler' REDS' Ad ap te rs ) Ac9veMQ' ...) Capabili9es' ClientHandler' REDS' Ac9veMQ' ...) Ad ap te rs ) Topology)(XML)) Connec#ons.fcf) Controller' Configura9on' Drivers) Sinks' Phase)Phase) Phases) Workload' Phase'Phase' Phases' Latency) Rates) MessageIoriented' Middleware'' Detailed'Traces' Se lfIa da pt a9o n' No#fica#ons) Detailed'Traces'

Figure 5.27.: Architecture and custom extensions (blue) of the FINCoS benchmarking tool. As shown in Figure 5.27, load-generators (drivers) act as publishers in an EBS and send cus- tomized workloads to the system under test. Agents (sinks) act as subscribers based on prede- fined subscription patterns and receive the processed notifications. A FINCoS controller coordi- nates the lifecycle of all agents that act as subscribers or publishers. Publishers and subscriber in FINCoS can be distributed across different hosts for better performance. Hence, a daemon ser- vice is running on each host to connect subscribers, publishers and controller across hosts using Remote Method Invocation (RMI). A performance monitoring tool (perfmon) provides analytics such as the deviation of throughput and latency of the notifications received by subscribers over the course of the experiment.

FINCoS uses adapters to connect to CEP engines or EBS infrastructures implemented on different platforms. Each agent requires a custom adapter to convert a platform-independent notification into a platform-specific message and send it to the system under test (and to receives as well as decode it back). We have implemented adapters for drivers and sinks to interact with ActiveMQ and REDS based on the interfaces provided by FINCoS to custom adapters. We implemented a separate adapter for ActiveMQ as the provided adapter for the JMS caused issues with dynamic topics on ActiveMQ.

We have customized and extended FINCoS to handle expectations, capabilities as well as the feedback provided by a MOM supporting expectations and ASIA. Subscribers can now associate one or more expectations with a subscription and can change that behavior over the duration of an experiment with or without changing their subscriptions. Publishers can associate capabilities with their advertisements, change them at runtime, and – first and foremost – they can now adapt their publication behavior at runtime based on individual feedback given by the MOM. The behavior of each publisher or subscriber can be individually defined in FINCoS in a platform- independent way. The behavior of publishers is centered around the notion of a phase. In

FINCoS, a phase describes a duration over which a publisher publishes one or more types of no- tifications at a given sampling rate. The sampling rate can be constant, increasing or decreasing during a phase. The content of each notification can be fully customized in terms of attributes and values with randomized or predefined values for each attribute. Alternatively, each publisher can replay datasets that contain recorded traces of notifications. Per publisher, multiple phases can be defined with individual durations, which allow the configuration of complex dynamics and workload patterns.

The behavior of subscribers is rather static in comparison: subscribers can define content-based subscriptions but cannot change them at runtime, as there is currently only a single phase con- figurable for subscribers. Thus, we have implemented phases for subscribers as this enables us to simulate requirements that change at runtime. Subscribers can now change their subscriptions, their aggregation requests for aggregated feedback or their expectations during the course of an experiment: as for publishers, multiple phases can be defined for each subscriber. Subscriptions and expectations are registered, revoked or updated at the start of each phase.

Each instance of a subscriber or publisher maintains its own local repository of expectations (or capabilities, respectively). In addition, we have also implemented a support for aggregated feed- back about the dynamics and population of the EBS at runtime. Both subscribers and publishers can request updates about aggregated metrics such as publisherCount, subscriberCount, or pub-

licationRate. For each request, an individual imprecision can be set. Aggregation request are

registered, updated or revoked at the start of each phase by both subscribers and publishers.

5.4 Summary

In this chapter, we have focused on the implementation of our approach to support QoI require- ments in EBS using the proposed concept of expectations, capabilities and feedback.