Complex Event Processing (CEP) systems [55] are used in various scenarios, processing several thousands of events per second with sub-milliseconds la- tency [56]. CEP engines like ESPER [57], StreamBase [58], Oracle Complex Event Processing (OCEP) [59], have the aim of efficient stream process- ing for data arriving at high-rates. In general, these types of applications are dedicated to fields like (but not limited to): stock trading [60], stream
monitoring, information integration workflows [61]; exception management; financial services; health care; IT monitoring; telecommunication; logistics; sensor networks; fraud detection, finding patterns in data. Recently, these topics have received significant attention in the research community [62], [63], [64], [65]. The choice of such applications lies in the need to process events in real-time. The main considerations for such applications are the need for high-throughput, between 1000 and 100k messages per second, low- latency processing for event streams (between milliseconds and seconds) and the logical complexity.
CEP systems are able to process data at high-rates. Simultaneous accesses to data-warehouses are usually not performed, because they tend to be slow. However, it is useful to be able to correlate stream information with data- warehouse information. We investigate how to make CEP automatically scale and load balance the processing while at the same time querying the main data-warehouse.
There are several commercial CEP systems, such as RuleCore CEP server [66], Coral8 Engine [67] and Esper [57] (also with an opens source version), several open research prototypes also exist [68]; Cayuga [69] with two ver- sions, a single CEP engine designed to achieve high performance and a dis- tributed version of the system, that merges events and algebra expressions. Many academic prototypes have been developed to distribute operators, Borealis [70], DCEP [71] and others [72], [73], [74]. One of the most cited systems, Borealis [70], an extension to Aurora [72] and Medusa [75], focuses on operator placement and other strategies across a set of Aurora Engines. Other research groups started focusing on elastic streaming over Map- reduce [76]. These approaches use distributed file systems, such as HDFS, and large amounts of memory to attain elastic scalability with high-rate
data processing, using a set of Map-Reduce nodes, focusing on CEP queries and operator placement optimization.
Analysis:
Some of these systems include the capability to query databases, however they become dependent of the database processing speed and this may compromise the whole context that assumes periodic results. If the data- warehouse process speed is slow, then the processing of events which also query the data-warehouse will be a↵ected.
Neither academy nor industry have investigated algorithms for speeding up the timely execution of queries in CEP and databases together. The investi- gation work that we did provides monitoring, scaling and admission control guarantees over high-rate CEP processing that includes analysis of DB data as part of the queries. We also investigate the auto-scale mechanisms for integration of data-warehouse and support of CEP queries with automatic load balancing and scaling (out/in) when necessary.
Overview of automatic
scalability and freshness
In this chapter we investigate how to provide (ETL+Q) automated scalabil- ity and data freshness in data-warehouses. Moreover we provide an overview of the solutions proposed in the thesis.
The main proposal is a (ETL+Q) auto-scale framework, named AScale. Ac- cording to the approach, the developer designs a logical view of the ETL+Q and data-warehouse (single server), writing only the transformations, with- out worrying about scalability details. Additionally, the developer specifies some parameters needed by the auto-scaling mechanisms.
To guarantee total scalability and freshness, AScale has to deal with both ETL scalability, processing scalability and data freshness guarantees. Section 3.1 describes how ETL+Q scalability is added to a data-warehouse. Section 3.2 overviews the framework of AScale. Section 3.3 describes the proposed scalability mechanisms. Sections 3.4 describes the approaches for data freshness and how to scale for total data freshness. Section 3.5 de- scribes how the system provides high-rate continuous results and scalability when processing continuous results.
Figure 3.1: Single server phases
3.1
Adding ETL+Q scalability to a data-warehouse
design
When building a data-warehouse, the main phases are shown in Figure 3.1. The ”1st design phase” represents the parts to be accounted for before any other system development happens. The related items include:
• ”Data sources” - includes the configuration of the data sources origin and destination, data extraction format and frequency;
• ”data-warehouse schema(s)” - the schema must be defined according to the data and queries to be performed. Generally star-schema models are used for data-warehouses;
• ”Queries” - represent relevant processing load of the system. Queries are also directly linked to the data-warehouse schema. If the schema changes, the queries must be rewritten accordingly.
The ”2nd design phase” regards the integration, implementation and configuration. This phase a↵ects the global performance (e.g. queries exe- cution time, transformation time, integration time, and so on). The related items include:
• ”Extraction” - when extracting data from sources, a set of parameters must be specified, such as: extraction frequency, maximum window extraction size;
• ”Transformation” - after extracting the data from sources, it needs to be transformed and cleaned before loading into the data-warehouse. Transformation operations need to be defined and implemented. For processing the transformation, the system also needs to know a set of parameters, among them the input and output data formats;
• ”Load” - is the process of loading the data into the data-warehouse. This process is crucial and involves the definition of loading scripts, loading periods and duration, load mechanisms, load bu↵er size; • ”Configurations” - includes all adjustments to the system to optimize
the available resources depending on the desired objectives. For in- stance, one needs to configure extraction frequency, data-warehouse integration methods, bu↵er sizes and so on.
The ”3rd design phase”, concerns the scalability of the system or parts of it (e.g. extraction, transformation, loading, storage nodes). When using the proposed AScale framework, the referred ”3rd design phase”, scalability, is automatically included and managed. Thus the user only needs to define how the logical system works for a single server architecture (1st and 2nd design phases), without concerns regarding scalability.
Figure 3.2: AScale architecture for automatic scalability