A Data Pre-partitioning and Distribution Optimization Approach for Distributed Data Warehouses
Billel ARRES Universite Lumiere Lyon 2
69676 Bron, France [email protected]
Nadia KABACHI Universite Lyon 1 69100 Villeurbanne, France [email protected]
Omar BOUSSAID Universite Lumiere Lyon 2
69676 Bron, France [email protected]
Abstract—The increasing volumes of relational data let us find an alternative to cope with them. The Hadoop framework - an open source project based on the MapReduce paradigm - is a popular choice for distributed data warehouses and big data analytics. In this paper, we propose an original approach for partitioning and collocating data in distributed file systems, especially Hadoop-based systems, and this, to overcome the default data partitioning and placement policies which does not take any data characteristics into account. The goal is to reduce the amount of data transfer required during Mapreduce’s shuffling phase. Indeed, the efficiency of many relational operations can be improved if a careful data fragmentation design and placement policy are applied, including indexing, grouping, aggregation and joins. Based on k-means clustering methode that allows to master the number of clusters through its k parameter, we investigate the performance gain for OLAP cube construction on a multi-nodes cluster with and without data organization. And this, by varying the number of clusters and data warehouse size. Our experiments suggest that defining a good data partitioning and placement schemes during the implementation of the data warehouse increase significantly the OLAP cube computation and querying performances.
Keywords-Data warehouses; On-Line Analytical Processing (OLAP); Mapreduce; Data placement;
I. INTRODUCTION
In the recent past, we have witnessed dramatic increases in the volume of data literally in every area: business, science, and daily life to name a few. Today, some claim that data (more specifically, data-intensive science) are the fourth paradigm in scientific research after experimenta- tion, observation, theory, and computational simulation. The storage and processing of such an overwhelming amount of data is a challenging task in the current computing environments. To address the scalability requirements of today’s data analytic, parallel shared-nothing architectures of commodity machines (often consisting of thousands of nodes) have been lately established as the de-facto solution.
MapReduce [9] is a well-known distributed framework for programming commodity computer clusters to perform large-scale data processing algorithm. Hadoop [17], an open- source MapReduce implementation, is able to process big data sets in a reliable, efficient and scalable way. Based on Hadoop, many cloud data warehouses (e.g., Hive [4], and
HadoopDB [1]) are developed and widely used in various fields. Even though these data warehouses support, the per- formances are unsatisfactory. The reasons for this situation are: (1) these systems do not provide big data oriented OLAP optimizations; (2)the join operation, which is quite common operation in OLAP, is very inefficient when big data are involved [14]. In this paper, we studied the benefits of data partitioning and collocation within the context of data warehousing and OLAP querying. Based on query workload, we introduce a mechanism called reference table (RT) as an extension of Hadoop’s distributed file system, to collocate dimensions tables predicates and attributes that are related or frequently used on the same or closest set of nodes, eliminating the network overhead by reducing data transfer in MapReduce’s shuffle phase, since related set of files will be processed jointly. The rest of this paper is organized as follows. Following the introduction, Section 2 presents background and related work. Section 3 describes the problem we address in this paper. Section 4 details the proposed approach and system implementation. Section 5 evaluates the efficacy of the proposed approach. Finally, conclusions and future works are summarized in Section 6.
II. RELATEDWORK
OLAP (On-Line Analytical Processing) was introduced in the work done by [5]. They provided an overview of data warehousing and OLAP technologies, with an empha- sis on their new requirements. Considering the past two decades have seen explosive growth, both in the number of products and services offered and in the adoption of these technologies in industry, their other work [6] gave the introductions of OLAP and data warehouse technologies based on the new challenges of massively parallel data architecture. There exist some optimization approaches of distributed data warehouses system, which are related to this paper. Data warehousing improvements can be classified as follows: (1) data layouts; (2) pre-computation for indexing;
(3) intentional data placement. The latter two are close to our proposal and introduced briefly in this section. Data layouts aims to enhance MapReduce performance. In this case, Llama [19] proposes the use of a columnar file (called
CFile) for data storage in HDFS. This enables selective access only to the columns used in the query. However, this type of storage provides more efficient access to data than traditional row-wise storage only for queries that involve a small number of attributes. This is not always the case for data warehouses. For indexing, Hadoop++ [10] is a system that provides indexing functionality for data stored in HDFS by means of User Defined Functions (UDFs), i.e., without modifying the Hadoop framework at all. In fact, Hadoop++
can only collocate two files that are created by the same job, and requires reorganization of the data as new files are ingested into the system. In the case of data warehouses it is necessary to collocate all the data warehouse files (tables), not only two, and this can be a major inconvenience. Other research efforts tried to empower Hadoop by an intentional data organization. CoHadoop [13] is an extension of Hadoop to enable applications to control where data are stored. It retains the flexibility of Hadoop since it does not require users to convert their data to a specific format. However, it has not proved its efficiency on multidimensional data processing. In summary, at present the lack of effective support for multidimensional data processing model and OLAP analysis needs to be resolved urgently in big data era.
At the same time, Hadoop as a cloud-computing framework is most widely used in the big data analysis platform, but the OLAP optimizations, based on Hadoop, are still blank.
Based on our previous research [3], in which we presented a data collocation mechanism for big data environment, we designed the proposed approach as an extension of the Hadoop file system, which has been proven efficient on analysis of big data. This research has certain theoretical and practical values.
III. PROBLEMSTATEMENT
The implementation of a data warehouse that incorporates the best features of the MapReduce parallel processing model - scalability and fault tolerance - is the goal of several research, eg [18] and [1]. For a program to be executed concurrently using several processing nodes in a parallel computer system (e.g. a cluster), the work has to be divided and assigned to each one of them. This usually requires also a given input dataset to be divided into blocks (chunks) and assigned to each of the nodes. This is carried out in a two- step process: data partitioning (or fragmentation) to divide the dataset, and data placement (or allocation) to assign the fragments to the system’s nodes. By default, tables files are partitioned into a set of partitions (data blocks) using horizontal data partitioning (HDP) [15], then the Hadoop distributed file system (HDFS) tries to balance load by placing the blocks randomly on the Datanodes.
The default data placement policy of HDFS arbitrarily places partitions obtained across the cluster so that mappers often have to read the corresponding partitions from remote nodes. This causes a high data shuffling costs and network
Figure 1: Default DW blocks processing.
overhead when querying step (Figure 1). The goal of our proposed approach is to minimize this cost instead of the network overhead by reducing the transferred data in the shuffle phase. In a context of data warehousing with Mapreduce, which is based on the Map and Reduce steps, transferring data between these two phases remind often more time consuming than processing data itself [2]. Let us formally define the transferred data which we want to minimize.
Definition 1: (Data transfer in shuffle phase)
The MapReduce framework operates exclusively on key- value pairs [17], that is, the framework views the input to the job as a set of key-value pairs and produces a set of key-value pairs as the output of the job. The output pairs can be different types than the input pairs.
Let jobα denote a MapReduce job. It is composed of Mα={m1, ..., mp} map tasks and Rα={r1, ..., rq} reduce tasks. We do not consider map or reduce tasks which fail or are the result of speculative execution and are not retained.
We assume that each map taskmi processes chunkci, for i = 1, ..., p.
Let Iα = {ip1, .., ipm} be the set of intermediate key- value pairs produced by the map phase.key(ipj) represents the key of intermediate pair ipj and size(ipj) represents its total size in bytes. Kα is defined as the set of inter- mediate keys produced in the execution of jobα, Kα =
∪ip∈Iαkey(ip). We define output(mi) ⊆ Iα as the set of intermediate pairs produced by map taskmi. We also define input(ri) ⊆ Iα as the set of intermediate pairs assigned to reduce taskri. This assignment is controlled by the reduce partitioning function:
part : Kα→ Rα
Let N = n1, .., ns be the set of machines that compose the cluster;node(t) represents the machine where task t is executed:
node : Mα∪ Rα→ N
The way in which this assignment is done depends on the scheduling algorithm, the properties of the job and the characteristics and behaviour of the cluster where it is executed.
Now we distinguish between local and remote transfers of intermediate tuples. Letipj be an intermediate key-value pair, produced in map task m, i.e., ipj ∈ output(m) and consumed by reduce taskr, i.e., ipj∈ input(r). We define Pα(ipj) ∈ 0, 1 as a variable that indicates whether ipj is transferred or not through the network:
Pα(ipj) =
0 ifnode(m) = node(r) 1 ifnode(m) 6= node(r)
From functionPαwe can derive the total amount of data that is transferred through the network in the execution of jobα.
transf er(jobα) = X
ipj∈αi
size(ipj)Pα(ipj)
Definition 2: Let W = {job1, ..., jobw} be the set of jobs in the workload sample. Our goal is to minimize the total amount of data transferred over the network in the shuffle phase of jobs involved inW :
minimize
X
jobα∈W
transf er(jobα)
by optimizing: (1) the default data partitioning with a vertical pre-partitioning of data warehouse schema before its implementation (2) data placement by collocating related partition’s chunks obtained. In both cases, we propose a data mining based approach, which is performed transparently to the users, in order to free them from the burden of complex partitioning and collocating optimisations. Our approach is summarized in Figure 2.
Figure 2: DW blocks processing with data pre-partitioning and collocation approach.
IV. THEPROPOSEDAPPROACH
Data partitioning and data collocation are a well known and widely used optimisation methods (OM) in data base community. In the context of parallel processing, taking into
account the existing interaction between these two OM’s can increase significantly the optimizations results.
A. Data Pre-partitioning
Basically the original input data warehouse files (tables) are stored at a single node or outside the parallel system, then it has to be divided and transferred to each of the participating nodes. By default, Hadoop distributed file system partitions these input data files into large horizontal partitions (at least 64MB up to 1GB)[10] without taking any data warehouse schema characteristics into account. Our first optimisation approach consist on applying a vertical partitioning where each data warehouse table file is split in two or more tables having fewer columns but keeping the same number of rows. Based on a set of query workload, the process takes as input data warehouse tables files and outputs fragments by applying the partitioning algorithm described below. Note that all the process is done off-line.
1) Principle: The data warehouse schema consists of a set of fact and dimension tables along with their attributes.
Vertical partitioning consists on splitting up a table by columns: one set of columns goes into one data store, and another set of columns goes into a different data store, it is necessary to select the appropriate dimension’s attributes such that the query cost is minimized.
Figure 3: Data pre-partitioning principle.
2) Building the Relevancy Matrix: The process of build- ing the Relevancy matrix has two main steps. The first one consists on generating a query attributes matrix QA from the query workload.
Let T = {t1, ..., tn} be the set of the warehouse tables (facts and dimensions tables). The workload consists of a set of queriesW = {q1, ..., qm} and αti = {ae|ae∈ ti} be the set of attributes inT . The matrix QA represents couples (qi, aj), where general term QAij equal to 1 if aj appears in queryqi and to 0 otherwise.
In the second step we build, from the obtained query attribute matrix, a Relevancy matrix M which is a n × n symmetric matrix, n is the number of different attributes in
the tenant’s data, and the elements are the association degree between two attributes, it is defined as the number of queries which involves two attributesaiandaj, that is the frequency of occurrence of two attributes, it can be defined as follows:
Mij= Relevancy(ai, aj) =X
T (ai, aj)(ai, aj∈ αti) Among which,αtiis the tenant’s data attribute sets,ai, aj
are the arbitrary two different attributes in αti, T (ai, aj) is the operation (query) which involves ai and aj jointly, PT (ai, aj) is the total number of operations which involves ai andaj.
3) Partitional Clustering Based on BEA: Bond Energy Algorithm [12], short for BEA, proposed by McCormick et al., is a rearrangement clustering technique which is widely used in vertical partitioning big tables in distributed database system. BEA takes the relevancy matrix as input, and keep replacing arbitrary two rows or columns so that the elements which has high relevancy could get together, and having a maximum ME. ME is short for Measure of Effectiveness, it represents the similarity between each element with the up and down and left and right the four neighbours, defined as follows:
M E(AA) = 1 2
XM i=1
XN j=1
ai,j[ai,j+1+ai,j−1+ai+1,j+ai−1,j]
AA represents a M×N non negative matrix with boundary conditions: a0,j = aM +1,j = ai,0 = ai,N +1 = 0. If element in the matrix on the left side of the left or the right of the right matrix (upper and lower boundary similarly), the element value is 0, namely boundary elements and the elements outside the matrix have no relevancy.
According to the definition of BEA, clustering the rel- evancy matrix consists on making attributes which have higher relevancy get together so that ME have a maximum value. The objective function is defined as follows:
M ax{M E(AA) = 1 2
XN i=1
XN j=1
Relevancy(ai,j[ai,j+1+ai,j−1])}
The clustering process has two steps:
1) Reorganize the init relevancy matrix by first placing one of the columns arbitrarily,i = 1.
2) Try to place individually each of the remainingN − i columns in each of the i + 1 possible positions (to the left and right of the i columns already placed), and compute each column’s contribution to the M E.
Place the column in the position that gives the largest incrementalM E. Increment i by 1 and repeat this step untili = N .
The details of the above algorithm can be found in [12].
4) Fragments Construction: The last step takes the clus- tered relevancy matrix (BEA’s output). Elements with similar values are grouped together to identify attributes clusters, making sure that subsets obtained after partition have the highest relevancy and are not breaking the privacy con- straints, which are:
Completeness:Guaranteed by the partitioning algorithm, which assigns each attribute to one partition.
Reconstruction:A relation R decomposed into fragments R1,R2,...,Rnis reconstructed by the join operation:R =⊲⊳K
Ri, for all Ri.
Disjointness: Attributes have to be disjoint in vertical fragmentation. Two cases are distinguished: If tuple IDs are used, the fragments are really disjoint. Otherwise, key attributes are replicated automatically by the system.
At the end of the process, we obtain a data warehouse fragments which will be transferred to the distributed file system. These fragments, initially obtained by a vertical fragmentation of the initial data warehouse, will be then horizontally fragmented by the distributed file system and distributed to data nodes automatically. At this point we can affirm that this first optimisation, as it combines two different methods of slicing tables, improves performance of queries as we will confirm in the evaluation section. The main advantage is that each of the resulting tables will have a lot less rows and also less attributes. Thus the method spatially expands a very large dimension, both vertically and horizontally, into tables of far smaller sizes.
B. Data Collocation
The main idea of our work suggests that to improve data warehouse query response time, particularly OLAP queries, we must first define a good strategy for data partitioning and distribution. As data partitioning was explained above, we detail in this section the second part of our approach which consist on a blocks placement strategy as an extension of the distributed file system.
1) Principle: Our collocation approach exploit selection dimension’s attributes found in workload’s queries to derive suitable blocks placement. It outputs a data warehouse distribution schema (metadata) and is subdivided into three steps that are:
1) dimension’s attributes extraction from the workload;
2) dimension’s clustering (using k-means method);
3) reference table construction for data collocation.
2) Dimension’s Attributes Extraction: Selection attributes set is simply parsed from workload. LetD = {d1, ..., dn} be the set of the warehouse tables. The workload consists of a set of queriesW = {q1, ..., qm}.
Let αqi = {ae|ae ∈ qi} be the set of attributes related to a query qi and βdj = {bf|bf ∈ dj} the set of attributes of a dimension dj. To make it simple, let consider a query that involves two attributes a1 anda2 belonging to d1 and
d2 respectively, our strategy consists on collocating thed1j
andd2k blocks (chunks) on the same or closest nodes, while remaining tables blocks will be placed via Hadoop’s default strategy over the remaining nodes. For that, as a first step, parsed dimensions are coded in a query-dimension matrix QM whose general term QMij equals to 1 if ∃bf in βdj
which is also in αqi, and to 0 otherwise. For example, the QM matrix corresponding to W and D is featured in Table 1.
d1 d2 d3 d4 ...
q1 1 1 0 0
q2 0 0 1 0
...
q20 0 0 0 1
Table I: Sample query-dimensions matrixQM
3) Dimensions Clustering: Our objective is to derive blocks placement schema that optimize data access for queries, especially OLAP queries. Since the HDFS default block placement policy does not take any data characteristics into account, clustering data warehouse blocks files with respect to queries achieves our goal. Intuitively, we ideally seek to build rectangles (clusters) of 1’s in matrix QM. For study purpose, we chose the widely-used k-means algorithm [11] for clustering. But other clustering algorithms can be used. This algorithm inputs vectors of object attributes (columns of QM in our case). It attempts to find the centers of natural clusters in source data by minimizing total intra- cluster variance:
Xk i=1
X
xj∈Ci
(xj− µi)2
where Ci, i = 1, ..., k are the k output clusters and µi is the centroid (mean point) of points xj ∈ Ci. Let C be the set of all clusters Ci. Usually, having k as an input parameter is viewed as a drawback in clustering. In our case, this turns out to be an advantage, since we want to limit the number of clusters, typically to the number of nodes or sets of nodes (Racks) the data warehouse will be distributed on. In practice, we used the Weka [8]
SimpleKMeans implementation of k-means. SimpleKMeans uses the Euclidean distance to compute distances between points and clusters. It directly inputs matrix QM (acually, the dj vectors) and k, and outputs set of dimensions clusters C. For example, on matrix QM (Table 1) with k = 3, SimpleKMeans outputs: C = ((d1,d2),(d3),(d4)).
4) Reference Table Construction: The reference table (denoted RT) construction step consists on assigning each dimensions clustering output (Ci) to a reference (Ref(i)), each reference Ref(i) is represented by a random integer value, but other data types may also be used. There is an N:1 relationship between clustersCi and references.
Figure 4: Example of four tables blocks collocated using the Reference Table on a multi-nodes cluster.
During the loading phase, each data warehouse table file is assigned to at most one reference and many tables files can be assigned to the same reference. Tables file’s chunks with the same reference are then placed on the same (or closest) set of Datanodes, whereas others with no reference are placed via Hadoop’s default strategy.
The reference table is set with default values according to the policy location initially defined. Figure 4 shows the RT corresponding to four tables files collocation on a cluster.
In our work, we extended the Hadoop file system HDFS to support the customized data warehouse placement policy.
We did not address other aspects such as the variation of the blocks size, which remains equal to default (64MB). Nor the replication policy which remains to default (Replication factor to 3). Our goal is the study of the plain query per- formance gains due to careful data warehouse organization in the context of parallelization with MapReduce. We used on top of our extended Hadoop version the Apache Hive [4] which is a data warehouse software that allows querying and managing large datasets residing in distributed storage.
Hive provides an SQL-like language called HiveQL and has also support for creating data cubes[4].
V. EXPERIMENTALRESULTS
In the experiments, we aim at comparing our data pre- partitioning and distribution approach, which is the output of this project work, to default strategy used on Hadoop based clusters. We compare the performances (Execution time) of queries presented in section 5.1, first, without optimization (Default) using the original Hadoop version, then with optimization (Optimized) using our HDFS extension.
A. Experimental Conditions
In order to assess the effectiveness of our approach, we de- signed an adapted benchmark targeted to multidimensional data, called TPC-OH benchmark, which is inspired from the well-known TPC-H benchmark [16], the most prominent decision support benchmark. TPC-H benchmark consists of a suite of business oriented complex ad-hoc queries and concurrent data modifications. The workload and the data
Table II: DW Vertical Pre-partitioning results
Schema Initial (nbr) Fragmented (nbr)
LineItem 01 07
Orders 01 06
Customer 01 05
PartSupp, Supplier, Part 01 03 Region, Nation, Time 01 01
populating the database have been chosen to have broad industry-wide relevance. The workload is composed of 22 SQL queries with a high degree of complexity. Existing TPC-H implementation allows the generation of raw data stored into eight TBL files, namely Region, Nation, Cus- tomer, Supplier, Part, PartSupp, Orders, LineItem, by using a specific scale factor SF. The latter determines the final TPC- H data size. Basically, TPC-OH is a suitable transformation of the TPC-H benchmark into a multi-dimensional OLAP benchmark. Indeed, each business question of TPC-H work- load is mapped into an OLAP star-join query, and a temporal dimension (Time table) is added to the data warehouse. Such star-join query use ”with cube” operator to group the results. Also, we translated the TPC-H SQL workload into an HQL workload, since we used the Apache Hive as data warehousing software for querying and managing the data residing in Hadoop distributed storage system.
B. Cluster Setup
To achieve evaluation objectives, we used 10 PCs in a cluster (1 NameNode, 9 DataNodes). The NameNode is equipped with Intel Xeon E5-2630 (Six Core HT,2.6GHz), 16GB (4x4GB) RAM, and a 1TB SATA Hard Drive. The DataNodes are equipped with Intel Core i5-2400M, 4GB RAM, and a 300GB HDD. The OS is Ubuntu 12.04 LTS, and the Mapreduce framework is Hadoop 1.0.3. The network speed is 1G bps. We implement our data warehouse blocks placement approach by using the HDFS-385 API (Version 1.2.0) which is an expert-level interface for developers who want to try out custom placement algorithms to override the HDFS default placement policy [7]. Our code is written in Java and is available on demand.
C. Data loading
Table 2 shows the pre-partitioning schema (number of fragments) of TPC-OH relational data warehouse obtained, with respect to the workload as explained in Section 4.1.
For data collocation, we arbitrarily fixed k-means parameter k = 10 to process the workload, which could correspond to cluster’s size. In this first experiment, we investigate the effect of the proposed pre-partitioning and collocation approach on time needed to load data warehouse tables by the modified (Optimized) file system and the original one (Default). The data warehouse size is equal to 920GB.
From Figure 5 it can be seen that loading data time decreases significantly as the cluster size increases. The most noticeable difference between the two systems is on a 16 nodes cluster size, where about 3493,11 seconds elapsed for the newly (optimized) file system to execute successfully, however, the original (default) Hadoop file system takes less than 3247,2 seconds. A difference of 7 percent. In fact, to load data files the two systems performed a full MapReduce job. However, the optimized file system is slightly slower, as shown in Figure 5. This is due to the increased network utilization by collocating different files partitions (blocks).
Note that with these initial tests performed in this work, loading cost encountered by the proposed approach is very small compared to the savings at query time as shown in the next section.
Figure 5: Loading data warehouse time (920 GB)
D. Query Execution
In this part of experiments, we used typical business queries of TPC-H [16] for which the shuffle phase has a significant impact in the response time. With respect to the partitioning scheme shown in Table 2, we used the following queries: Q5 and Q9 that are examples of hash joins on different columns, Q7 that executes a replicated join and Q17 that executes a co-group. Figure 6 shows that the proposed approach improves clearly query response time, especially for Q17 performing co-group operation, and this was expected since collocating related data blocks leads to map only tasks job operations rather than map and reduce tasks.
In contrast, Figure 7 shows the elapsed time for data cube computation by varying the data warehouse size on a 10 nodes cluster (N). For data collocation, we fixed k = 10 to process the workload as in the previous tests. As shown in the figure, cube computation execution time increases significantly as the data size increases and the benefits of the proposed approach are appreciable with the increasing size compared to default (HDFS) data distribution. The Figure shows that tables files collocation improves the query performance from 10% for 160GB to 25% for a 920GB data warehouse size. This behaviour is expected since collocation of data, in the context of data warehousing, avoids network
Figure 6: Queries execution time (920GB)
Figure 7: Building data cube
overhead, besides, it reduces the expensive data shuffling operation, which is the most time consuming phase for Mapreduce compared to default data placement policy.
E. Influence of Number of Clusters
In this experiment, we studied the effect of data colloca- tion on the query response time by varying the clustering parameter k. As previous tests, we fixed the cluster size (10 nodes), data warehouse size (to 640 and 920 GB) and varied K-means parameter k to observe its influence on a sample query response time. Figure 8 confirms that performance improves quickly when collocation is applied, but tends to degrade when the number of clusters increases. Furthermore, it hints that an optimal number of clusters for tested data warehouse and workload benchmark lies between 5 and 6, making us conclude that over-collocation can be harmful and must be detected and avoided. Note that, on Figure 8, k = 1 corresponds to performance records when no collocation is applied (this one collocation is the HDFS default warehouse distribution for all DW’s tables).
It is important to note that the distribution and collocation (of data and processing) using parallel programming models like Mapreduce are two opposing optimization techniques.
In fact, collocation helps to minimize the data flow between the map and reduce phases (the shuffle phase) in order to avoid network overhead, applying the principle that says
”Moving calculation is cheaper than moving data”, allowing the intermediate results produced by the map phase to be executed on the same node. However, by seeking to collocate
Figure 8: Influence of number of clusters
data, treatments will be centralized, making the task of each node heavier by creating a big load imbalance and therefore slower processing results. In this case, the objective is to find the right balance between distribution and collocation data, ie finding the right ”k” which in our case is equal to 5 as shown in Figure 8.
VI. CONCLUSIONSANDFUTUREWORK
In this paper, we present a data warehouse pre-partitioning and placement policy as an extension of the distributed file system to override the default policy and improve query performances. Our approach is simple yet flexible; it can be exploited in different ways by distributed and Hadoop-based data warehousing solutions. We studied the performance of our approach under different settings and compared it with default plain Hadoop solutions. Our experimental results show that data pre-partitioning and collocation optimization approach, outperforms default placement strategy in terms of performance gain by reducing the overhead of data shuffling and network. In next step, we will extend the experiments to study the effects of other configuration parameters on collocation data in the context of parallel data warehousing such as partitions size, replication factor and OLAP query complexity. We are also studying an intentional data place- ment strategy for large data warehouses with the integration of Multi-Agent Systems (MAS) and Intelligent Agents to the process, making clusters self-organized and autonomous dealing with new data and queries which are not included in the system’s workload and are appended continuously.
REFERENCES
[1] A.Abouzeid, K.B.Pawlikowski, D.Abadi, A.Silberschatz, and A.Rasin. Hadoopdb: An architectural hybrid of mapreduce and dbms technologies for analytical workloads. PVLDB, 2(1):922–933, 2009.
[2] A.Elsayed, O.Ismail, and M.E.El-Sharkawin. Mapreduce:
State-of-the-art and research directions. International Journal of Computer and Electrical Engineering, 6(1):34–39, 2014.
[3] B. Arres, N. Kabachi, and O. Boussaid. Optimizing olap cubes construction by improving data placement on multi- nodes clusters. 23rd International Conference on Paral- lel, Distributed, and Network-Based Processing, 45:520–528, 2015.
[4] A.Thusoo, J.S.Sarma, N.Jain, Z.Shao, P.Chakka, S.Anthony, H.Liu, P.Wyckoff, and R.Murthy. Hive: a warehousing solu- tion over a map-reduce framework. Proceedings of the VLDB Endowment, 2(2):1626–1629, 2009.
[5] S. Chaudhuri and U. Dayal. An overview of data warehousing and olap technology. ACM Sigmod record, 26(1):65–74, 1997.
[6] S. Chaudhuri, U. Dayal, and V. Narasayya. An overview of business intelligence technology. Communications of the ACM, 54(8):88–98, 2011.
[7] A. S. Foundation. Design a pluggable in- terface to place replicas of blocks in hdfs.
https://issues.apache.org/jira/browse/HDFS-385, 2014.
[8] G.Holmes, A.Donkin, and H.Witten. Weka: a machine learn- ing workbench. Proceedings of the 1994 Second Australian and New Zealand Conference on Intelligent Information Sys- tems, 14:357–361, 1994.
[9] J.Dean and S.Ghemawat. Mapreduce: simplified data process- ing on large clusters. Communictions of the ACM, 51:107–
113, 2008.
[10] J.Dittrich, J.A.Quiane-Ruiz, A.Jindal, Y.Kargin, V.Setty, and J.Schad. Hadoop++: making a yellow elephant run like a cheetah (without it even noticing). Proceedings of the VLDB Endowment, 3(1):515–529, 2010.
[11] J.MacQueen. Some methods for classification and analysis of multivariate observations. Proceedings of the Fifth Berke- ley Symposium on Mathematical Statistics and Probability, 1:281–297, 1967.
[12] W. T. McCormick Jr, P. J. Schweitzer, and T. W. White. Prob- lem decomposition and data reorganization by a clustering technique. Operations Research, 20(5):993–1009, 1972.
[13] M.Eltabakh, Y.Tian, F.Gemulla, A.Krettek, and J.McPherson.
Cohadoop: Flexible data placement and its exploitation in hadoop. PVLDB, 4(9):575–585, 2011.
[14] J. Song, T. Li, X. Liu, and Z. Zhu. Comparing and ana- lyzing the energy efficiency of cloud database and parallel database. Advances in Computer Science, Engineering and Applications, 167:989–997, 2012.
[15] M. Stonebraker, D. Abadi, D. J. DeWitt, S. Madden, E. Paul- son, A. Pavlo, and A. Rasin. Mapreduce and parallel dbmss:
friends or foes? Communications of the ACM, 53(1):64–71, 2010.
[16] TPC-H. Transaction processing performance council.
http://www.tpc.org/tpch, 2012.
[17] T. White. Hadoop: The Definitive Guide. O’Reilly Media, Inc., 1st edition, 2009.
[18] W.Huiju, Q.Xiongpai, Z.Yansong, W.Shan, and W.Zhanwei.
Lineardb: A relational approach to make data warehouse scale like mapreduce. In Database Systems for Advanced Applications, volume 6588, pages 306–320. 2011.
[19] Y.Lin, D.Agrawal, C.Chen, and S.Wu. Llama: leveraging columnar storage for scalable join processing in the mapre- duce framework. ACM SIGMOD International Conference on Management of Data (SIGMOD), pages 961–972, 2011.