Project Acronym: Optique
Project Title: Scalable End-user Access to Big Data
Instrument: Integrated Project
Scheme: Information & Communication Technologies
Deliverable D7.1
Techniques for Distributed Query Planning and Execution:
One-Time Queries
Due date of deliverable: (T0+12)
Actual submission date: October 31, 2013
Start date of the project: 1st November 2012
Duration: 48 months
Lead contractor for this deliverable: UoA
Dissemination level: PU – Public
Executive Summary:
Techniques for Distributed Query Planning and Execution: One-Time Queries
This document summarises deliverable D7.1 of project FP7-318338 (Optique), an Integrated Project supported by the 7th Framework Programme of the EC. Full information on this project, including the contents of this deliverable, is available online at http://www.optique-project.eu/.
We first introduce the concept of elasticity and the main contributions of this work. We continue by presenting the ADP system on which we build in order to create the distributed query execution module of Optique. After that, we present the main research results related to the elastic execution of dataflow graphs in a cloud environment. We explore the trade-offs between query completion time and monetary cost of resource usage and we provide an efficient methodology to find good solutions. Next, we present the current status of integration with the Optique platform and the communication with other components of the system. Finally, we conclude by presenting the work that will be carried out during the second year of the project.
List of Authors
Herald Kllapi (UoA) Dimitris Bilidas (UoA) Yannis Ioannidis (UoA) Manolis Koubarakis (UoA)
1 Introduction
2 The ADP System
2.1 Related Work
2.2 Overview
2.3 Language Abstractions
2.3.1 Dataflow Language
2.3.2 Dataflow Language Semantics
2.4 Extensions
2.5 SQL Processing Engine
2.6 Conclusions
3 Elastic Dataflow Processing in the Cloud
3.1 Related Work
3.2 Preliminaries
3.2.1 Dataflow
3.2.2 Schedule Estimation
3.2.3 Skyline Distance Set
3.3 Dataflow Language
3.4 Operator Ranking
3.5 Scheduling Algorithm
3.5.1 Dynamic Programming
3.5.2 Simulated Annealing
3.5.3 Parallel Wave
3.5.4 Exhaustive
3.6 Complexity Analysis
3.7 Experimental Evaluation
3.7.1 Model and Algorithms
3.7.2 Systems
3.7.3 Conclusions of Experiments
3.8 Conclusions and Future Work
4 Integration of ADP With the Optique Platform
4.1 Progress
4.2 JDBC Interface
4.3 Queries Provided by the Use Cases
5 Conclusions
OptiqueDeliverable D7.1 Techniques for Distributed Query Planning and Execution: One-Time Queries
Glossary
A Test Queries from the Use Cases
A.1 NPD Queries
A.2 Siemens Queries
Chapter 1
Introduction
Work package WP7 of Optique deals with the distributed query execution module and is divided into the following tasks:
• Task 7.1: Query planning and execution techniques for one-time OBDA queries
• Task 7.2: Query planning and execution techniques for continuous and temporal OBDA queries
• Task 7.3: Optimization techniques
• Task 7.4: Implementation, testing and evaluation
This deliverable describes work that has been done concerning the first task, which corresponds to the effort put in WP7 during the first year of the project. In the context of an Ontology-based Data Access system, the efficient execution of queries produced by the query transformation components heavily relies on the proper and effective use of the resources of a cloud environment. Our main attention has been drawn to this concept, the elastic execution of queries by dynamic resource management, in order to achieve the objectives of this task.
In Chapter 2, we present ADP, a system for distributed query processing in cloud environments. ADP has been developed at the University of Athens during several European-funded research projects. It is designed for the efficient execution of data-intensive flows on the cloud. These dataflows are expressed as relational queries with user defined functions (UDFs). We present the architecture of the system with its main components and the query languages that it supports.
In Chapter 3, we present the research contributions regarding the elastic execution of dataflows. Finding trade-offs between completion time and monetary cost is essential in a cloud environment, and cloud-enabled data processing platforms should offer the ability to select the best trade-off. The obvious questions are: (1) Does such elasticity exist? (2) Can it be obtained at an overhead that makes it worthwhile? Our first contribution in this chapter is to demonstrate that very significant elasticity exists in a number of common tasks, even when the cloud computation is modeled at a very high level of abstraction, such as MapReduce. Moreover, we show that elasticity can be discovered in practice using highly scalable and efficient algorithms and that there appear to be certain simple “rules of thumb” for when elasticity is present. It is natural to expect that more refined models of the cloud computation would allow further optimizations and extraction of elasticity. At the same time, it is reasonable to be concerned whether the resulting complexity of the refined model would still allow these optimizations to be performed. Our second contribution is to demonstrate that there exists a very fertile middle ground in terms of abstraction, which enables the extraction of much more elasticity than is possible under MapReduce while remaining computationally tractable. The content of this chapter has been submitted for publication and is currently under review.
In Chapter 4, we describe the current status of integration of ADP into the Optique platform. We present several examples of using the JDBC driver and executing the queries of the Optique use cases.
Chapter 2
The ADP System
ADP is a system designed for the efficient execution of complex dataflows on the Cloud [50]. Requests take the form of queries in SQL with user defined functions (UDFs). The SQL query is transformed through two intermediate levels before execution. The query of the first level is again SQL but carries additional annotations about the distribution of the tables. We enhance SQL by adding the table partition as a first-class citizen of the language. A table partition is defined as a set of tuples having a particular property that we can exploit in query optimization. For example, all the tuples in the same partition may share the same value of a hash function applied on one column. This property is used for distributed hash joins.
ADP acts as a middleware placed between the infrastructure and the other components of the system, simplifying their “view” of the underlying infrastructure. One can benefit from ADP by detaching part of the application logic and writing it using the ADP abstractions. This allows the application to scale transparently as needed. ADP originated in 2004 in the European project Diligent¹, and in particular, in the distributed query engine deployed for the project. The query engine of Diligent was subsequently used and extended in the European project Health-e-Child². Finally, after Health-e-Child ended, ADP became an internal project of the MaDgIK³ group.
2.1 Related Work
The most popular platforms for data processing flows on the cloud are based on MapReduce [10], presented by Google. On top of MapReduce, Google has built systems like FlumeJava [8], Sawzall [41], and Tenzing [9]. FlumeJava is a library used to write data pipelines that are transformed into MapReduce jobs. Sawzall is a scripting language that can express simple data processing over huge datasets. Tenzing is an analytical query engine that uses a pool of pre-allocated machines to minimize latency.
The main open source implementation of MapReduce is Hadoop [5] by Yahoo!. Hive [48] is a data warehouse solution from Facebook. HiveQL (the query language of Hive) is a subset of SQL and the optimization techniques are limited to simple transformation rules. The optimization goal is to minimize the number of MapReduce jobs while at the same time maximizing the parallelism, and as a consequence, to minimize the execution time of the query. HadoopDB [2] is a recent hybrid system that combines MapReduce with databases. It uses multiple single-node databases and relies on Hadoop to schedule the jobs to each database. The optimization goal is to create as much parallelism as possible by assigning sub-queries to the single-node databases. The U.S. startup Hadapt [20] is currently commercializing HadoopDB. Finally, Amazon offers Hadoop Elastic MapReduce as a service to its customers [3].
Several high-level query languages and applications have been developed on top of Hadoop, such as Pig Latin [36] and Mahout [4], a platform for large-scale machine learning. The dataflow graphs used in MapReduce are relatively restricted and this reduces opportunities for optimization. The optimization goal of all of the above systems is to execute queries as fast as possible.
¹ http://diligent.ercim.eu/
² http://www.health-e-child.org
The Condor/DAGMan/Stork [33] set is the state-of-the-art technology of High Performance Computing. However, Condor was designed to harvest CPU cycles on idle machines, and running data intensive workflows with DAGMan is very inefficient [43]. Many systems use DAGMan as middleware, like Pegasus [11] and GridDB [34]. Proposals for extensions of Condor to deal with data intensive scientific workflows do exist [43], but to the best of our knowledge, they have not materialized yet. A case study of executing the Montage dataflow on the cloud is presented in [14], examining the trade-offs of different dataflow execution modes and provisioning plans for cloud resources.
Dryad [24] is a commercial middleware by Microsoft that has a more general architecture than MapReduce since it can parallelize any dataflow. Its schedule optimization, however, relies heavily on hints requiring knowledge of node proximity, which are generally not available in a cloud environment. It also deals with job migration by instantiating another copy of a job and not by moving the job to another machine. This might be acceptable when optimizing solely time but not when the financial cost of allocating additional containers matters. DryadLINQ [53] is built on top of Dryad and uses LINQ [49], a set of .NET constructs for manipulating data. LINQ queries are transformed into Dryad graphs and executed in a distributed fashion. Stubby [21, 32] is a cost-based optimizer for MapReduce dataflows. Our work has some similarities in the modeling, but we target a broader range of dataflow graphs.
Mariposa [47] was one of the first distributed database systems to take into consideration the monetary cost of answering queries. The user provides a budget function and the system optimizes the cost of accessing the individual databases using auctioning.
Dremel [35] is a system for real-time analysis of very large datasets. The system is designed for a subclass of SQL queries that return relatively small results. The optimization techniques in our work target a broader class of SQL queries.
2.2 Overview
Figure 2.1 shows the current architecture of ADP. The queries are optimized and transformed into execution plans that are executed in ART, the ADP Run Time. The resources needed to execute the queries (machines, network, etc.) are reserved or allocated by ARM, the ADP Resource Mediator. Those resources are wrapped into containers. Containers are used to abstract from the details of a physical machine in a cluster or a virtual machine in a cloud. The information about the operators and the state of the system is stored in the Registry. ADP uses state of the art technology and well proven solutions inspired by years of research in parallel and distributed databases (e.g., parallelism, partitioning, and various optimizations).
The core query evaluation engine of ADP is built on top of SQLite⁴. The system allows rapid development of specialized data analysis tasks that are directly integrated into the system. Queries are expressed in SQL extended with UDFs. Although UDFs have been supported by database management systems (DBMSs) for a long time, their use is limited due to their complexity and the limitations imposed by the DBMSs. One of the goals of the system is to eliminate the effort of creating and using UDFs by making them first-class citizens in the query language itself. SQLite natively supports UDFs implemented in C. The UDFs are categorized into row, aggregate, and virtual table functions.
2.3 Language Abstractions
The query language of ADP is based on SQL enhanced with two extensions. The first one is the automatic creation of virtual tables, which permits their direct usage within a query without explicitly creating them beforehand. The second extension is an inverted syntax which uses UDFs as statements. We present some examples to illustrate both. The following query reads the data from the file 'in.tsv'.
Figure 2.1: The system architecture
create virtual table input_file using file('in.tsv');
select * from input_file;
The query can be written as:
select * from file('in.tsv');
and the system will automatically translate it into the above syntax. Using the inverted syntax, in which UDFs are used as statements, the query can be further simplified as:
file 'in.tsv';
The inverted syntax provides a natural way to compose virtual table functions. In the following example, the query that uses the file operator is provided as a parameter to countrows:
select * from countrows("select * from file('in.tsv')");
The above syntax is very error prone because it uses nested quote levels. By using inversion, the query is written as:
countrows file 'in.tsv';
The ordering is from left to right, i.e., x y z is translated to x(y(z)). Notice that this syntax is very close to the natural language sentence “count the rows of file 'in.tsv'”.
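The left-to-right rule can be illustrated with a minimal Python sketch. This is not ADP's actual translator: the function name nest_inverted is hypothetical, and the sketch assumes that tokens are separated by whitespace and that the last token is the argument.

```python
def nest_inverted(statement: str) -> str:
    """Rewrite an inverted statement 'x y z;' as the nested call 'x(y(z))'."""
    # Assumption: whitespace-separated tokens, last token is the argument.
    # Quoted arguments that contain spaces are not handled in this sketch.
    tokens = statement.strip().rstrip(";").split()
    if not tokens:
        raise ValueError("empty statement")
    *functions, argument = tokens
    expression = argument
    # Wrap from the innermost (rightmost) function outwards.
    for name in reversed(functions):
        expression = f"{name}({expression})"
    return expression


print(nest_inverted("countrows file 'in.tsv';"))
```

The nested form is what would then be placed in the from clause of a regular select statement.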
2.3.1 Dataflow Language
This language is internal to the system, but it can also be used to bypass the optimizer and enforce a specific dataflow. The best way to describe the syntax and semantics of the language is by presenting some examples. We use a subset of the TPC-H schema described below:
1. orders(o_orderkey, o_orderstatus, ...)
2. lineitem(l_orderkey, l_partkey, l_quantity, ...)
3. part(p_partkey, p_name, ...)
Assume that tables are horizontally partitioned as follows:
1. orders to 2 parts on hash(o_orderkey)
2. lineitem to 3 parts on hash(l_orderkey)
3. part to 2 parts on hash(p_partkey)
with hash(∗) being a known hash function with good properties (e.g., MD5). Consider the following query:
distributed create table lineitem_large as select * from lineitem where l_quantity > 20;
The queries have two semantically different parts: distributed and local. The distributed part defines how the input table partitions are combined and how the output will be partitioned. The local part defines the SQL query that will be evaluated against all the combinations of the input (possibly in parallel). In the example, the system will run the SQL query on every partition of table lineitem. Consequently, table lineitem_large will be created and partitioned into the same number of partitions as lineitem. The output table can be partitioned based on specific columns. All the records with the same values on the specified columns will be on the same partition. For example, consider the following query.
distributed create table lineitem_large on l_orderkey as select * from lineitem where l_quantity > 20;
The output of each query is partitioned on column l_orderkey. Notice that all records with the same l_orderkey value must be unioned in order to produce the partitions of table lineitem_large, which creates a lattice of unions after the queries are executed. The user can specify the number of partitions (e.g., 10) by writing the query as follows:
distributed create table lineitem_large to 10 on l_orderkey as select * from lineitem where l_quantity > 20;
For the time being, if the degree of parallelism is not specified, it is set to a predefined value. An interesting optimization problem is to find the optimal parallelism to use. We are currently working on this problem. We want to stress that even when this feature becomes available, the current functionality will remain as an option because it is extremely useful in practice.
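The effect of partitioning the output on a column can be sketched in Python as follows. This is only an illustration under assumptions: rows are modeled as dictionaries, MD5 is used because the text names it as an example hash function, and the helper names partition_of and hash_partition are hypothetical rather than part of ADP.

```python
import hashlib


def partition_of(value, parts: int) -> int:
    """Map a column value to a partition index using MD5 (as in the text's example)."""
    digest = hashlib.md5(str(value).encode("utf-8")).hexdigest()
    return int(digest, 16) % parts


def hash_partition(rows, column: str, parts: int):
    """Split rows into `parts` lists; equal values of `column` share a partition."""
    partitions = [[] for _ in range(parts)]
    for row in rows:
        partitions[partition_of(row[column], parts)].append(row)
    return partitions


# Hypothetical sample data standing in for table lineitem.
lineitem = [{"l_orderkey": i % 7, "l_quantity": 10 + i} for i in range(40)]

# Local part: the filter; distributed part: "to 10 on l_orderkey".
lineitem_large = hash_partition(
    [row for row in lineitem if row["l_quantity"] > 20], "l_orderkey", 10
)
print([len(p) for p in lineitem_large])
```

Because the partition index depends only on the column value, all records with the same l_orderkey end up in the same output partition, regardless of which input partition produced them.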
If more than one input table is used, the table partitions are combined and the query is executed on each combination. The combination is either a direct or a cartesian product, with the latter being the default behavior. An example is the following query.
distributed create table lineitem_part as
select * from lineitem, part where l_partkey = p_partkey;
The system evaluates the query by combining all the partitions of lineitem with all the partitions of part. As a result, table lineitem_part will have 6 partitions (3 x 2). If tables lineitem and part have the same number of partitions, the combination can be a direct product. This is shown in the following query.
distributed create table lineitem_part as direct
select * from lineitem, part where l_partkey = p_partkey;
Notice that, in order for the query to be a correct join, the tables lineitem and part must be partitioned on columns l_partkey and p_partkey respectively. The local part of the query can be as complex as needed, using the full expressivity of SQL enhanced with UDFs. Queries can be combined in order to express complex dataflows. For example, a distributed hash join can be expressed as follows:
distributed create temporary table lineitem_p on l_partkey as select * from lineitem;
distributed create temporary table part_p on p_partkey as select * from part;
distributed create table lineitem_part as direct
select * from lineitem_p, part_p where l_partkey = p_partkey;
Tables lineitem_p and part_p are temporary and are destroyed after execution. Notice that in this example, the system must choose the same parallelism for tables lineitem_p and part_p in order for them to be combined as a direct product. A MapReduce flow can be expressed as follows:
distributed create temporary table map on key as
select keyFunc(c1, c2, ...) as key, valueFunc(c1, c2, ...) as value from input;
distributed create table reduce as
with keyFunc(∗) being a row function that returns the key of the row and valueFunc(∗) being a row function that produces the value. In the second query, the reduce function is an aggregate function that is applied on each group. The system also supports indexes. The index is not a global index. Instead, it is created on each partition of the table on the specified columns. An example is shown below:
distributed create index l_index on lineitem(l_partkey);
Another useful feature of the language is the ability to specify the partitions against which the query will be evaluated. For example, if only the first partition of table lineitem has all the records with l_quantity more than 20, we can write the following query:
distributed on [0] create table lineitem_large as select * from lineitem where l_quantity > 20;
2.3.2 Dataflow Language Semantics
A dTable consists of its schema, the list of its partitions, and the partition columns, i.e., dTable(schema, parts, pcols) with $|parts| \geq 1$ and $|pcols| \geq 0$. A partition of a dTable is a relational table with the same schema. The partitions of the tables are horizontal. A partitioning is a transformation of a dTable into a new dTable with the specified number of partitions on the specified columns, i.e., output = p(input, parts, columns), with $parts \geq 1$ and $|columns| \geq 1$. A dQuery is a transformation of its input dTables to a single output dTable, i.e., output = query(SQL + UDFs, inputs, combine) with $|inputs| \geq 1$ and $combine \in \{\text{cartesian product}, \text{direct product}\}$. The following holds for cartesian product:
• $|output.parts| = \prod_{i=0}^{|inputs|-1} |inputs[i].parts|$
• $output.parts[i] = query(inputs[j].parts[k]), \forall i, j, k$
The following holds for direct product:
• $|inputs[i].parts| = S, \forall i$, with $S > 0$
• $|output.parts| = S$
• $output.parts[i] = query(inputs[j].parts[i]), \forall i, j$
A p(dQuery) is a dQuery with partitioning, i.e., output = p(dQuery(def, inputs, combine), parts, columns). A dQueryScript is a set of p(dQuery) connected with tables. By definition, the output of the last p(dQuery) is the result of the script.
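The two combination modes can be sketched in Python as follows. This is an illustrative model rather than ADP code: a dTable is reduced to a list of partitions, each partition to a list of join-key values, and the names combine, run_dquery, and local_join are hypothetical.

```python
from itertools import product


def combine(inputs, mode="cartesian"):
    """Return the partition combinations a dQuery is evaluated on."""
    if mode == "cartesian":
        # One combination per element of the cross product of the partition lists.
        return list(product(*inputs))
    if mode == "direct":
        # All inputs must have the same number of partitions S.
        if len({len(parts) for parts in inputs}) != 1:
            raise ValueError("direct product needs equal partition counts")
        return list(zip(*inputs))
    raise ValueError(f"unknown combine mode: {mode}")


def run_dquery(query, inputs, mode="cartesian"):
    """Evaluate the local query once per combination; one output partition each."""
    return [query(*combo) for combo in combine(inputs, mode)]


def local_join(lineitem_part, part_part):
    """Local equi-join on the key values (stands in for the SQL join)."""
    return [(l, p) for l in lineitem_part for p in part_part if l == p]


# lineitem with 3 partitions and part with 2 partitions: 3 x 2 = 6 outputs.
cartesian_out = run_dquery(local_join, [[[1, 2], [3, 4], [5]], [[1, 3, 5], [2, 4]]])

# Both tables partitioned on the join key with the same parallelism: 2 outputs.
direct_out = run_dquery(
    local_join, [[[2, 4], [1, 3, 5]], [[2, 4], [1, 3, 5]]], mode="direct"
)
print(len(cartesian_out), len(direct_out))
```

Both modes yield the same five joined rows in this example; the direct product evaluates two local joins instead of six because matching keys are already co-located.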
2.4 Extensions
The system makes extensive use of the UDF extension APIs of SQLite. SQLite supports the following UDF categories. Row functions take as input one or more columns from a row and produce one value. An example is the UPPER() function. Aggregate functions can be used to capture arbitrary aggregation functionality beyond the one predefined in SQL (e.g., SUM(), AVG(), etc.). Virtual table functions (also known as table functions in PostgreSQL and Oracle) are used to create virtual tables that can be used in the same way as regular tables. The API offers both serial access via a cursor and random access via an index. The SQLite engine is not aware of the table size, allowing the input and output to be arbitrarily large. All UDFs are implemented in Python. Neither Python nor SQLite is strictly typed. This enables the implementation of UDFs that have dynamic schemas based on their input data.
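As a minimal sketch of the first two UDF categories, the following example registers a row function and an aggregate function from Python. It uses the sqlite3 module of the Python standard library instead of the wrapper used by ADP, and the table, the data, and the names key_func, Median, and run_example are assumptions made for illustration. Virtual table functions are not covered, since the standard sqlite3 module does not expose them.

```python
import sqlite3


def key_func(name):
    """Row function: one value in, one value out (here, a normalized key)."""
    return name.lower()


class Median:
    """Aggregate function that SQL does not predefine."""

    def __init__(self):
        self.values = []

    def step(self, value):
        # Called once per row of the group.
        if value is not None:
            self.values.append(value)

    def finalize(self):
        # Called once per group to produce the aggregate value.
        if not self.values:
            return None
        ordered = sorted(self.values)
        mid = len(ordered) // 2
        if len(ordered) % 2:
            return ordered[mid]
        return (ordered[mid - 1] + ordered[mid]) / 2


def run_example():
    conn = sqlite3.connect(":memory:")
    conn.create_function("keyfunc", 1, key_func)
    conn.create_aggregate("median", 1, Median)
    conn.execute("create table items(name text, qty integer)")
    conn.executemany(
        "insert into items values (?, ?)",
        [("A", 10), ("a", 30), ("b", 5), ("B", 7), ("a", 20)],
    )
    rows = conn.execute(
        "select keyfunc(name) as grp, median(qty) as med "
        "from items group by keyfunc(name) order by grp"
    ).fetchall()
    conn.close()
    return rows


print(run_example())
```

The query groups by the output of the row function and applies the aggregate to each group, which is the same shape as the map and reduce steps of a MapReduce flow expressed in SQL with UDFs.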
2.5 SQL Processing Engine
The first layer is APSW, a wrapper of SQLite that makes it possible to control the DB engine from Python. APSW also makes it possible to implement UDFs in Python, enabling SQLite to use them in the same way as its native UDFs. Both Python and SQLite are executed in the same process, greatly reducing the communication cost between them. The Connection and Function Manager (CFM) is the external interface. It receives SQL queries, transforms them into SQL92, and passes them to SQLite for execution. This component also supports query execution tracing and monitoring. Finally, it automatically finds and loads all the available UDFs.
The query is parsed, optimized, and transformed into the intermediate dataflow language described earlier. The Parser and Optimizer of the system use information stored in the Catalog. The Catalog contains all the information about the tables. The dataflow language is optimized and transformed into a dataflow graph. Each node of the graph is an SQL query and each link is either an input or an output table. The graph produced contains two types of operators: SQLExec and unionReplicator. Operator SQLExec takes as a parameter an SQL query and executes it, reading the partitions from its input and producing the output partitions. Operator unionReplicator performs a union all on the partitions of the input (all the partitions must be from the same dTable) and replicates the result to all of its outputs.
The dataflow graph is given to ADP for execution. The system schedules the dataflow graph and monitors its progress. When the execution completes successfully, the table produced is added to the Catalog of the system. ADP finds the best schedule for the dataflow graph and executes it on the containers reserved on the cloud.
A particular database is located in a particular directory on the file system of each machine. Each partition is stored in a different file. A table is attached to the database without needing to import the data, which eliminates the startup and shutdown time of the database engine. The limit on the number of attached databases in SQLite is 62. For operators that have more than 62 input partitions, we create a tree of unions. The total number of iterations is bounded by $\lceil \log_{62} |inputs| \rceil - 1$.
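The bound on the number of iterations can be checked with a short Python sketch that counts the levels of such a tree of unions. The function name union_rounds is hypothetical, and the sketch assumes that every intermediate union reads at most 62 inputs and produces a single output.

```python
MAX_ATTACHED = 62  # attach limit quoted in the text


def union_rounds(num_inputs: int, fan_in: int = MAX_ATTACHED) -> int:
    """Count the intermediate union levels needed before one operator can read all inputs."""
    rounds = 0
    remaining = num_inputs
    while remaining > fan_in:
        # Each level merges groups of up to `fan_in` inputs (integer ceiling division).
        remaining = -(-remaining // fan_in)
        rounds += 1
    return rounds


for n in (62, 63, 62 * 62, 62 * 62 + 1):
    print(n, union_rounds(n))
```

For example, 63 inputs need one intermediate level, and 62² + 1 = 3845 inputs need two, in agreement with the bound.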
2.6 Conclusions
In this chapter we described the architecture and the components of ADP. We also presented example queries that specify the use of UDFs in the language and we gave a detailed characterization of the dataflow language. In the next chapter we will present results regarding the optimization of dataflow processing, which also contains the task of optimizing execution of queries expressed in the specified language in a system like ADP.
Chapter 3
Elastic Dataflow Processing in the Cloud
Query processing has been studied for a long time by the database community in the context of parallel, distributed, and federated databases [15, 28, 37]. Recently, cloud computing has attracted much attention in the research community and software industry, and fundamental database problems are being revisited [17]. Thanks to virtualization, cloud computing has evolved over the years from a paradigm of basic IT infrastructures used for a specific purpose (clusters), to grid computing, and recently, to several paradigms of resource provisioning services: depending on the particular needs, infrastructures (IaaS, Infrastructure as a Service), platforms (PaaS, Platform as a Service), and software (SaaS, Software as a Service) can be provided as services [18]. One of the important advantages of these incarnations of cloud computing is the cost model of resources. Clusters represent a fixed capital investment made up-front and a relatively small operational cost paid over time. In contrast, clouds are characterized by elasticity, and offer their users the ability to lease resources only for as long as needed, based on a per-quantum pricing scheme, e.g., one hour on Amazon EC2.¹ Together with the lack of any up-front cost, this represents a major benefit of clouds over earlier approaches. Elasticity, i.e., the ability to use computational resources that are available on demand, challenges the way we implement algorithms, systems, and applications. Execution of dataflows can be elastic, providing several choices of price-to-performance ratio and making the optimization problem two-dimensional [27].
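The per-quantum pricing scheme and the resulting price-to-performance choices can be illustrated with a small Python sketch. The price of 1.0 per quantum, the perfectly parallel workload, and the function names lease_cost and schedule_point are assumptions made for illustration only.

```python
import math


def lease_cost(seconds_used: float, quantum_seconds: int = 3600,
               price_per_quantum: float = 1.0) -> float:
    """Money charged for one container: every started quantum is paid in full."""
    if seconds_used <= 0:
        return 0.0
    return math.ceil(seconds_used / quantum_seconds) * price_per_quantum


def schedule_point(total_work_seconds: float, containers: int):
    """Return (completion time, money) when work is split evenly over the containers."""
    # Assumption: perfectly parallel work and no data transfer cost.
    per_container = total_work_seconds / containers
    return per_container, containers * lease_cost(per_container)


for n in (1, 2, 4):
    print(n, schedule_point(7200, n))
```

Under these assumptions, two containers halve the completion time of a two-hour job at the same cost as one container, while four containers halve it again but double the cost, because each container pays for a full quantum that it only half uses.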
Modern applications combine the notions of querying & search, information filtering & retrieval, data transformation & analysis, and other data manipulations. Such rich tasks are typically expressed in a high level language (SQL enhanced with UDFs), optimized [29], and transformed into an execution plan. The latter is represented as a data processing graph that has arbitrary data operators as nodes and producer-consumer interactions as edges. In this chapter we focus on the common denominator of all distributed data processing systems: scheduling, i.e., where each node of the dataflow graph will be executed. Scheduling is a well-known NP-complete problem [19, 42]. Traditionally, the main criterion to optimize is the completion time of the dataflow, and many algorithms have been proposed for that problem [30, 52]. Scheduling dataflows on the cloud is a challenging task since it has a very rich space of alternative schedules taking into account the monetary cost of using the resources.
A major research problem is the development of new distributed computing paradigms that fit closely the elastic computation model of cloud computing. The most successful of these computational models today is MapReduce. Our first contribution is to demonstrate that very significant elasticity exists in tasks modeled with the MapReduce abstraction. Moreover, we show that elasticity can be discovered in practice using highly scalable and efficient algorithms.
It is natural to expect that more refined models would allow further optimizations and extraction of elasticity. At the same time, it is also reasonable to be concerned as to whether the resulting complexity of the refined model would allow these optimizations to be performed. Our second contribution is to demonstrate that there exists a very fertile middle-ground in terms of abstraction which enables the extraction of much more elasticity than what is possible under MapReduce while remaining computationally tractable.
In this work, we propose a two-step approach to explore the space of alternative schedules on the Cloud with respect to both completion time and monetary cost. Initially, we compute a global ranking of the operators of the dataflow based on their influence. Given the ranking, a dynamic programming algorithm finds the initial skyline of solutions. This skyline is further refined by a 2D simulated annealing algorithm. To illustrate the intuition behind our approach, we use the example dataflow graph in Fig. 3.1.
Figure 3.1: A simple dataflow (operators A, B, C, d, and e). Heavy operators are bigger. Thicker arrows mean larger volumes of data.
Given that we can measure the influence of each operator in the graph, we can assign the most influential operators first. In our example the best solution is: (i) put A, B, and C in different containers, (ii) put e together with C, and (iii) put d together with A. The challenge is how to measure the influence of each operator in the dataflow. The number, nature, and temporal and monetary costs of the schedules on the skyline depend on many parameters, such as the dataflow characteristics (execution time of operators, amount of data generated, etc.), the cloud pricing scheme (quantum length and price), the network bandwidth, and more. We incorporated these scheduling algorithms into ADP.
To the best of our knowledge, our system is the first attempt to address the problem of dataflow processing on the cloud with respect to both completion time and monetary cost. Our techniques can successfully find trade-offs between time and money, and either offer the user the choice of the best trade-off or automatically choose the appropriate one based on the user's preferences and constraints.
In this chapter, we make the following contributions:
• We show that elasticity can be embedded into processing systems that use the MapReduce abstraction.
• We propose a model of jobs and resources that can capture the special characteristics of the Cloud.
• We propose a two step solution to the scheduling problem on the cloud. We compute the ranking of the operators in the dataflow. Given that, a dynamic programming algorithm finds the initial skyline of solutions. The skyline is then refined by a 2D simulated annealing algorithm.
• We show that our approach is able to successfully find trade-offs between completion time and monetary cost, and thus, exploit the elasticity of clouds.
• We show that using our model, we achieve manageable complexity and a significant gain producing schedules that dominate plans produced by the MapReduce abstraction on every dimension.
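The notion of a skyline of schedules used in the contributions above can be illustrated with a short Python sketch. It is a plain Pareto filter over (completion time, monetary cost) pairs, not the dynamic programming or simulated annealing algorithms of this chapter, and the sample schedule values are invented for illustration.

```python
def skyline(schedules):
    """Keep the schedules that no other schedule beats on both time and money."""
    front = []
    best_cost = float("inf")
    # Sort by time, then cost; a schedule survives only if it is strictly
    # cheaper than every schedule that is at least as fast.
    for time, cost in sorted(set(schedules)):
        if cost < best_cost:
            front.append((time, cost))
            best_cost = cost
    return front


# Hypothetical (completion_time, monetary_cost) pairs of alternative schedules.
candidates = [(10, 5), (8, 7), (12, 4), (9, 9), (10, 6)]
print(skyline(candidates))
```

Every schedule on the skyline is a different trade-off: moving along it buys a shorter completion time for more money, or the reverse.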
The remainder of this chapter is organized as follows. In Section 3.1 we present the related work. In Section 3.2 we introduce the notations we use and the problem definition. In Section 3.3 we describe the dataflow language abstractions we use. In Section 3.4 we present the ranking algorithms that we used. In Section 3.5 we present the algorithms that we propose and in Section 3.6 we compute their complexity. In Section 3.7 we present our experimental effort and in Section 3.8 we conclude.
3.1 Related Work
Dataflow processing presents a major challenge and has received a lot of attention in recent years. A methodology to design ETL dataflows based on multiple criteria is presented in [45]. This is complementary to our work; however, to the best of our knowledge, the optimization is not automatic for the time being. Newer approaches [46] focus on optimizing the execution time of dataflows executed over multiple execution engines (DBMS, MapReduce, etc.).
There are also several efforts that move in the same direction as our work but try to solve simpler versions of the problem. Examples include a particle swarm optimization of general dataflows having a single-dimensional weighted average parameter of several metrics as the optimization criterion [38], a heuristic optimization of independent tasks (no dependencies) having the number of machines that should be allocated to maximize speedup as the optimization criterion given a predefined budget [44], and work focusing on energy efficiency [51].
In summary, we capitalize on the elasticity of clouds and produce multiple schedules, enabling the user to select the desired trade-off. To the best of our knowledge, no dataflow processing system deals with the concept of elasticity or two-dimensional time/space optimization, which constitute our key novelties.
3.2 Preliminaries
3.2.1 Dataflow
We use the same modeling as in [27] and, for completeness, summarize it here. A dataflow is represented as a directed acyclic graph graph(ops, flows). Nodes (ops) correspond to arbitrary concrete operators and edges (flows) correspond to data transferred between them. An operator in ops is modeled as op(time, cpu, memory, behavior), where time is the execution time of the operator, cpu is its average CPU utilization measured as a percentage of the host CPU power when executed in isolation (without the presence of other operators), memory is the maximum memory required for the effective execution of the operator, and behavior is a flag that is equal to either pipeline (PL) or store-and-forward (SnF). If behavior is equal to SnF, all inputs to the operator must be available before execution; if it is equal to PL, execution can start as soon as some input is available. Two typical examples from databases are the sort and select operators: sort is SnF and select is PL. These metrics are either computed or collected by the system [31]. We model an operator as having a uniform resource consumption during its execution (cpu, memory, and behavior do not change). A flow between two operators, producer and consumer, is modeled as flow(producer, consumer, data), where data is the size of the data transferred.
The container is the abstraction of the host, virtual or physical, encapsulating the resources provided by the underlying infrastructure. Containers are responsible for supervising operators and providing the necessary context for executing them. A container is described by its CPU, its available memory, and its network bandwidth: cont(cpu, memory, network).
A schedule S_G of a dataflow graph G is an assignment of its operators into containers: schedule(assigns). An individual operator assignment is modeled as assign(op, cont, start, end), where start and end are the start and end times of the operator, respectively, when executed in the presence of other operators.
Time t(S_G) and money m(S_G) costs are the completion time and the monetary cost of a schedule S_G of a dataflow graph G. Cloud providers lease computing resources that are typically charged based on a per-time-quantum pricing scheme. For this reason we measure t(S_G) and m(S_G) in quanta. Q_t is the quantum time and Q_m is the price of leasing a container for Q_t.
The cloud is a provider of virtual hosts (containers). We model only the compute service of the cloud and not the storage service. Assuming that the storage service is not used to store temporary results, a particular dataflow G will read and write the same amount of data for any schedule SG of operators into containers. So the cost of reading the input and writing the output is the same.
3.2.2 Schedule Estimation
The algorithm estimates the time that every operator starts and its execution time given an assignment of operators to containers. We use a geometric approach. The time-shared resources are cpu, network, and disk. Those resources form boxes in a multi-dimensional space. Operators are boxes with resources and limited time. Containers are also modeled as boxes with infinite time. Intuitively, the problem becomes how to fit the boxes of the operators into the boxes of containers.
Operators are modeled with three boxes I, P, and O with dimensions (time, cpu, inRate, outRate) as follows: I are the resources needed to read the input from the network or from the disk, P are resources needed to process the data, and O are resources needed to write the output to the network or to the disk. For SnF operators I and O are only disk resources. For data transfer operators, P is always zero. The data read from the network is always written to the disk. Notice that even for PL operators, this modeling is acceptable because at some level, PL operators are a sequence of SnF operators that read a small fraction of the input, process it, and produce a small fraction of the output.
Definitions
Formally, let A be an operator that belongs to dataflow G. The operator is defined as A(time, cpu, −, −) with assign(A, X, −, −). Without loss of generality, assume that A is a PL operator. For SnF operators we use the disk instead of the network for I/O. The total data that A reads from the network is:
\[ D_{* \to A} = \sum_{B \in G} \{\, D : flow(B, A, D),\ assign(B, Y, -, -),\ X \neq Y \,\} \]
Similarly, the total data that A writes to the network is:
\[ D_{A \to *} = \sum_{B \in G} \{\, D : flow(A, B, D),\ assign(B, Y, -, -),\ X \neq Y \,\} \]
The three boxes of the operator are defined as follows:
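\[
\begin{aligned}
& I\left(\frac{D_{* \to A}}{X.network},\ DT_{CPU},\ X.network,\ 0\right) \\
& P\left(A.time,\ A.cpu,\ 0,\ 0\right) \\
& O\left(\frac{D_{A \to *}}{X.network},\ DT_{CPU},\ 0,\ X.network\right)
\end{aligned}
\]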
with DT_CPU being a system parameter. In isolation, the box A_B of operator A will have the following properties:
\[
\begin{aligned}
A_B.time &= I.time + P.time + O.time \\
A_B.cpu &= \frac{I.cpu \cdot I.time + P.cpu \cdot P.time + O.cpu \cdot O.time}{A_B.time} \\
A_B.inRate &= \frac{I.inRate \cdot I.time}{A_B.time} \\
A_B.outRate &= \frac{O.outRate \cdot O.time}{A_B.time}
\end{aligned}
\]
In the context of others, operators are scaled in the time dimension due to time-shared resources. However, assuming uniform behavior for all operators, the following measures do not change at any scale.
\[
\begin{aligned}
instructions &= A_B.time \cdot A_B.cpu \\
indata &= A_B.time \cdot A_B.inRate \\
outdata &= A_B.time \cdot A_B.outRate
\end{aligned}
\]
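The box construction above can be illustrated with a small Python sketch. It is not the system's implementation: the names `Box`, `network_data`, `operator_boxes`, and `isolated_box` are hypothetical, flows are represented as plain `(producer, consumer, data)` tuples, and the sketch assumes an operator with a non-zero total time.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Box:
    """A box with dimensions (time, cpu, inRate, outRate)."""
    time: float
    cpu: float
    in_rate: float
    out_rate: float

def network_data(op, flows, container_of):
    """Return (data read, data written) over the network by `op`.

    Only flows whose other endpoint sits in a different container count."""
    here = container_of[op]
    d_in = sum(d for src, dst, d in flows if dst == op and container_of[src] != here)
    d_out = sum(d for src, dst, d in flows if src == op and container_of[dst] != here)
    return d_in, d_out

def operator_boxes(time, cpu, d_in, d_out, network, dt_cpu):
    """Build the I, P, and O boxes of an operator placed in a container
    with the given network bandwidth."""
    i_box = Box(d_in / network, dt_cpu, network, 0.0)
    p_box = Box(time, cpu, 0.0, 0.0)
    o_box = Box(d_out / network, dt_cpu, 0.0, network)
    return i_box, p_box, o_box

def isolated_box(i_box, p_box, o_box):
    """Aggregate I, P, and O into the single box of the operator in isolation."""
    total = i_box.time + p_box.time + o_box.time
    cpu = (i_box.cpu * i_box.time + p_box.cpu * p_box.time + o_box.cpu * o_box.time) / total
    return Box(total,
               cpu,
               i_box.in_rate * i_box.time / total,
               o_box.out_rate * o_box.time / total)
```

In this sketch, multiplying the aggregated `time` by `in_rate` or `out_rate` gives back the amount of data read or written, which is the scale-invariance property stated above.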
Those measures are used to calculate the cpu, inRate, and outRate at any time scale. A group is a subset of connected operators that can be executed concurrently. Groups can have either connected PL operators or only one SnF operator. The execution time of the operators belonging to the same group is the same. As a consequence, all operators inside the same group are scaled to reach the most time-consuming operator. Formally, a group P is defined as P(o, s), with o being the vector of the operators that are members of the group and s being the vector of the time scales of the operators inside the group.
Figure 3.2: Two skylines A (circle) and B (square), plotted on Money and Time axes with marked distances d1, d2, and d3. Their distance is (d1+2d2, d3).
Estimation Algorithm
The estimation algorithm estimates when and for how long every operator will run given any schedule. Thus, the completion time and monetary cost are estimated for the whole schedule. The graph is examined from producers to consumers, starting from the operators that have no inputs. The set of operators that can be executed in parallel is divided into groups. Given a vector of groups P, we find the vector S with the time scales. An operator o_i that belongs to group P_j is scaled by S_j · P.s_i. The next time event t is defined by the group that will terminate first. The remaining time and in/out data for each operator are reduced by the percentage of the time that the operator is executed until t. The terminated operators are removed from the ready set, and the operators from the queued set whose memory constraints are satisfied are inserted in the ready set. The algorithm terminates when all the operators have been examined.
The completion time t(S_G) is defined as the time of the termination of the last operator of the schedule. To compute the money m(S_G), we slice time in each container into windows of length Q_t starting from the first operator of the schedule. The monetary cost is then a simple count of the time-windows that have at least one operator running, multiplied by Q_m.
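\[ m(S_G) = Q_m \cdot \left( \sum_{i=1}^{|C|} \sum_{j=1}^{|W|} \delta(c_i, w_j) \right) \]
with C = {c_i} being the set of containers, W = {w_j} being the set of time-windows, and
\[
\delta(c_i, w_j) =
\begin{cases}
1, & \text{if at least one operator is active in } w_j \text{ in } c_i \\
0, & \text{otherwise}
\end{cases}
\]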
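As an illustration of the window counting, the following Python sketch computes the completion time and the money of a schedule from a list of assignments. The function name `schedule_cost` and the tuple layout are hypothetical, and the sketch assumes that the windows of every container are aligned to the start of the earliest operator in the schedule, which is one reading of the definition above.

```python
import math

def schedule_cost(assigns, quantum_time, quantum_price):
    """Return (completion time, money) for a schedule.

    `assigns` is a list of (op, container, start, end) tuples.
    Assumption: windows are aligned to the earliest operator start."""
    origin = min(start for _, _, start, _ in assigns)
    completion = max(end for _, _, _, end in assigns)
    active = set()  # (container, window index) pairs with a running operator
    for _, cont, start, end in assigns:
        first = int((start - origin) // quantum_time)
        # An operator ending exactly on a window boundary does not occupy the next window.
        last = max(first, math.ceil((end - origin) / quantum_time) - 1)
        for w in range(first, last + 1):
            active.add((cont, w))
    return completion, quantum_price * len(active)
```

For example, two operators that run back to back in one container over 1.5 quanta occupy two windows, and a short operator in a second container adds a third paid window even if it runs for only a fraction of it.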
3.2.3 Skyline Distance Set
To compare solutions from different algorithms we define the distance between two skylines. Let A and B be two skylines produced by different algorithms. The units of both time and money are in quanta. Assume that A is dominated by B, i.e., skyline(A ∪ B) = B. Let |A| be the number of points in skyline A. The distance of A from B is defined as:
\[ D_p(A, B) = \frac{\sum_{i=1}^{|A|} dist(A_i, B)}{|A|} \]
with dist(A_i, B) being the distance of schedule A_i ∈ A from skyline B. Intuitively, this distance shows the average distance in quanta between the schedules of the two skylines. Several works have studied the problem of finding the distance between a point and a skyline [26, 22]. In general, the problem is to minimize the total cost of making A_i part of B, i.e., making it a point in the skyline. In this work we define this distance as the minimum L2 distance needed for A_i to be part of B. We can compute it as shown in Figure 3.2. This distance is smooth at the point where A_i becomes part of the skyline.
We define the distance between two arbitrary skylines as follows. Let A and B be two skylines. We compute C = skyline(A ∪ B). The distance D(A, B) is defined as the pair (D_p(A, C), D_p(B, C)). Figure 3.2 shows an example. This distance has several good properties. Let (D_p(A, C), D_p(B, C)) = (a, b). The following hold:
(i) a, b ≥ 0
(ii) if a = 0 then A dominates B
(iii) if a, b > 0, both A and B have schedules with trade-offs
(iv) the distance is smooth where a schedule becomes part of the skyline
Figure 3.2 shows an example where both a and b are positive. This distance can be generalized to n skylines (S_1, ..., S_n) as follows:
\[ D(S_1, \ldots, S_n) = (D_p(S_1, C), \ldots, D_p(S_n, C)) \]
with C = skyline(∪_i S_i). In our experiments, we compute the generalized skyline distance from the results of all algorithms to compare the results.
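A minimal Python sketch of the generalized skyline distance follows. The function names are hypothetical, points are `(time, money)` tuples with both dimensions minimized, and the point-to-skyline distance is one possible reading of "minimum L2 distance needed to be part of the skyline": the smallest move, taken as an infimum, that brings the point to the edge of the region dominated by the skyline.

```python
import math

def dominates(p, q):
    """p dominates q if it is no worse in time and money and differs from q."""
    return p[0] <= q[0] and p[1] <= q[1] and p != q

def skyline(points):
    """Return the non-dominated points, sorted by time."""
    pts = set(points)
    return sorted(p for p in pts if not any(dominates(q, p) for q in pts))

def dist_to_skyline(p, sky):
    """Smallest L2 move that frees p from every point of `sky` that dominates it.

    `sky` must be a skyline, so time ascending implies money descending."""
    doms = sorted(b for b in sky if b[0] <= p[0] and b[1] <= p[1])
    best = 0.0 if not doms else math.inf
    # Escape doms[s:] by lowering time and doms[:s] by lowering money.
    for s in range(len(doms) + 1):
        dt = p[0] - doms[s][0] if s < len(doms) else 0.0
        dm = p[1] - doms[s - 1][1] if s > 0 else 0.0
        best = min(best, math.hypot(dt, dm))
    return best

def d_p(a, c):
    """Average distance of the points of skyline `a` from skyline `c`."""
    return sum(dist_to_skyline(p, c) for p in a) / len(a)

def skyline_distance(*skylines):
    """Generalized distance: one D_p value per input skyline."""
    combined = skyline([p for s in skylines for p in s])
    return tuple(d_p(s, combined) for s in skylines)
```

With A = [(1, 5), (4, 4)] and B = [(2, 3), (5, 1)], the point (4, 4) is the only one that is dominated in the combined skyline, so only the first component of the distance is positive.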
3.3 Dataflow Language
In our system, requests take the form of queries in SQL with user-defined functions (UDFs). The SQL query is transformed into the intermediate level that we described in Section 2.3.1. This intermediate-level SQL script is then transformed into a dataflow graph using the modeling described in Section 3.2. Figure 3.3 shows the dataflow graph produced from query 8 of the TPC-H benchmark.
Figure 3.3: The dataflow graph produced from TPC-H query 8.
3.4 Operator Ranking
In this section we present the methodology we use to rank the operators of a dataflow graph. A score is computed for each operator according to the influence they have on the schedule. Figure 3.4 shows the results of ranking the Montage dataflow with 50 SnF operators. Influential operators have darker colors.
Figure 3.4: The Montage dataflow ranking information. Darker colors mean higher scores.
A simple way to rank the operators is by computing a relative score based on their properties and the input & output data F(op, in, out). We call this ranking Structure Ranking because it takes into account only the immediate neighborhood for each operator in the graph. The scoring function we use is defined as follows:
with o_i the operator i, proc_time = o_i.time, and
\[ io\_time = \frac{\sum_{i=1}^{|in|} in[i].data + \sum_{i=1}^{|out|} out[i].data}{net\_speed} \]
Parameter a expresses the importance of the execution time and I/O time. In our experiments we used a = 0.5. The ranking of operators is defined as
SR(G) = (score(o1), score(o2), ..., score(on))
Assuming there are no long-range correlations between operators in the graph, this is a good ranking function. A long-range correlation between two operators exists when the minimum length of the path between them is more than 2, and the assignment of one operator in the schedule affects the assignment of the other. If the graph has this property,Structure Ranking will not find it.
To overcome the problem, we compute the score for each operator by measuring their influence on the schedule directly. We can measure that by finding the partial derivative of the operators on the space of schedules produced by our model. Given a particular schedule SG of a dataflow graphG, we compute:
\[ \nabla SE(S_G) = \left( \frac{\partial SE}{\partial o_1}(S_G), \ldots, \frac{\partial SE}{\partial o_n}(S_G) \right) \]
with o_i being operator i in the dataflow and SE being the schedule estimation algorithm. Essentially, the derivative shows how sensitive the dataflow is to the different assignments of the operators. We call this ranking Derivative Ranking. The derivative of SE is hard to compute analytically, so instead we use an iterative process to approximate it. Assuming the graph has n operators, the space of schedules is an n-dimensional hypercube. Each axis of that cube is of length C (the number of containers). To measure the derivative, we assign each operator to all possible positions on its axis, without changing the positions of the others, and measure the difference in time and money of all the produced schedules.
Algorithm 1 shows the ranking process. The algorithm creates a schedule by randomly assigning the operators into containers. Then, each operator is assigned to every possible container without changing the positions of the others. At each step, it measures the difference of the cost function provided. This is repeated until the ordering of the operators does not change. The cost function we used is defined as follows:
F(S(G)) =a∗S(G).time+ (1−a)∗S(G).money
In our experiments we set a = 0.5.
Intuitively, the Structure Ranking and the Derivative Ranking should produce similar results if there are no long-range correlations between the operators in the dataflow graph. Indeed, this is the case for graphs with SnF operators. Below we present the results of ranking the Ligo [13] dataflow with 50 operators. We show the structure (S) and derivative (D) ranking. Each operator is a different character. Each list is ranked from the highest score to the lowest. We measure the difference of the two rankings using the Kendall tau rank distance [16].
S: W noXmM T piOkRf V N jP QghlU SqY rBAEHKCGJ F LID1c2a3b45Zd6e
D: W M XT ORN P nomkV pf jQiU ShglrqY J DBF GHELKCIA1a42b3Zce6d5
Kendall tau Distance : 0.088
We observe that the two rankings are not significantly different and the most influential operators are the same. However, for PL operators, the long-range correlations do exist. Below we show the results of Ligo with PL operators.
S: W noXmM T piOkRf V N jP QghlU SqY rBAEHKCGJ F LID1c2a3b45Zd6e
D: Y W N T QXJ DRIV CBOLU bGZ4F S3HM EAacP gj1f ndKkhi5m6le2oprq
Kendall tau Distance : 0.475
We observe that the ranking is significantly different. Pipeline operators run in parallel and two connected operators run for the same amount of time, regardless of whether one of them is faster than the other. The structure ranking would give the fast operator a smaller score than the slow one, which is not correct, because they are both influential. An illustrative example is operator D. Using the structure ranking its rank is 37, while using the derivative ranking it is 8.
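For reference, the normalized Kendall tau rank distance between two rankings of the same items can be sketched in Python as follows. The function name is hypothetical; the sketch counts the pairs of items that appear in opposite order in the two rankings and divides by the total number of pairs, so that 0 means identical rankings and 1 means fully reversed rankings.

```python
from itertools import combinations

def kendall_tau_distance(rank_a, rank_b):
    """Normalized Kendall tau distance between two orderings of the same items."""
    if len(rank_a) != len(rank_b) or set(rank_a) != set(rank_b):
        raise ValueError("both rankings must contain the same items")
    if len(set(rank_a)) != len(rank_a):
        raise ValueError("rankings must not contain duplicates")
    n = len(rank_a)
    if n < 2:
        return 0.0
    position_b = {item: i for i, item in enumerate(rank_b)}
    # combinations() yields pairs (x, y) with x ranked before y in rank_a.
    discordant = sum(1 for x, y in combinations(rank_a, 2)
                     if position_b[x] > position_b[y])
    return discordant / (n * (n - 1) / 2)
```

Since each operator is a single character in the listings above, a ranking can be passed directly as a string once any stray whitespace has been removed.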
Algorithm 1 Ranking Algorithm
Input:
  G: The dataflow graph
  N: The number of containers
  F: F(S(G)) → ℝ: The cost function
  M: The maximum number of iterations
Output:
  scores: The scores of the operators
 1: scores[|G.ops|] ← 0
 2: for m ∈ [1, M] do
 3:   S ← RandomScheduler(G, N)
 4:   all[|G.ops|][N] ← 0
 5:   for o ∈ G do
 6:     for c ∈ [1, N] do
 7:       assign(o, c, _, _)
 8:       all[o][c] ← F(S)
 9:     end for
10:   end for
11:   curr[|G.ops|] ← 0
12:   for o ∈ G do
13:     curr[o] ← max_i(all[o][i]) − min_i(all[o][i])
14:   end for
15:   if ranking of ops is the same in scores and curr then
16:     break
17:   end if
18:   for o ∈ G do
19:     scores[o] ← ((m − 1) · scores[o] + curr[o]) / m
20:   end for
21: end for
22: return scores
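The ranking loop of Algorithm 1 can be sketched in Python as follows. This is an illustration under stated assumptions, not the system's code: `derivative_ranking` and `toy_cost` are hypothetical names, the cost function is supplied by the caller in place of the schedule estimation, each operator is restored to its original container after it has been moved, and the convergence test is skipped in the first iteration because all scores are still zero.

```python
import random

def derivative_ranking(ops, n_containers, cost, max_iters, seed=0):
    """Score each operator by how much moving it alone changes the cost."""
    rng = random.Random(seed)
    scores = {o: 0.0 for o in ops}

    def order(values):
        # Ranking from highest to lowest score, ties broken by name.
        return sorted(ops, key=lambda o: (-values[o], str(o)))

    for m in range(1, max_iters + 1):
        schedule = {o: rng.randrange(n_containers) for o in ops}
        curr = {}
        for o in ops:
            original = schedule[o]
            costs = []
            for c in range(n_containers):
                schedule[o] = c
                costs.append(cost(schedule))
            schedule[o] = original  # leave the other trials unaffected
            curr[o] = max(costs) - min(costs)
        if m > 1 and order(scores) == order(curr):
            break
        for o in ops:
            scores[o] = ((m - 1) * scores[o] + curr[o]) / m
    return scores

# Made-up cost for illustration: container 1 is twice as slow as container 0.
TIMES = {"big": 10.0, "s1": 1.0, "s2": 2.0}
SLOWDOWN = [1.0, 2.0]

def toy_cost(schedule):
    return sum(TIMES[o] * SLOWDOWN[c] for o, c in schedule.items())
```

Calling `derivative_ranking(list(TIMES), 2, toy_cost, max_iters=10)` ranks `big` first under this toy cost, because moving it changes the cost the most. A real cost function would combine the estimated time and money of the schedule as in F above.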
3.5 Scheduling Algorithm
The scheduling algorithm that we propose has two phases. The first phase computes the skyline of schedules based on a dynamic programming algorithm using the ranking of the operators. This skyline is further refined using a 2D simulated annealing. In the following sections we present the two algorithms.
3.5.1 Dynamic Programming
The dynamic programming algorithm that we use is shown in Algorithm 2. The operators are considered from producer to consumer. Each operator with no inputs is a candidate for assignment. An SnF operator is a candidate as soon as all of its inputs are available. A PL operator is a candidate as soon as all of its inputs come from PL operators or from completed SnF operators. The algorithm chooses the operator with the maximum rank from the list of available operators. That operator is added to all the schedules in the skyline at every possible position. At every step, only the schedules in the skyline are kept.
The skyline may contain too many points [40]. For the experiments, we keep all the points in the skyline. In practice this is infeasible. Several approaches can be followed. One approach would be to keep k representative schedules (k is a system parameter) from the skyline: the fastest, the cheapest, and k − 2 equally distributed in between.
Algorithm 2 Dynamic Programming
Input:
  G: A dataflow graph.
  C: The maximum number of parallel containers to use.
Output:
  skyline: The solutions.
 1: skyline ← ∅
 2: ready ← {operators in G that have no dependencies}
 3: firstOperator ← maxRank(ready)
 4: firstSchedule ← {assign(firstOperator, 1, −, −)}
 5: skyline ← {firstSchedule}
 6: ready ← ready − {firstOperator}
 7: ready ← ready ∪ {operators in G whose dependency constraints no longer exist}
 8: while ready ≠ ∅ do
 9:   next ← maxRank(ready)
10:   S ← ∅
11:   for all schedules s in skyline do
12:     for all containers c (c ≤ C) do
13:       S ← S ∪ {s + assign(next, c, −, −)}
14:     end for
15:   end for
16:   skyline ← skyline of S
17:   ready ← ready − {next}
18:   ready ← ready ∪ {operators in G whose dependency constraints no longer exist}
19: end while
20: return skyline
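The following Python sketch mirrors the structure of Algorithm 2 on a toy problem. It is an illustration only: `dp_schedules`, `pareto`, and `toy_estimate` are hypothetical names, the estimator is a made-up stand-in for the schedule estimation of Section 3.2.2, and, unlike the listing, the first operator is tried on every container, with schedules of equal cost collapsed to a single representative.

```python
def toy_estimate(schedule):
    """Made-up estimator returning (time, money) for a partial schedule.

    Time is the largest container load; each container pays one quantum
    per started block of 10 time units."""
    times = {"a": 4, "b": 4, "c": 2}
    load = {}
    for op, cont in schedule.items():
        load[cont] = load.get(cont, 0) + times[op]
    money = sum(-(-l // 10) for l in load.values())  # integer ceiling
    return max(load.values()), money

def pareto(schedules, estimate):
    """Keep one schedule per non-dominated (time, money) cost."""
    best = {}
    for s in schedules:
        best.setdefault(estimate(s), s)
    return [s for c, s in best.items()
            if not any(o != c and o[0] <= c[0] and o[1] <= c[1] for o in best)]

def dp_schedules(deps, rank, n_containers, estimate):
    """deps maps each operator to the set of operators it depends on (a DAG)."""
    done = set()
    frontier = [{}]  # skyline of partial schedules (operator -> container)
    while len(done) < len(deps):
        ready = [o for o in deps if o not in done and deps[o] <= done]
        nxt = max(ready, key=lambda o: rank[o])
        candidates = [{**s, nxt: c} for s in frontier for c in range(n_containers)]
        frontier = pareto(candidates, estimate)
        done.add(nxt)
    return frontier
```

For the three-operator example with `deps = {"a": set(), "b": set(), "c": {"a", "b"}}` and two containers, the sketch returns one schedule that uses a single container and one faster schedule that pays for two.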
3.5.2 Simulated Annealing
The 2D simulated annealing is shown in Algorithm 3. The initial skyline is computed by Algorithm 2. At each step, all the schedules of the skyline are considered. A neighbor of each schedule is computed by assigning an operator to another container. If the newly produced schedule dominates the old one, only that schedule is kept. Both schedules are kept if they do not dominate each other. If the old one dominates the new, the new one is kept with probability that depends on the euclidean distance between them and the temperature. The lower the temperature, the smaller the probability of keeping a dominated schedule.
As RandNeighbor we use two functions: (i) purely random and (ii) random based on ranking. The latter chooses the operators with probability proportional to their scores.
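A compact Python sketch of the 2D simulated annealing step described above is given here for illustration. The names `anneal`, `pareto`, and `toy_estimate` are hypothetical, the estimator is a made-up stand-in for the schedule estimation, the neighbor function is the purely random variant, and the `cap` on the working set is an assumption added to keep the sketch bounded; it is not part of the description above.

```python
import math
import random

def toy_estimate(schedule):
    """Made-up estimator returning (time, money); see the lead-in."""
    times = {"a": 4, "b": 4, "c": 2}
    load = {}
    for op, cont in schedule.items():
        load[cont] = load.get(cont, 0) + times[op]
    return max(load.values()), sum(-(-l // 10) for l in load.values())

def pareto(schedules, estimate):
    """Keep one schedule per non-dominated (time, money) cost."""
    best = {}
    for s in schedules:
        best.setdefault(estimate(s), s)
    return [s for c, s in best.items()
            if not any(o != c and o[0] <= c[0] and o[1] <= c[1] for o in best)]

def anneal(initial, n_containers, estimate, steps, temperature, seed=0, cap=64):
    rng = random.Random(seed)
    skyline = pareto(initial, estimate)
    current = list(initial)
    for k in range(steps):
        nxt = []
        for s in current:
            op = rng.choice(sorted(s))                  # purely random neighbor
            n = {**s, op: rng.randrange(n_containers)}
            cs, cn = estimate(s), estimate(n)
            if cn[0] < cs[0] and cn[1] < cs[1]:
                nxt.append(n)                           # neighbor dominates
            elif cn[0] < cs[0] or cn[1] < cs[1]:
                nxt += [s, n]                           # trade-off: keep both
            elif math.exp(-math.dist(cs, cn) / temperature(k)) > rng.random():
                nxt.append(n)                           # accept a dominated neighbor
            else:
                nxt.append(s)
        current = nxt[:cap]
        skyline = pareto(skyline + current, estimate)
    return skyline
```

A possible call is `anneal([{"a": 0, "b": 0, "c": 0}], 2, toy_estimate, steps=50, temperature=lambda k: 1.0 / (1 + k))`. The temperature schedule here is arbitrary; any positive, decreasing function of the step fits the description.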
3.5.3 Parallel Wave
The Parallel Wave algorithm is a generalization of the scheduling algorithm for MapReduce graphs. In the beginning, the algorithm finds the depth of each operator in the graph. The operators with no inputs have depth zero. The depth of every other operator is the maximum depth of the operators connected to its inputs plus one. In a MapReduce pipeline, all the operators in the map phase will be at the same depth. The same holds for operators in the reduce phase. We used this algorithm to add elasticity into MapReduce dataflow pipelines, similar to the ones produced by Hive [48] or Flume [8].
Let W be the number of different depths in the graph and p_i be the maximum parallelism of depth i. p_i is defined as min(C, |W_i|), with C being the maximum number of containers and |W_i| being the number of operators at depth i. This algorithm considers all the combinations of different p_i. For the scheduling of operators in the same level, we use a simple load balancing mechanism.
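The depth computation and the per-depth parallelism bound can be sketched in Python as follows. The names `operator_depths` and `wave_parallelism` are hypothetical, flows are given as `(producer, consumer)` pairs, and the graph is assumed to be acyclic and shallow enough for a recursive traversal.

```python
from collections import defaultdict

def operator_depths(ops, flows):
    """Return the depth of each operator: 0 for operators with no inputs,
    otherwise one more than the deepest producer feeding it."""
    preds = {o: [] for o in ops}
    for producer, consumer in flows:
        preds[consumer].append(producer)
    depth = {}

    def visit(o):
        if o not in depth:
            depth[o] = 1 + max((visit(p) for p in preds[o]), default=-1)
        return depth[o]

    for o in ops:
        visit(o)
    return depth

def wave_parallelism(depth, max_containers):
    """Return p_i = min(C, |W_i|) for every depth i."""
    sizes = defaultdict(int)
    for d in depth.values():
        sizes[d] += 1
    return {d: min(max_containers, n) for d, n in sorted(sizes.items())}
```

For a single MapReduce job with three map operators feeding two reduce operators, all map operators get depth 0 and all reduce operators get depth 1, so with four containers the bounds are 3 and 2.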
Algorithm 3 Simulated Annealing
Input:
  G: A dataflow graph.
  K: Maximum number of iterations.
  C: Maximum number of containers.
Output:
  skyline: The solutions.
 1: skyline ← DynamicProgramming(G, C)
 2: S ← skyline
 3: k ← 0
 4: while k < K do
 5:   for all schedules s in S do
 6:     n ← RandNeighbor(s)
 7:     if n.time < s.time and n.money < s.money then
 8:       // Dominate
 9:       s ← n
10:     else
11:       if n.time < s.time or n.money < s.money then
12:         // Tradeoff
13:         S ← S ∪ {n}
14:       else
15:         if e^(−L2Distance(s, n) / T(k)) > rand[0, 1] then
16:           s ← n
17:         end if
18:       end if
19:     end if
20:   end for
21:   skyline ← skyline of (skyline ∪ S)
22:   k ← k + 1
23: end while
24: return skyline

3.5.4 Exhaustive
The exhaustive algorithm enumerates all the different schedules and keeps the ones in the skyline. We use this algorithm for two purposes: (i) to compare the results of the proposed algorithms with the optimal and (ii) as an initialization step for simulated annealing to find the optimal assignment for the most influential operators. Let N be the number of operators and C be the number of containers. The space of solutions is C^N. For N = 2 and C = 2 the assignments are 2^2. However, the solution space has a lot of symmetries. In the example, only two solutions are different: (i) assign both operators to the same container or (ii) assign them to different containers. We break the symmetries by generating schedules as shown in Figure 3.5. The figure shows all different solutions when scheduling three operators (A, B, and C).
[Figure 3.5 tree, by level: A; then AB | A B; then ABC | AB C | AC B | A BC | A B C]
Figure 3.5: All the solutions for three operators A, B, and C.
We can compute the number of different schedules as follows. The number of leaves of the sub-tree starting with k containers and n remaining operators to assign is:
\[ ss(n, k) = k \cdot ss(n-1, k) + ss(n-1, k+1) \]
with ss(0, k) = 1 for all k. The root of the tree is ss(N, 0). A simple dynamic programming technique is used to solve the equation. A generalization is to use at most C containers.
\[ ssg(n, k, C) = k \cdot ssg(n-1, k, C) + ssg(n-1, k+1, C) \]
\[
ssg(0, k, C) =
\begin{cases}
1, & \forall k \le C \\
0, & \text{otherwise}
\end{cases}
\]
[Figure 3.6 plot: "Number of Schedules", # Schedules (1 to 1e+14, logarithmic) against # Operators (0 to 20)]
Figure 3.6: Total and unique number of schedules.
Figure 3.6 shows the number of schedules before and after breaking the symmetries. For 20 operators, the number of unique schedules is approximately 10 orders of magnitude smaller. We leave breaking the symmetries on the graph for future work.
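The two recurrences can be evaluated directly with memoization, as in the following Python sketch. The function names follow the text; the helper `ss` reuses `ssg` with a cap large enough that it never binds, which is a convenience of the sketch.

```python
from functools import lru_cache

@lru_cache(maxsize=None)
def ssg(n, k, c):
    """Number of distinct schedules for n remaining operators when k
    containers are already in use and at most c may be used."""
    if n == 0:
        return 1 if k <= c else 0
    # Put the next operator in one of the k used containers, or open a new one.
    return k * ssg(n - 1, k, c) + ssg(n - 1, k + 1, c)

def ss(n, k=0):
    """Unbounded variant: k can grow to at most k + n, so that cap never binds."""
    return ssg(n, k, n + k)
```

For three operators, `ss(3)` gives 5, matching the five leaves of Figure 3.5, and `ssg(2, 0, 2)` gives 2, matching the two distinct solutions of the N = 2, C = 2 example.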
3.6 Complexity Analysis
We assume as given a dataflow graph G with n operators, l links, and c containers. We also assume that the number of schedules in the skyline is no more than s.
Schedule Estimation: The worst-case scenario is that all operators are ready in the beginning and only one operator terminates each time. At each phase, all operators and all links are considered. The complexity is
\[ SE_C = O\left(\sum_{i=1}^{n} (i + l)\right) = O\left(nl + \frac{n(n+1)}{2}\right) \]
Given that in most graphs l > n, SE_C = O(nl).
Ranking: Ranking takes at most r iterations to complete. At each iteration, n·c invocations of SE are performed. Thus, the complexity is R_C = O(r n^2 l c).
Dynamic Programming: This algorithm takes n steps. At each step at most s·c invocations of SE are performed, one for each schedule and container. Therefore, the complexity is DYN_C = O(s n^2 l c).
Simulated Annealing: Simulated annealing takes at most k steps to run and in every step at most s invocations of SE are performed for the new neighbors. The complexity is SA_C = O(k s n l).
The algorithm calls the ranking algorithm, then the dynamic programming, and at the end, the simulated annealing. Therefore, the complexity of the algorithm is O(n^2 l c (r + s + ks/(nc))).
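The total can be checked by writing each phase as a number of SE invocations times the cost of one invocation, O(nl), and then factoring out the common term. The derivation below only restates the quantities given above.

```latex
\begin{align*}
R_C   &= r \cdot (n c) \cdot O(n l) = O(r\, n^2 l c) \\
DYN_C &= n \cdot (s c) \cdot O(n l) = O(s\, n^2 l c) \\
SA_C  &= k \cdot s \cdot O(n l)     = O(k\, s\, n\, l) \\
R_C + DYN_C + SA_C
      &= O\!\left(n^2 l c \left(r + s + \frac{k\, s\, n\, l}{n^2 l c}\right)\right)
       = O\!\left(n^2 l c \left(r + s + \frac{k s}{n c}\right)\right)
\end{align*}
```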
3.7 Experimental Evaluation
This section presents the results of our experimental effort, divided into two groups. The first group contains experiments with our model and algorithms. The second group contains experiments with our system and the comparison with Hive [48].
3.7.1 Model and Algorithms
Table 3.1: Algorithm Experiment Properties
Property        Values
Dataflow        Montage, Ligo, Cybershake, TPC-H, MapReduce
Output Size     10x − 10000x
Operator type   SnF, PL
Ranking         Derivative, Structure
Search          NL, Dyn, SA, Exh, PW
Data transfer   DT_CPU = 0.1, DT_MEM = 0.05
Table 3.2: Operator Properties
Property        Values
time            0.2, 0.4, 0.6, 0.8, 1.0
cpu             0.4, 0.45, 0.5, 0.55, 0.6
memory          0.05, 0.1, 0.15, 0.2, 0.25
data            0.2, 0.4, 0.6, 0.8, 1.0
Experimental Setup
Dataflow Graphs: We examine five families of dataflow graphs: Montage [25] (Figure 3.7A), Ligo [13] (Figure 3.7B), Cybershake [12] (Figure 3.7C), MapReduce (Figure 3.8), and the first 10 queries of TPC-H [6] (Figure 3.3 shows query 8). The first three are abstractions of dataflows that are used in scientific applications: Montage is used by NASA to generate mosaics of the sky, Ligo is used by the Laser Interferometer Gravitational-wave Observatory to analyze galactic binary systems, and Cybershake is used by the Southern California Earthquake Center to characterize earthquakes. The MapReduce and TPC-H graphs are generated by our dataflow language as presented in Section 3.3.
Figure 3.7: The scientific graphs Montage(A), Ligo(B), and Cybershake(C).
Operator Types: We have indicated the values of operator properties as percentages of the corresponding parameters of container resources (Table 3.2). For example, an operator having memory needs equal to 0.4 uses 40% of a container's memory. Furthermore, execution times are given as percentages of the cloud's time quantum and so are data sizes (inputs/outputs of operators), taking into account the network speed. For example, an execution time of 0.5 indicates that the operator requires half of a time quantum to complete its execution (say, 30 minutes according to the predominant Amazon cost model). Likewise, an output of size 0.2 requires one fifth of a time quantum to be transferred through the network if needed. This way, the output data size is in inverse correlation with network speed. Money is measured as described in Section 3.2. We have used synthetic workloads based on Montage, Ligo, and Cybershake dataflows as defined in [7].
Figure 3.8: A MapReduce pipeline consisting of two jobs. Each job has three phases, map, combine, and reduce.
We scaled up the specified run times and operator output sizes by a factor of 50 and 1000 respectively; run time to output size ratio was increased by 20. We also set operator memory to 10% of the container capacity. We set the data transfer CPU utilization to be DT_CPU = 0.1 and memory needs DT_MEM = 0.05. We experimented with changing the output by a factor of 10, 100, 1000, and 10000. The properties of operators in MapReduce and TPC-H are chosen with uniform probability from the corresponding sets of values shown in Table 3.2. We have used dataflows with both SnF and PL operators.
Optimization Algorithms: We used Nested Loops Optimizer (NL) as presented in [27], Exhaustive (Exh), Parallel Wave (PW), Dynamic Programming (DP, labeled Dyn in the tables and figures), and Simulated Annealing (SA). For the NL algorithm, we used the greedy algorithm in the inner loop and 10 different numbers of containers in the range [1, N], with N being the number of operators in the graph. We used both structure and derivative methods for ranking. For the initialization of the SA algorithm, we used DP, Exh using the 10 most influential operators, and random. We used two RandNeighbor functions (Algorithm 3 Line 6): random, and random based on ranking (each operator is selected with probability proportional to its score).
Measurements: For each experiment we generated 10 different graphs with different seeds. For Montage, Ligo, and Cybershake, we used the generator in [7]. TPC-H and MapReduce graphs are generated from the language described in Section 3.3. We ran all the algorithms for each different graph and computed the generalized skyline distance from the results. As defined in Section 3.2, the generalized skyline distance is the distance of the skyline produced by each algorithm from the combined skyline. In the results we show the average value produced from the 10 different seeds. The time and money are measured in quanta.
Model Validation
We begin by presenting the experiments we performed to validate our model. We used the TPC-H queries. The properties of each operator, as presented in Section 3.2, can be either computed or collected by the system during execution [31]. In this work they are assumed to be given. To acquire the properties, we ran each operator in isolation using only one container and collected the statistics. The cloud price quantum is set to 10 seconds.
Figure 3.9 shows the real and estimated execution time and money for all queries of TPC-H using 8 containers and 8GB of data in total. We show the fastest plan in the skyline. We observe that our model is able to successfully predict both the time and money of the dataflows. Furthermore, query 9 clearly shows that our model overestimates the actual running time and money, thus it can be used as an upper bound on the real values.
Compare with Optimal
In this set of experiments, we compare the skylines produced by our algorithms with the optimal. We generate the optimal result using the Exh algorithm. Since computing optimally the skyline is very expensive for large dataflows, we used TPC-H graphs with 10 operators. To have a better understanding of the quality of the results, we also computed the worst solutions. Those solutions are in the Max-Max skyline. Here we show the results of queries 3, 6, and 9 (the results are similar for the other queries). The characteristics of the operators are chosen with uniform probability from the values in Table 3.2. The results are shown in Figure 3.10. All operators are SnF and the output data replication is 100. We observe that the DP algorithm produces solutions that are not far from optimal. SA significantly improves the results produced by DP, especially for query 6.
[Figure 3.9 panels: "Estimated and Real Time", Time (seconds) for Q1 to Q10; "Estimated and Real Money", Money (quanta) for Q1 to Q10; series: Estimated, Real]
Figure 3.9: Real and estimated execution time and money for queries of the TPC-H benchmark.
We are aware of the fact that these results cannot be generalized to larger graphs. We are currently working on techniques to reduce the search space by breaking symmetries on the graph itself and compute exact solutions for very large graphs. One way to pursue this is by analyzing the SQL scripts presented in Section 3.3. The dataflow graphs are generated by a relatively small number of SQL queries and they are very symmetric due to the nature of SQL.
Elastic MapReduce
We created several MapReduce dataflows with different numbers of jobs. The properties of each operator are chosen from the values in Table 3.2 with uniform probability. Figure 3.11 shows the results of dynamic programming and parallel wave using the dataflow of Figure 3.8 with SnF and PL operators. First, we observe that we can have significant elasticity on MapReduce dataflows. Second, we see that using the algorithm we propose, we gain significant improvement. We are able to find plans that are as fast as the ones of MapReduce but a lot cheaper. Finally, we observe that the gain for PL is much bigger than the case for SnF. This was expected since the MapReduce graphs are SnF.
We also experimented with varying the size of data. Figure 3.12 shows the results. We observe that when the amount of data increases, we are able to find better schedules for both SnF and PL dataflows. Furthermore, we observe that for SnF dataflows, the MapReduce scheduler performs better. We expected this since it is designed for this type of dataflow.
Effect of Ranking
In this set of experiments we measured the effect of ranking the operators. We used both structure and derivative ranking with Dyn and SA algorithms. The dataflows have SnF and PL operators. The output data replication factor is set to 100. As a baseline for Dyn, we compare with the FIFO algorithm, i.e., the operators are assigned in the order in which they become available.
[Figure 3.10 plot. Title: TPC-H Distance from Optimal. Axis label: Skyline Distance. Labels: Max, Dyn, Dyn+SA; Q3, Q6, Q9.]
Figure 3.10: Comparison of different algorithms with the optimal using TPC-H queries with 10 operators.
[Figure 3.11 plot. Two panels: SnF Operators and PL Operators. Axes: Time (Quanta) and Money (Quanta). Series: PW, Dyn.]
Figure 3.11: Dynamic algorithm compared with Parallel Wave on MapReduce graphs.
Dynamic Programming: Figure 3.13 shows the results for Dyn using the Ligo dataflow with 100 PL and SnF operators. We observe that ranking in general dramatically improves the solutions compared to FIFO, for both PL and SnF dataflows. As expected, for SnF operators the results are similar because the rankings do not differ much. For PL operators, however, the ranking based on derivative is much more beneficial than the ranking based on structure.
Simulated Annealing: For simulated annealing, we experimented with the initialization algorithm and the neighbor selection. Figure 3.14 shows the results for the Cybershake dataflow with 100 SnF and PL operators.
In the left plot of Figure 3.14, we observe that the choice of initialization algorithm is essential. The best choice is the Dyn algorithm: its results are not much worse than those of Exh for SnF operators, and it is the best for PL operators. Using Exh on the 10 most influential operators is the best for SnF but not beneficial at all for PL operators. This is because PL dataflows have many more influential operators, and these are much more sensitive than the SnF operators. We are currently working on ways to make SA multi-phase, where each phase considers only a subset of operators. The operators can be partitioned based on their ranking, for example using histogram-based partitioning [23] such as equi-width or max-diff.
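As an illustration of how operators could be grouped into phases by their ranking, the following is a minimal sketch of equi-width partitioning. It is not the multi-phase SA itself, which is still under development; the function name `equi_width_phases` and the example scores are hypothetical.

```python
def equi_width_phases(scores, num_phases):
    """Split operators into num_phases buckets of equal score width.

    scores: dict operator -> rank score (hypothetical values).
    Returns a list of buckets, from lowest to highest score range.
    """
    lo, hi = min(scores.values()), max(scores.values())
    # Fall back to width 1.0 when all scores are equal (avoids division by zero).
    width = (hi - lo) / num_phases or 1.0
    phases = [[] for _ in range(num_phases)]
    for op, s in sorted(scores.items()):
        # The maximum score falls on the upper edge; clamp it into the last bucket.
        idx = min(int((s - lo) / width), num_phases - 1)
        phases[idx].append(op)
    return phases

phases = equi_width_phases(
    {"a": 0.0, "b": 1.0, "c": 4.9, "d": 5.0, "e": 10.0}, num_phases=2
)
```

Equi-width buckets can be very unbalanced when the scores are skewed, which is one reason alternatives such as max-diff may be preferable.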
The middle plot of Figure 3.14 shows the results of SA using different algorithms to choose a neighbor. Interestingly, ranking-based neighbor selection hurts SA: it restricts SA to moving only the most influential operators, which prevents it from navigating the search space. Purely random neighbor selection is the best choice.
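The two neighbor-selection strategies can be contrasted with a small sketch. It assumes a schedule is represented as a dictionary from operator to container index and that a neighbor differs from the current schedule in the placement of exactly one operator. The function names, the `top_k` parameter, and the scores are hypothetical, and the sketch covers only the neighbor step, not the full SA loop or its cost model.

```python
import random

def _move(assign, op, num_containers, rng):
    """Copy the schedule and move `op` to a different container (needs >= 2)."""
    choices = [c for c in range(num_containers) if c != assign[op]]
    new = dict(assign)
    new[op] = rng.choice(choices)
    return new

def random_neighbor(assign, num_containers, rng):
    """Purely random selection: any operator may be moved."""
    op = rng.choice(sorted(assign))
    return _move(assign, op, num_containers, rng)

def ranked_neighbor(assign, num_containers, rng, score, top_k):
    """Ranking-based selection: only the top_k highest-scored operators move."""
    top = sorted(assign, key=lambda o: -score[o])[:top_k]
    op = rng.choice(top)
    return _move(assign, op, num_containers, rng)

rng = random.Random(0)
schedule = {"a": 0, "b": 0, "c": 1, "d": 1}
score = {"a": 9, "b": 1, "c": 8, "d": 2}  # hypothetical rank scores
n1 = random_neighbor(schedule, 3, rng)
n2 = ranked_neighbor(schedule, 3, rng, score, top_k=2)
```

In this sketch, `ranked_neighbor` can never change the placement of `b` or `d`, which illustrates how a ranking-based choice shrinks the part of the search space that SA can reach.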
In the right plot of Figure 3.14, we show the effect of using different ranking algorithms. We observe that derivative ranking is significantly better than structure ranking for PL operators, while the two do not differ much for SnF operators.
Compare Algorithms
In this set of experiments, we compare all algorithms using the scientific dataflows Montage, Ligo, and Cybershake. All dataflows have 50 operators and the data replication is 100. We compare the algorithm proposed in this work with the ones in [27] using the generalized skyline distance.
[Figure 3.12 plot. Two panels: SnF Operators and PL Operators. Axes: Data Replication (log scale) and Skyline Distance (log scale). Series: Dyn, PW.]
Figure 3.12: Dynamic algorithm compared with Parallel Wave on MapReduce graphs.
[Figure 3.13 plot. Title: Dynamic Programming for Ligo 100. Axis label: Distance. Groups: SnF, PL. Series: Derivative, Structure, FIFO.]
Figure 3.13: Dynamic programming with different ranking using Ligo dataflow with 100 PL and SnF operators.
Figure 3.15 shows the results. We observe that the best choice is to use the Dyn algorithm to find the initial skyline and then refine that skyline with SA using random neighbor selection. In some cases, the solutions produced are one order of magnitude better than those of the NL algorithm. As observed earlier, using ranking with SA is not beneficial. Furthermore, the Exh algorithm does not give very good results compared to Dyn+SA. Recall that Exh uses only the 10 most influential operators.
We also measured the number of schedules in the skyline produced by each algorithm. In general, Dyn finds more schedules in the skyline than NL, and SA improves on that even further. This, combined with the fact that Dyn+SA produces the best skyline of schedules, makes it an obvious choice.
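To make the notion of a skyline of schedules concrete, the sketch below extracts the non-dominated points from a set of (time, money) pairs, assuming that a schedule dominates another when it is no worse in both dimensions and differs in at least one. The function name `skyline` and the sample values are hypothetical; the generalized skyline distance used in the comparison is not reproduced here.

```python
def skyline(schedules):
    """Return the (time, money) points that no other point dominates.

    A point dominates another if it is no slower and no more expensive,
    and the two points are not identical.
    """
    front = []
    best_money = float("inf")
    # Sort by time, then money; duplicates are collapsed by set().
    for t, m in sorted(set(schedules)):
        # Every earlier point is at least as fast, so the current point
        # survives only if it is strictly cheaper than all of them.
        if m < best_money:
            front.append((t, m))
            best_money = m
    return front

points = [(10, 5), (8, 7), (12, 4), (9, 7), (10, 6), (8, 7)]
front = skyline(points)
num_in_skyline = len(front)
```

The length of the returned list corresponds to the number of schedules in the skyline, the quantity counted per algorithm above.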
We also experimented with varying the size of the output data generated by the operators for Montage, Ligo, and Cybershake dataflows with 50 operators. Figure 3.16 shows the results. We observe that Dyn+SA proposed in this work produces plans that in some cases are one order of magnitude better than NL.
Finally, Figure 3.17 shows the running time of the algorithms. We observe that the Dyn algorithm has the fastest running time. Exh and Exh+SA have very long running times compared to the others because they examine a very large number of schedules, and each call to the Schedule Estimation subroutine is expensive.
3.7.2 Systems
The parameters of the experiment are shown in Table 3.3.
Experimental Setup
Execution Environment: In our experiments, all containers have the same resources (CPU, memory, disk, and network). The resources were kindly provided by Okeanos³, the cloud service of GRNet⁴. We used 32 virtual machines (containers), each with 1 CPU, 8 GB of memory, and 60 GB of disk. We measured the average network speed to be 15 MB/sec. We used Hadoop 1.1.2 and Hive 0.11. Time and money are measured in quanta.
³ okeanos.grnet.gr
[Figure 3.14 plot. Three panels for Cybershake 100, each showing Distance for SnF and PL. Init algorithm: Dyn, Exh, Random. Neighbors: Derivative, Structure, Random. Dyn Ranking: Derivative, Structure.]
Figure 3.14: Simulated annealing on the Cybershake dataflow with 100 SnF and PL operators: initialization algorithm (left), neighbor selection (middle), and ranking (right).
[Figure 3.15 plot. Two panels for Montage, Ligo, and Cybershake (50 Ops): Skyline Distance (log scale) and # points in skyline. Algorithms: NL, Dyn, Dyn+SA, Dyn+SA(Rank), Exh+SA, Exh+SA(Rank).]
Figure 3.15: Comparison of the distance from the combined skyline and of the number of schedules in the skyline for different algorithms on various scientific dataflows with 50 operators.
Dataset: We generated a total of 512 GB of data (approximately 2.2 billion tuples) using the generator provided for TPC-H. The benchmark has eight tables:
region(1), partsupp(32, ps_partkey), orders(32, o_orderkey), lineitem(32, l_orderkey), customer(32, c_custkey), part(32, p_partkey), nation(1), and supplier(1).
In parentheses, we show the number of partitions we created for each table and the key on which the partitioning was performed. In Hive, we used the CLUSTERED BY clause when creating the tables. The replication factor of Hadoop is set to 1 in order to have a clear comparison with our system. After loading the data into Hadoop, we used the balancer to distribute the data evenly across the cluster. In ADP, the tables are horizontally partitioned and distributed across the cluster. The partitions of the same table are placed on different virtual machines.
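The partitioning and placement scheme can be sketched as follows. The sketch assumes integer keys, a simple modulo rule for assigning rows to partitions, and round-robin placement of partitions on virtual machines. These are illustrative assumptions, as are the function names `partition_rows` and `place_partitions`; the exact partitioning function used by ADP or Hive is not specified here.

```python
def partition_rows(rows, key, num_partitions):
    """Horizontally partition rows by an integer key (assumed modulo rule)."""
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[row[key] % num_partitions].append(row)
    return parts

def place_partitions(num_partitions, num_vms):
    """Assumed round-robin placement: partition index -> VM index."""
    return {p: p % num_vms for p in range(num_partitions)}

# Toy stand-in for the orders table, partitioned on o_orderkey into 32 parts.
orders = [{"o_orderkey": k} for k in range(100)]
parts = partition_rows(orders, "o_orderkey", 32)
placement = place_partitions(32, 32)
```

With 32 partitions and 32 virtual machines, round-robin placement puts every partition of a table on a different machine, matching the setup described above.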
Table 3.3: System Experiment Properties
Property          Values
Dataflow          TPC-H, MapReduce
Operator type     SnF
Ranking           Derivative, Structure
Search            Dyn, PW
Num of VMs        8, 16, 24, 32
TPC-H Data (GB)   64, 128, 256, 512