Processing Moving Objects Data At Large Scale
Dissertation
submitted to the Faculty of Mathematics and Computer Science of the FernUniversität in Hagen
for the academic degree of Doctor of Natural Sciences (Dr. rer. nat.)
by Jiamin Lu
Place of birth: NanTong, JiangSu, China
Submitted on: 27.03.2014
Date of oral examination: 09.07.2014
First reviewer: Prof. Dr. Ralf Hartmut Güting
Second reviewer: Prof. Dr. Mohamed F. Mokbel
Contents
List of Tables
List of Algorithms
List of Figures
Abstract
1 Introduction
1.1 Background and Motivations
1.2 Thesis Contributions
1.2.1 System Infrastructure
1.2.2 Declarative Query Language
1.2.3 Cloud Evaluation
1.2.4 Optimization Technologies
1.3 Thesis Organization
2 Related Work
2.1 Early Parallel Database Systems
2.1.1 Parallel Query Processing
2.1.2 Parallelism Metrics
2.2 MapReduce
2.2.1 The MapReduce Paradigm
2.2.2 Pros and Cons
2.3 Hadoop Extensions
2.4 SECONDO Database System
2.5 Parallel Processing on Specialized Data
3 System Infrastructure
3.1 System Components
3.2 Parallel SECONDO File System
3.2.1 PSFS Operators
3.2.2 PSFS vs. HDFS
3.3 System Management Components
3.3.1 Parallel SECONDO Preferences
3.3.2 Graphical Preference Editor
4 Declarative Language Support
4.1 Parallel Data Model
4.1.1 SECONDO Executable Language
4.1.2 Representing Distributed Objects
4.1.3 Flow Operators
4.1.4 Hadoop Operators
4.2 PBSM: Partition Based Spatial Merge
4.2.1 PBSM in Parallel SECONDO
4.2.2 PBSM with In-memory Index
4.2.3 PBSM for Moving Objects Data
4.2.4 Represent PBSM in Executable Language
4.2.5 Evaluations
4.3 Parallel BerlinMOD
4.3.1 Parallel Data Generation
4.3.2 Parallel Range Queries
4.3.3 Evaluations On Benchmark Queries
5 Cloud Evaluation
5.1 Amazon EC2 Services
5.1.1 Hardware Configuration
5.1.2 Software Configuration
5.1.3 EC2 Instance Performance
5.2 Set up Parallel SECONDO on EC2 Clusters
5.3 Evaluations In EC2 Clusters
6 Optimization
6.1 Pipeline File Transfer
6.2 FLOB Accessing in PSFS
6.3 Distributed Filter and Refinement
7 Conclusions
7.1 Summary
7.2 Future Work
Appendices
A Install Parallel SECONDO
A.1 Installation Instructions
A.2 Deploy Parallel SECONDO With Virtual Images
A.2.1 VMWare Image
A.2.2 Amazon Machine Image
B Evaluation Queries
B.1 Join on Standard Data Types
B.2 Join on Spatial Data Types
B.3 Join on Spatio-Temporal Data Types
C Parallel BerlinMOD Benchmark
C.1 Data Generation
C.2 Parallel Range Queries
List of Tables
3.1 Extended Operators for Basic PSFS Access
3.2 Auxiliary Tools for Parallel SECONDO
4.1 The Classification of Parallel SECONDO Distributed Objects
4.2 DELIVERABLE Data Types
4.3 Parallel Operators
4.4 Extensive SECONDO Operators for PBSM
4.5 Operators Extended for Hadoop Distributed Join
5.1 Auxiliary Tools for Parallel SECONDO on EC2
6.1 Extended Operators for Optimized PSFS Access
A.1 Cluster Preparation Arguments
List of Algorithms
1 Generic Hadoop-based Parallel Join
2 SECONDO Distributed Join (SDJ) with PBSM
3 Hadoop Distributed Join (HDJ) with PBSM
4 Pipeline File Feed (pffeed)
List of Figures
1.1 Multi-dimensional Objects
2.1 SECONDO Components (left), Architecture of Kernel (right)
2.2 The Graphical User Interface for SECONDO and Parallel SECONDO
3.1 The Infrastructure of Parallel SECONDO
3.2 Two Region Objects
3.3 Data Structure in PSFS files
3.4 The SQL Statement of 12th TPC-H Query
3.5 Evaluations on 12th TPC-H Query
3.6 The Main Frame with Error Information
3.7 The Frame for Single-Computer Installation
3.8 The Frame for Simple Cluster Installation
3.9 Advanced Setting Frame
4.1 Query in SQL-like Language
4.2 Query in SECONDO Executable Language
4.3 PS-Matrix
4.4 The Map/Reduce Procedures Described by Hadoop Operators
4.5 The Boundary Crossing Objects in PBSM
4.6 The Common Smallest Cell
4.7 The Partition Method in PBSM
4.8 The Sliced Representation of Moving Point Values
4.9 The SQL Query of the 6th BerlinMOD Query
4.10 The Sequential Query for the 6th BerlinMOD Query
4.11 The Parallel Query (SDJ) for the 6th BerlinMOD Query
4.12 The Parallel Query (HDJ) for the 6th BerlinMOD Query
4.13 Query Converting Process for PBSM
4.14 HDJ and SDJ Performances On Spatial Data
4.15 SDJ Performance On Spatial Data With In-memory Index
4.16 HDJ and SDJ Performances On Spatio-Temporal Data
4.17 SDJ Performance On Spatio-Temporal Data With In-memory Index
4.18 The 1st BerlinMOD Example Query
4.19 The 10th BerlinMOD Example Query
4.20 Data Generation in Parallel BerlinMOD
4.21 The 10th Example Query in Parallel BerlinMOD
5.1 EC2 Storage Architecture
5.2 The Amazon EC2 Web-based Console
5.3 Evaluation On Spatio-temporal Data in Cloud
5.4 Average Step Overhead for the Cloud Speed-up Evaluation
6.1 Shuffling Overhead on the Cluster
6.2 The Tuple Storage in SECONDO
6.3 SDJ with the Distributed Filter Refinement Mechanism
6.4 Parallel Spatial Join on Lands and Buildings in SDJ-Index'
Abstract
In recent years, with the spread of portable positioning devices such as smartphones and vehicle navigators, it has become simpler to generate and collect end-users' continuous position information (termed moving objects data) in order to support various location-based services. Against this background, our group's SECONDO system was developed. It is designed as an extensible database system, providing a large number of data types and algorithms to represent and efficiently process moving objects based on constant geographical information (termed spatial data).
However, like many other standalone databases, SECONDO faces challenges from Big Data, since it was developed as a single-computer system and its capability is restricted by the underlying computer resources. Many parallel processing platforms, such as Hadoop, have been developed for analyzing massive data on computer clusters. However, they usually put more weight on efficiency and scalability and less on processing specialized data types. In order to scale up SECONDO's capability to a cluster of computers, this Ph.D. project proposes a hybrid system combining the Hadoop platform and SECONDO databases, taking the best technologies from both sides. This new system is named Parallel SECONDO.
In this dissertation, the following issues concerning this novel system are studied. (1) A hybrid structure is established to combine Hadoop and SECONDO for the most effective performance. Specifically, a native store mechanism is developed to reduce the data migration overhead between them to a minimum. (2) A parallel data model is proposed to help end-users state their queries in the SECONDO executable language, freeing them from the low-level and rigid programming model of Hadoop. It also enables Parallel SECONDO to inherit most existing SECONDO data types and operations, so any heavy sequential query can easily be converted into corresponding parallel statements. As an example, a join method named PBSM is used extensively in this thesis. It can process the join operation on both spatial and moving objects data. Several variants of it are also proposed, using different distributed file systems to shuffle the intermediate results in order to achieve the best performance. All these variants can be represented as SECONDO queries with slight adjustments, fully demonstrating the flexibility of the parallel data model. (3) Parallel SECONDO is evaluated not only on our small private cluster, but also on large clusters consisting of hundreds of virtual computers provided by AWS (Amazon Web Services). Across these environments of different scale, Parallel SECONDO keeps a stable speed-up and scale-up performance, showing the remarkable scalability it gains by being set up on the Hadoop platform. (4) Regarding the special storage for spatial and moving objects data, a set of optimization technologies is also developed to improve data access in the cluster environment.
Furthermore, we intend to develop Parallel SECONDO as a user-friendly system. A set of auxiliary tools is developed to easily deploy and manage the system on large-scale clusters. Two virtual machine images are also provided, so end-users can get familiar with the system immediately and use it to address their own problems. The graphical user interface of SECONDO is also inherited, so the query results can be displayed with vivid animations.
1 Introduction
1.1 Background and Motivations
In recent years, Big Data and how to process it efficiently have become an increasingly hot issue in both commercial and academic communities. With the popularity of various Internet services, massive amounts of customer information are continuously collected and stored, in order to find valuable knowledge and regularities in them. For example, by the end of 2012, 618 million users were active on Facebook each day, creating more than 500 TB of new data. Every 30 minutes, 105 TB of data were scanned on the company's clusters, which could hold more than 100 PB of data in total.
Clearly, conventional RDBMS systems are bound to fall behind when facing such exploding data. They are usually installed on single computers with limited resources, which makes it impossible to use them as repositories for such massive data. Their performance can certainly be improved by migrating them to more advanced computers, but these usually come at a considerable price and are soon out of date. Moreover, even parallel databases like Vertica [43], which are able to run on multiple computers, cannot hold these large data sets. On one hand, these systems usually demand a highly homogeneous underlying cluster, which is nearly impossible to achieve at scale. On the other hand, parallel databases are often designed with the assumption that failures, caused by either software or hardware problems, happen rarely in the cluster. However, the probability of failure rapidly increases as more computers are added to the cluster. Therefore, the scalability of these parallel databases is restricted, and they usually cannot be set up on clusters with more than one hundred computers [1], while the exploding data often need the computing resources of hundreds or even thousands of computers.
All the above issues promote the study of more scalable parallel processing mechanisms, in order to process massive data on much larger clusters. Consequently, novel platforms like MapReduce [9], Dryad [36], SCOPE [7], etc. have been proposed in succession. They all intend to provide a flexible infrastructure over the network, storing data in a distributed manner without following the relational data model. In addition, they usually give more consideration to high fault tolerance than to efficient performance on large clusters. Although these platforms are often criticized as brute-force approaches and have caused much controversy when compared with parallel databases [43, 48], they have become very attractive to both industry and research communities because of their outstanding performance on large-scale data analysis.
Among these novel approaches, MapReduce has gained the most attention. It was proposed by Google in 2003 and has been used by the company itself for a decade. It helps to process their PB-size data, distributed on thousands of computers across multiple data centers, with more than 10,000 distinct programs, including algorithms for large-scale graph processing, text processing, machine learning, and statistical machine translation [10]. In addition, its programming model (also named MapReduce) is relatively simple compared with others. End-users first implement a Map function that processes a ⟨key, value⟩ pair to generate a set of intermediate ⟨key, value⟩ pairs, then specify a Reduce function that processes all values associated with the same intermediate key. Google's MapReduce platform is kept private, but the programming model is implemented in the open source project Hadoop, provided by the Apache Software Foundation. Therefore, the MapReduce paradigm and the Hadoop platform are widely studied in research on parallel processing. Consequently, the work in this thesis is also built upon the Hadoop platform.
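The Map and Reduce functions described above can be illustrated with a minimal single-process sketch. It is not Hadoop code: the driver `run_mapreduce`, the word-count task and all function names are assumptions chosen only for illustration, and the grouping loop stands in for the distributed shuffle that a real platform performs.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Tuple


def map_fn(key: str, value: str) -> Iterator[Tuple[str, int]]:
    """Map: emit one intermediate (word, 1) pair for every word in a line."""
    for word in value.split():
        yield word.lower(), 1


def reduce_fn(key: str, values: Iterable[int]) -> int:
    """Reduce: combine all values that share the same intermediate key."""
    return sum(values)


def run_mapreduce(records, mapper, reducer) -> Dict[str, int]:
    """Single-process stand-in for the framework: map, group by key, reduce."""
    groups: Dict[str, List[int]] = defaultdict(list)
    for key, value in records:
        for out_key, out_value in mapper(key, value):
            groups[out_key].append(out_value)  # plays the role of the shuffle
    return {key: reducer(key, values) for key, values in groups.items()}


lines = [
    ("line1", "moving objects data"),
    ("line2", "spatial data and moving objects"),
]
counts = run_mapreduce(lines, map_fn, reduce_fn)
print(counts["moving"], counts["data"], counts["spatial"])
```

The end-user only writes `map_fn` and `reduce_fn`; partitioning, scheduling and fault handling are left to the platform, which is what makes the model simple to adopt.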
Besides the pressure from exploding data volumes, it is notable that the kinds of data have also become much more varied. With the popularity of positioning devices integrated in smartphones and navigators, it is common for end-users to obtain their real-time locations (spatial data), with which they can easily find nearby targets of interest or the best path to their destinations. At the same time, by collecting these locations in time sequence, end-users' trajectories (moving objects data) can also be recorded. In this thesis, spatial and moving objects data are collectively called multi-dimensional data, since they are distributed over spaces with more than one dimension. If authorized by end-users, these multi-dimensional data can also be collected as big data, in order to find their hidden regularities. For example, airlines can derive certain patterns by analyzing the trajectories of their pilots [44], in order to improve the training system or guide the future construction of their airports.
[18, 26, 33] to process, which are normally not provided in common database systems. Moreover, large sets of such data are usually not only numerous but also large in size, since each multi-dimensional object contains plenty of details. For example, the region shown in Figure 1.1a has fifteen lines, and each line needs at least four real numbers to denote the coordinates of its two endpoints. Therefore, this object requires at least 60 real numbers to store all its details. Similarly, Figure 1.1b depicts two moving objects with three segments. Each segment keeps not only the geographic coordinates of its endpoints, but also the timestamps of the interval. All this detailed information increases the size of multi-dimensional data, causing more network and disk overhead to transfer and preserve them in clusters.
Figure 1.1: Multi-dimensional Objects. (a) Spatial Data: a region bounded by fifteen lines labeled a to o. (b) Moving Objects Data, drawn over the axes x, y and t.
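The size estimate for the region of Figure 1.1a can be reproduced with a few lines of arithmetic. The sketch below is only a back-of-the-envelope count, not SECONDO's storage format; the assumption that a moving-object segment carries exactly two timestamps (start and end of its interval) is ours.

```python
REALS_PER_LINE = 4          # (x1, y1, x2, y2): two endpoints of one line
TIMESTAMPS_PER_SEGMENT = 2  # assumed: start and end of the time interval


def region_reals(line_count: int) -> int:
    """Minimum number of real numbers needed for a region's boundary lines."""
    return line_count * REALS_PER_LINE


def moving_segment_values() -> int:
    """Values kept by one moving-object segment: endpoints plus interval."""
    return REALS_PER_LINE + TIMESTAMPS_PER_SEGMENT


print(region_reals(15))         # the fifteen-line region of Figure 1.1a
print(moving_segment_values())  # one segment of a moving object
```

Even this small region needs 60 reals, which indicates how quickly detailed shapes and long trajectories inflate the volume that must be shuffled and stored.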
In order to process these multi-dimensional data, a “generic” database system named SECONDO [27] has been proposed. It has been developed for over a decade and is able to process many kinds of data models, including relational, spatial, spatio-temporal, etc. At the current stage, nearly one hundred data types and thousands of related operators have been implemented in SECONDO, creating a powerful tool for processing these specialized data types. More details about SECONDO are introduced in Section 2.4.
Nevertheless, SECONDO is implemented as a centralized system. It can only be deployed on a single computer and its capability is restricted by the underlying hardware, so it faces the challenges of big data. Therefore, in this PhD project we intend to scale up the capability of SECONDO to a cluster of computers, in order to process specialized data models at large scale. The new system is named Parallel SECONDO.
For this purpose, we decided to combine SECONDO with the Hadoop platform because of its good extensibility and popularity. In total, there are three possible ways to combine the two systems:
1. Integrate the MapReduce programming model into the database system, enabling the database itself to schedule and assign the parallel tasks over the cluster [20]. However, this requires mastering every detail of the MapReduce programming model and the Hadoop implementation, which makes the solution too complicated to put into practice.
2. Extend the Hadoop platform with all necessary SECONDO data types and operators, as SpatialHadoop does in [17]. For us, this solution is not cost-effective since it requires reimplementing almost all SECONDO functions on top of the MapReduce programming model. Besides, whenever SECONDO is extended with a new data type or operator, it would have to be implemented again in Parallel SECONDO, causing a significant workload.
3. Build Parallel SECONDO as a hybrid system, coupling one Hadoop platform with a set of SECONDO databases that are distributed over the cluster computers. Here Hadoop is used only as the communication level, assigning MapReduce tasks to the cluster computers and keeping the workload balanced across the cluster. Meanwhile, the single-computer query work is embedded inside the parallel tasks and pushed into the SECONDO databases for processing, in order to achieve the best performance.
Clearly, the third solution maintains the independence of both components and also uses their best technologies. On one hand, it uses Hadoop to achieve the best scalability, and hence can be deployed on large-scale clusters. On the other hand, it inherits SECONDO's capability to process specialized data types efficiently. Therefore, in this thesis, Parallel SECONDO is constructed in this way. In the following, we outline the main challenges met when coupling these two systems and present our basic solutions.
1.2 Thesis Contributions
This thesis mainly covers the following four issues in building Parallel SECONDO: (1) system infrastructure, (2) declarative query language, (3) cloud evaluation and (4) optimization technologies. The first topic demonstrates the coupling mechanism, which aims at the most scalable and efficient combination of Hadoop and SECONDO. Then a parallel data model is prepared, which enables end-users to state parallel queries in an easily understandable declarative language. In addition, Parallel SECONDO is fully evaluated on large-scale clusters consisting of more than one hundred virtual computers. During this process, problems in managing multi-dimensional data at large scale are revealed, and their solutions are discussed in the last topic. Details about all these issues are elaborated in the following subsections.
1.2.1 System Infrastructure
Essentially, Hadoop is implemented as a data-driven platform. It stores the massive ⟨key, value⟩ pair data in its HDFS (Hadoop Distributed File System) and then allocates them to the parallel tasks for processing, according to the MapReduce paradigm. Consequently, most Hadoop-based hybrid systems like HadoopDB [1] or Hadoop GIS [2] keep using HDFS as the communication level for both data and tasks. In these systems, database records are transformed and shuffled via HDFS, and then loaded into the single-computer databases when they are required. Nevertheless, since multi-dimensional data are much larger than standard data, this additional and frequent data migration always causes considerable overhead.
To address this issue, Parallel SECONDO adopts the idea of a so-called native store. It keeps the data either in the distributed databases, or in a simple self-made distributed file system named PSFS (Parallel SECONDO File System), without following the format of the MapReduce model. At the same time, HDFS stores only a small amount of lightweight synopsis data, which Hadoop uses to schedule the MapReduce tasks. Therefore, Parallel SECONDO uses HDFS only as the task communication level, in order to avoid useless data migration overhead as much as possible.
Based on PSFS, Parallel SECONDO sets the DS (Data Server) as the basic processing unit of the system. Every DS contains a SECONDO database and a PSFS node, and each MapReduce task is assigned to one DS for processing. During the procedure, the required data are first collected from the remote PSFS nodes into the local one, then loaded into the SECONDO database directly, without any unnecessary transformation cost.
Furthermore, a series of operators is proposed in Parallel SECONDO for accessing data in PSFS. They can either export SECONDO relations to several PSFS nodes as disk files, or read files from several DSs and import the data back into a SECONDO database as a relation. Considering the high probability of encountering failures in large-scale clusters, files in PSFS can also be duplicated on several adjacent DSs, in order to avoid data loss when a few PSFS nodes become inaccessible.
Nowadays, even a low-end PC is commonly equipped with a multi-core processor, large memory and multiple hard drives. This feature is not considered in Hadoop, which views the computer as the basic processing unit. Therefore, multiple tasks assigned to the same computer are forced to access the same disk drive, and the resulting disk contention decreases system performance. In contrast, Parallel SECONDO is able to set up several DSs on different disks within the same computer. Each computer can thus further distribute its allocated tasks over its multiple DSs, in order to reduce unnecessary disk I/O overhead. Although Hadoop itself can also relieve this problem by using more computers, Parallel SECONDO is much more economical as it can fully use the existing resources of the current cluster.
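The effect of placing several DSs on different disks of one computer can be sketched with a toy task assignment. Everything here is hypothetical: the host and disk names, the round-robin policy and the helper functions are illustrative assumptions and do not reproduce how Hadoop or Parallel SECONDO actually schedule tasks.

```python
from collections import defaultdict
from itertools import cycle


def assign_tasks(task_ids, data_servers):
    """Assign tasks to Data Servers in round-robin order (assumed policy)."""
    rotation = cycle(data_servers)
    return {task: next(rotation) for task in task_ids}


def disk_load(assignment):
    """Count how many tasks end up on each (host, disk) pair."""
    load = defaultdict(int)
    for data_server in assignment.values():
        load[data_server] += 1
    return dict(load)


# One DS per computer: every task on a host hits the same disk.
one_ds_per_host = [("node1", "/disk1"), ("node2", "/disk1")]
# Two DSs per computer, each on its own disk.
two_ds_per_host = [("node1", "/disk1"), ("node1", "/disk2"),
                   ("node2", "/disk1"), ("node2", "/disk2")]

tasks = range(4)
load_single = disk_load(assign_tasks(tasks, one_ds_per_host))
load_multi = disk_load(assign_tasks(tasks, two_ds_per_host))
print(max(load_single.values()), max(load_multi.values()))
```

With the same two computers and four tasks, the busiest disk serves two tasks in the first layout and only one in the second, which is the contention that multiple DSs per computer are meant to avoid.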
Finally, in order to assist end-users in using Parallel SECONDO, a set of auxiliary tools is also proposed, which can quickly install, start, stop and remove the system on clusters of different scale. Besides, both the SECONDO text and graphical interfaces are fully compatible with the new system, so end-users can monitor the results intuitively. Moreover, once SECONDO is extended with a new function, it can be used immediately in Parallel SECONDO by distributing the new SECONDO system to all DSs.
1.2.2 Declarative Query Language
Like many other parallel platforms, Hadoop provides no declarative language. It forces end-users to write their algorithms in a low-level language like C++ or Java based on the MapReduce programming model, and such programs are difficult to maintain and reuse. In addition, MapReduce imposes no schema on the stored records, so end-users can structure their data in any manner. Although this mechanism improves the system's flexibility, it creates barriers for coworkers, since they must reach an agreement on the data format and parse the data explicitly in their respective algorithms.
In contrast, SECONDO provides two levels of query languages: executable and SQL. At the first level, queries are stated with database objects and operators, representing the query plans precisely, and hence can be processed by the SECONDO database directly. Furthermore, end-users can also state their queries in SQL, which are then converted and optimized into query plans declared in the executable language.
In order to help end-users construct their queries easily, a parallel data model is proposed, so that queries can be formulated in the first-level SECONDO query language. It includes a new data type named flist, which indicates the schema and the distribution status of parallel objects, in order to separate the schema from the various queries. Besides, it also provides several operators to describe MapReduce jobs. The single-computer task query is encapsulated within these operators as a UDF (User Defined Function), hence most existing SECONDO data types and operators can be used in Parallel SECONDO as usual.
types, including: standard, spatial and moving objects. These join queries are then used to evaluate the performance of Parallel SECONDO, because the join is widely regarded as the most costly procedure on the Hadoop platform and has been studied in depth in much other research. Considering the special nature of multi-dimensional data, the PBSM (Partition Based Spatial Merge) method [42] is often used to process joins on them, and several ad-hoc operators are also proposed in Parallel SECONDO for this method.
Finally, we convert all sequential queries of the BerlinMOD benchmark [16], which covers all possible range queries on moving objects data, into their corresponding parallel queries. This fully demonstrates the compatibility and the flexibility of the parallel data model.
1.2.3 Cloud Evaluation
MapReduce and Hadoop have gained a lot of attention from the database community mainly because of their outstanding scalability. For example, Google's native MapReduce platform was evaluated on a cluster consisting of 1800 computers [9]. So far, the largest Hadoop cluster that we know of was built by Yahoo! [47] and contains 3500 machines.
Consequently, as a Hadoop extension, Parallel SECONDO should also be evaluated on large-scale clusters. However, purchasing so many computers ourselves is obviously not economical. Therefore, we decided to rent them for a reasonable price from AWS (Amazon Web Services), where computing resources can be leased as virtual computers. Moreover, we are grateful to have received a considerable grant from AWS in Education, which enabled us to evaluate the system on hundreds of virtual computers with limited investment.
In the end, we are able to deploy Parallel SECONDO on clusters containing at most 150 computers. Its performance is also fully evaluated with the parallel join for moving objects data, showing that Parallel SECONDO can still achieve satisfactory performance at this scale.
Apart from the evaluations on AWS, we also built a public AMI (Amazon Machine Image) which contains the system's basic components. With it, large clusters with deployed Parallel SECONDO systems can be built up in a few minutes, leaving end-users more time to use these resources for their own purposes.
Nevertheless, the evaluation on AWS does not continue beyond 150 computers, due to some new problems found in managing multi-dimensional data at large scale. For example, data in PSFS are shuffled via the standard file transfer protocol, so the transfer cost increases rapidly with the size of the cluster, since more computers need to be accessed. Besides, the MapReduce join query usually shuffles the complete data sets over the network, although much of the objects' detail data is never used. The high cost of transferring a large amount of useless data hampers the performance of Parallel SECONDO on large clusters. Therefore, optimization mechanisms are studied to address these issues.
1.2.4 Optimization Technologies
In this topic, several optimization technologies are studied and a set of operators is proposed to address the problems mentioned above, in order to further improve the performance of Parallel SECONDO on large clusters.
Normally, files in PSFS are transferred and loaded into SECONDO databases one after another, and each file requires a certain overhead to prepare the connection between the computers. However, during parallel procedures each task often needs to access a set of files, and considerable network resources are wasted by transferring them sequentially. To address this issue, a pipeline mechanism is built into PSFS to deliver the files concurrently, in order to reduce the overall elapsed time and make full use of the network resources.
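The difference between sequential and concurrent file delivery can be sketched as follows. This is not the PSFS pipeline operator: a thread pool is swapped in as a simple stand-in for concurrent transfer, `fetch` only simulates the per-file connection overhead with a short sleep, and the file names and the constant `CONNECT_OVERHEAD` are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor

CONNECT_OVERHEAD = 0.01  # seconds per file; hypothetical value


def fetch(name):
    """Simulate copying one remote file: pay the connection overhead once."""
    time.sleep(CONNECT_OVERHEAD)
    return name, f"contents of {name}"


def fetch_sequential(names):
    """Transfer the files one after another."""
    return [fetch(name) for name in names]


def fetch_concurrent(names, workers=4):
    """Transfer several files at once; pool.map keeps the input order."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(fetch, names))


names = [f"part_{i}" for i in range(8)]

start = time.perf_counter()
sequential = fetch_sequential(names)
sequential_time = time.perf_counter() - start

start = time.perf_counter()
concurrent = fetch_concurrent(names)
concurrent_time = time.perf_counter() - start

print(f"sequential {sequential_time:.3f}s, concurrent {concurrent_time:.3f}s")
```

Both variants return the same files in the same order; the concurrent one overlaps the connection overheads, so its elapsed time grows more slowly as the number of files per task increases.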
While multi-dimensional objects are loaded from PSFS into SECONDO databases, each object's complete information is read and cached in a memory buffer of relatively small size. As the input data grow, it becomes impossible to cache all these data in memory, so part of them has to be flushed to disk, causing additional disk I/O overhead. However, not all of this detailed information is actually needed during the query procedures. Therefore, a novel approach is proposed that reads the detailed information only when it is really needed.
Generally, the join procedure on multi-dimensional objects contains two stages: filter and refinement [38]. In the first stage, candidates are generated based on the objects' approximate information, like the MBR (Minimum Bounding Rectangle) of regions with complicated shapes. This stage eliminates tuple pairs that cannot be part of the final result. For example, in a query finding intersecting regions, if the MBRs of two regions are disjoint, their tuple pair is removed from the candidates. In the second stage, each candidate is examined with the objects' detailed information to check whether it really satisfies the join predicate.
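The two stages can be sketched on toy data. To keep the exact test trivial, a region is modeled here as a set of integer grid cells, which is purely an assumption for illustration (SECONDO regions are not stored this way); the data set names and all function names are hypothetical as well.

```python
def mbr(cells):
    """Minimum bounding rectangle (xmin, ymin, xmax, ymax) of a cell set."""
    xs = [x for x, _ in cells]
    ys = [y for _, y in cells]
    return min(xs), min(ys), max(xs), max(ys)


def mbrs_intersect(a, b):
    """True if two rectangles overlap or touch."""
    return a[0] <= b[2] and b[0] <= a[2] and a[1] <= b[3] and b[1] <= a[3]


def filter_step(left, right):
    """Stage 1: keep only the pairs whose MBRs intersect."""
    return [(l, r) for l in left for r in right
            if mbrs_intersect(mbr(left[l]), mbr(right[r]))]


def refine_step(candidates, left, right):
    """Stage 2: check the exact predicate on the detailed data."""
    return [(l, r) for l, r in candidates if left[l] & right[r]]


# "A" is an L-shaped region; its MBR covers cell (2, 2) although A does not.
lands = {"A": {(0, 0), (0, 1), (0, 2), (1, 0), (2, 0)}}
buildings = {"B": {(2, 2)}, "C": {(5, 5)}, "D": {(1, 0), (1, 1)}}

candidates = filter_step(lands, buildings)
result = refine_step(candidates, lands, buildings)
print(candidates, result)
```

Pair (A, C) is discarded by the cheap MBR test alone, (A, B) survives the filter but fails the exact test, and only (A, D) reaches the final result, so the detailed data of C never needs to be inspected.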
Clearly, the large detailed data are not needed for every multi-dimensional object in the refinement step. However, under the MapReduce paradigm both stages are normally processed in the Reduce stage, hence the complete data of all objects have to be shuffled over the network. To address this issue, one more optimization mechanism is prepared. It first separates the information of each multi-dimensional object into two parts, one for the approximate data and another for the detailed data. During the shuffle stage, only the approximate data are transferred over the network, in order to process the first join step and generate the candidate results. Afterward, only the detailed information of the candidate objects is collected in a second shuffle. With this mechanism, the network traffic is further reduced, making Parallel SECONDO more efficient on large-scale clusters.
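The saving in network traffic can be estimated with simple arithmetic. The sketch below is a rough model under stated assumptions, not a measurement: the MBR encoding of four 8-byte reals, the object sizes and the candidate set are all hypothetical.

```python
MBR_BYTES = 4 * 8  # four 8-byte reals per rectangle (assumed encoding)


def full_shuffle_bytes(detail_sizes):
    """Traffic when every object is shuffled with its complete data."""
    return sum(MBR_BYTES + size for size in detail_sizes.values())


def two_phase_shuffle_bytes(detail_sizes, candidate_ids):
    """Traffic when MBRs are shuffled first and details only for candidates."""
    first_shuffle = MBR_BYTES * len(detail_sizes)
    second_shuffle = sum(detail_sizes[obj] for obj in candidate_ids)
    return first_shuffle + second_shuffle


# Hypothetical detail sizes in bytes; only "a" and "c" pass the filter step.
detail_sizes = {"a": 10_000, "b": 20_000, "c": 30_000, "d": 40_000}
candidate_ids = {"a", "c"}

full = full_shuffle_bytes(detail_sizes)
two_phase = two_phase_shuffle_bytes(detail_sizes, candidate_ids)
print(full, two_phase)
```

The benefit depends on the selectivity of the filter step: the fewer objects survive as candidates, the smaller the second shuffle, while the first shuffle stays small because it carries only the approximations.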
1.3 Thesis Organization
The rest of this thesis is organized as follows. First, related work is reviewed in Chapter 2. Chapter 3 then presents the infrastructure of the system, especially the PSFS that realizes the native store mechanism. A simple comparison there demonstrates the benefit gained by using PSFS instead of HDFS to shuffle the intermediate data. Next, the parallel data model is introduced in Chapter 4, where examples of using Parallel SECONDO to process joins on spatial and moving objects data are also explained and evaluated. Chapter 5 shows the evaluation on large-scale clusters on the AWS platform. The optimization mechanisms are introduced in Chapter 6, in order to further improve the system's performance when processing multi-dimensional data at large scale. Finally, Chapter 7 gives a short conclusion that summarizes our contributions and points out possible future directions.
2 Related Work
2.1 Early Parallel Database Systems
Parallel processing is not a novel topic in the database community. As early as the late 1970s, the “I/O bottleneck” caused by the bandwidth differences among processors, memory and disk drives had already encouraged the study of parallel processing technologies.
In the early stage, database machines were proposed in projects like GRACE [21] and GAMMA [12], dispersing the I/O overhead over multiple disks by constructing special-purpose hardware, e.g., introducing data filtering devices within the disk heads. These high-end machines are often equipped with large memory, multiple processors and multiple disk drives, and the source data are partitioned into pieces and stored on those disks separately. In the relational data model, queries are processed as streams: the output of one operator is streamed into the input of another operator. Within parallel database machines, two operators can work in series, giving pipelined parallelism, and each operator is processed by several independent processors simultaneously. Although these custom-made machines achieved high performance, they did not succeed because of their poor cost/performance ratio, which made it difficult to study them further or develop them commercially. Naturally, software solutions like parallel database systems were accepted by more and more researchers [11].
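The stream-style processing described above, where the output of one operator feeds the input of the next, can be sketched with generators. The operators and sample rows are hypothetical, and the sketch runs in a single thread: it shows only the streaming between operators that pipelined parallelism builds on, not the simultaneous execution on several processors.

```python
def scan(rows):
    """Source operator: stream the stored rows one at a time."""
    for row in rows:
        yield row


def select(stream, predicate):
    """Selection: forward only the rows that satisfy the predicate."""
    for row in stream:
        if predicate(row):
            yield row


def project(stream, columns):
    """Projection: keep only the requested columns of each row."""
    for row in stream:
        yield tuple(row[column] for column in columns)


trips = [
    {"id": 1, "speed": 30},
    {"id": 2, "speed": 80},
    {"id": 3, "speed": 95},
]

# Each row flows through all three operators before the next row is read.
plan = project(select(scan(trips), lambda row: row["speed"] > 50), ["id"])
fast_ids = list(plan)
print(fast_ids)
```

No operator materializes its complete output before the next one starts, which is the property that lets a parallel system run consecutive operators on different processors at the same time.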
A parallel database is set up on a cluster containing a number of computers built from standard hardware elements, i.e., processors, memory and disks. The computers are connected inside the cluster through an interconnection network. Note that here the term processor is used only in the general sense of central processing unit (CPU). It is common that a processor itself consists of several cores, like a multi-core processor, performing multithreaded parallelism at a lower level. Nonetheless, in this thesis parallelism is usually discussed at a higher level, hence the processors are simply considered as black boxes accessing other resources.
Within the cluster, depending on how much the resources, especially the memory and disks, are shared during database processing, three basic architectures are adopted by most parallel database systems [52, 11]:
shared-memory: All processors share direct access to a common global memory and all disks. Such a system can achieve high performance more easily since all processors are able to communicate via the main memory. However, in order to match the high throughput of the memory, it places extremely high requirements on the underlying network, like a high-speed bus or a cross-bar switch, thus limiting the system’s scalability to a few tens of processors.
shared-disks: Each processor has a private memory but has direct access to all disks. Compared with shared-memory systems, it has a better scalability since the memory is distributed. Besides, it does not require special data placement technology as the data on the disks need not be reorganized. However, this architecture must keep the caches coherent on all involved computers for the sake of data consistency, requiring some form of complex distributed lock management.
shared-nothing: Here the resources are fully distributed: each computer is viewed as a local site, in charge of only its own resources, and processing the assigned data independently. This architecture minimizes the interference caused by resource sharing and avoids problems like locking overhead and cache coherency, achieving considerable scalability. Although a few studies claim that shared-memory is still advisable for limited degrees of parallelism [24] because of its high efficiency, the shared-nothing architecture is widely accepted by most researchers and adopted in projects like Volcano [22, 23] and Vertica [43]. Novel parallel processing platforms like MapReduce [9] and Hadoop are also built on this architecture.
2.1.1 Parallel Query Processing
Queries in parallel databases are usually expressed in high-level declarative languages like SQL, then transformed into execution plans that can be efficiently processed in parallel [40]. The generated execution plan is normally presented as a tree of relational operators. Each takes its input (either the source or the intermediate data) and generates intermediate data. The final query result is produced by the root operator. These operators are processed with two basic forms of parallelism: inter-operator and intra-operator. The former processes different operators on several computers in a pipeline, which benefits complex queries containing many operators. The latter processes the same operator on several computers, each working on a different partition of the data. It is better suited to heavy operations, like sequentially scanning a large number of tuples.
During execution, most parallel databases transfer the intermediate data between two operations with the push model, i.e., the producer operator sends its output data directly to the successive consumer operator over the interconnection network. This is opposed to the pull model, where the producer operator materializes the output data on the local disks as split files, and the consumer operator then fetches its input from the producer by copying the files. The pull model offers better fault tolerance. When the producer operator is carried out as parallel tasks on several computers and one of them fails, whether because of hardware or software problems, the push model needs to re-execute all these tasks while the pull model only needs to process the failed one again. Nevertheless, the push model is more efficient since the intermediate data need not be materialized as many small files, which cost considerable overhead to create and transfer, hence it is often adopted in parallel databases [10, 43].
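The difference in recovery cost between the two models can be illustrated with a toy simulation. The following is only a minimal sketch under simplifying assumptions: the function name `run_producers`, the in-memory dictionary standing in for materialized files, and the rule that each failing task fails exactly once are all hypothetical and not taken from any of the cited systems.

```python
def run_producers(partitions, failing, model):
    """Run one producer task per partition and count task executions.

    partitions: list of lists of numbers, one list per producer task
    failing:    task indices that fail once and succeed when retried
    model:      "pull" (outputs are materialized) or "push" (outputs are streamed)
    """
    executions = 0
    materialized = {}                 # stands in for split files on local disks
    failures_left = set(failing)
    pending = list(range(len(partitions)))
    while pending:
        failed_now = []
        for task in pending:
            executions += 1
            if task in failures_left:
                failures_left.discard(task)   # the retry will succeed
                failed_now.append(task)
            else:
                materialized[task] = sum(partitions[task])
        if not failed_now:
            break
        if model == "pull":
            pending = failed_now              # only the failed tasks run again
        else:
            materialized.clear()              # streamed output is lost
            pending = list(range(len(partitions)))
    return materialized, executions


parts = [[1, 2], [3], [4, 5], [6]]
pull_out, pull_runs = run_producers(parts, failing=[2], model="pull")
push_out, push_runs = run_producers(parts, failing=[2], model="push")
print(pull_runs, push_runs)
```

Both runs end with the same outputs, but in this sketch the pull model repeats only the failed task while the push model repeats all four.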
2.1.2 Parallelism Metrics
Parallel database systems are evaluated with two main properties: speed-up and scale-up [11]. The speed-up measures the improvement by using a larger parallel system to process a fixed job:
\[ \text{Speed-up} = \frac{\text{small system elapsed time}}{\text{large system elapsed time}} \]
The speed-up is linear in the ideal system, i.e., an N-times larger system should yield a speed-up of N. Scale-up, in turn, measures the system when both the cluster and the job size grow. The ideal scale-up is linear, i.e., its value is always 1.
\[ \text{Scale-up} = \frac{\text{small system elapsed time on small problem}}{\text{large system elapsed time on large problem}} \]
In addition, instead of using the elapsed time to evaluate a system’s speed-up, this thesis introduces a new metric called parallel improvement (PI). It is the ratio of the time for processing a query sequentially to the time for processing it in parallel [25]:
\[ \text{PI} = \frac{\text{sequential system elapsed time}}{\text{parallel system elapsed time}} \]
Here the sequential system stands for a database system that can only run on a single computer. With this new metric, it is more intuitive to tell the improvement brought by the parallel system.
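As a small worked illustration, the three metrics can be computed directly from measured elapsed times. The function names and all timings below are hypothetical and chosen only to show the arithmetic.

```python
def speed_up(small_system_time, large_system_time):
    """Fixed job: elapsed time on the small system over the large system."""
    return small_system_time / large_system_time


def scale_up(small_system_small_problem_time, large_system_large_problem_time):
    """Job and cluster grow together: the ideal value is 1."""
    return small_system_small_problem_time / large_system_large_problem_time


def parallel_improvement(sequential_time, parallel_time):
    """PI: elapsed time on a single-computer system over the parallel system."""
    return sequential_time / parallel_time


# Hypothetical elapsed times in seconds.
print(speed_up(800.0, 200.0))               # 4 times more computers, ideal value 4.0
print(scale_up(800.0, 1000.0))              # 0.8, slightly below the ideal 1.0
print(parallel_improvement(3600.0, 400.0))  # 9.0
```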
2.2 MapReduce
Although parallel databases perform outstandingly on relatively large-scale data, their scalability is often questioned [43, 10]. First, they place strong requirements on the homogeneity of the cluster, which is difficult to achieve at large scale. Second, parallel databases are very sensitive to failures at run time since the intermediate results are transferred by the push model. A failure on even a few computers can cause the re-execution of the complete query. Third, data have to be loaded before being processed. The loading cost is considerable for large amounts of data, especially when the data are used only a few times. All these issues restrict the further development of parallel databases.
To the best of our knowledge, there is no conventional parallel database system that can be deployed on clusters consisting of more than one hundred computers [1]. Consequently, emerging Internet companies like Google and Yahoo cannot process their massive amounts of data with parallel databases. These data include the web pages captured by crawlers, search and trading log files, etc. They grow daily by petabytes, but are only temporarily stored and are analyzed with complicated algorithms that are difficult to express as SQL queries. Nevertheless, in order to respond to end-users’ requests as quickly as possible, they need to be processed with the computing capability of hundreds or even thousands of computers. Such requirements cannot be met by parallel databases. Therefore, novel parallel platforms like MapReduce [9], Dryad [36] and System S [6] were proposed.
Among these platforms, MapReduce attracts the most attention from both academic and industrial institutions. On the one hand, this is credited to the MapReduce paradigm’s simplicity and flexibility, which can represent various parallel procedures with only two simple primitives: Map and Reduce. On the other hand, although Google did not publish their internal MapReduce platform, its open source implementation Hadoop, provided by Apache™, is available to the public. Therefore, the work in this thesis is mainly built upon the Hadoop framework.
2.2.1 The MapReduce Paradigm
The MapReduce paradigm itself is a programming model, while a MapReduce platform like Hadoop is a framework that supports this model. It is inspired by the functional language Lisp, enabling end-users to express all kinds of parallel procedures with Map and Reduce functions, without considering the messy details of parallelism like fault tolerance, data distribution, load balancing, etc., which are handled by the underlying platform automatically.
Data in MapReduce is stored in a distributed file system, like GFS (Google File System) or HDFS (Hadoop Distributed File System). The storage layer is basically independent of the processing system on top, simply keeping data as $\langle key, value \rangle$ pairs in a row layout. However, MapReduce does not require data to adhere to a schema defined using the relational data model. For the same record, its key and value parts can be set differently in various Map and Reduce functions, since end-users are free to structure their data in any manner. Accordingly, no reorganization is needed for loading the data into the distributed file system. For example, in HDFS data are loaded by simply dividing the data files into fixed-size blocks and copying each block onto several HDFS instances, i.e., computers on which the HDFS platform is deployed. Therefore, loading is much cheaper in MapReduce than in parallel databases.
Conceptually, arbitrary parallel procedures can be decomposed into several MapReduce procedures, each containing one Map and one Reduce function with the following types:
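\[ \text{Map}: \langle k_1, v_1 \rangle \rightarrow \text{list}\langle k_2, v_2 \rangle \]
\[ \text{Reduce}: \langle k_2, \text{list}(v_2) \rangle \rightarrow \text{list}\langle k_3, v_3 \rangle \]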
Both the Map and Reduce functions are carried out with intra-operator parallelism, being processed as parallel tasks running on all cluster computers. Each task processes either the Map or the Reduce function on a part of the input. The Reduce tasks start only when all Map tasks have finished, hence we can divide the MapReduce procedure into two disjoint stages.
A Map task processes each input $\langle k_1, v_1 \rangle$ pair and generates the intermediate results $\langle k_2, v_2 \rangle$. Afterward, each Reduce task collects all pairs with the same key value $k_2$ as its input, then produces the results $\langle k_3, v_3 \rangle$, which can be used in the successive MapReduce procedure.
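The two function types can be made concrete with a small single-process simulation. This is only a sketch of the paradigm, not of Hadoop's API: the driver `run_mapreduce` and the word-count functions are illustrative names, and the grouping step stands in for the distributed data movement between the two stages.

```python
from collections import defaultdict


def run_mapreduce(records, map_fn, reduce_fn):
    """Apply map_fn to every (k1, v1), group by k2, then apply reduce_fn."""
    # Map stage: (k1, v1) -> list of (k2, v2)
    intermediate = []
    for k1, v1 in records:
        intermediate.extend(map_fn(k1, v1))

    # Grouping: collect all v2 values that share the same k2
    groups = defaultdict(list)
    for k2, v2 in intermediate:
        groups[k2].append(v2)

    # Reduce stage: (k2, list(v2)) -> list of (k3, v3)
    output = []
    for k2 in sorted(groups):
        output.extend(reduce_fn(k2, groups[k2]))
    return output


def word_count_map(doc_id, text):
    return [(word, 1) for word in text.split()]


def word_count_reduce(word, counts):
    return [(word, sum(counts))]


docs = [(1, "moving objects data"), (2, "moving data")]
print(run_mapreduce(docs, word_count_map, word_count_reduce))
# [('data', 2), ('moving', 2), ('objects', 1)]
```

Here the Reduce stage starts only after every record has been mapped, which mirrors the two disjoint stages described above.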
At the end of the Map stage, all the intermediate $\langle k_2, v_2 \rangle$ pairs are materialized as split files and stored in a distributed manner on the computers where the tasks were carried out. Then each Reduce task uses a file-transfer protocol to “pull” its required input files from the whole cluster [43]. This period is called the Shuffle stage, since all the intermediate data are moved between the Map and Reduce tasks over the network. In order to avoid creating and moving a lot of small files during this stage, special implementation tricks like batching, sorting, and grouping of intermediate data and smart scheduling of reads can be used [10]. However, end-users do not need to care about these details since they are all hidden inside the Hadoop framework. Because the pull model is used to transfer the intermediate data, when one Map task fails for whatever reason, only the failed one needs to be re-executed. This mechanism improves fault tolerance, making it possible to deploy the system on large-scale clusters.
To sum up, the MapReduce paradigm enables end-users to express a complex and heavy computation as several MapReduce procedures, in order to speed it up by processing it on a large-scale cluster. Details of the underlying parallelism mechanism are abstracted by the platform behind only two primitive functions, simplifying follow-up development. Furthermore, using the pull model during the Shuffle stage to transfer the intermediate data vastly improves the system’s scalability compared with conventional parallel databases.
2.2.2 Pros and Cons
MapReduce excels in scalability. In 2009, Hadoop took first place in the GraySort benchmark, sorting 100 TB (1 trillion 100-byte records) with over 3800 computers [39]. Nevertheless, it is still criticized as a “major step backwards” in comparison with conventional parallel databases [8, 43, 3] and viewed as a brute-force solution that wastes vast amounts of energy.
In [43], a comparison is performed between Hadoop and two parallel databases, Vertica and DBMS-X (a parallel SQL DBMS from a major relational database vendor), with benchmarks including the Grep task that was used in the original MapReduce paper [9]. Three typical relational operations (select, aggregate and join) as well as a UDF aggregation query are evaluated with these systems on the same cluster containing 100 computers. The results reveal that Hadoop is 2 to 50 times slower than the two parallel databases, except for data loading.
These studies clearly show that it is crucial to achieve a good tradeoff between efficiency and scalability in parallel systems. On the one hand, parallel databases often use the push model to pipeline the intermediate data between the query operators, in order to achieve a better performance. However, this also introduces the danger that many operations need to be re-executed when a slight failure happens. On the other hand, besides keeping its scalability, Hadoop should also adopt more mature database technologies like various index structures, novel storage mechanisms [49, 14], sophisticated parallel algorithms, etc., to improve its efficiency.
Apart from the efficiency comparison, MapReduce is also often criticized for lacking support for high-level declarative languages like SQL. All end-users need to express their procedures by programming in procedural languages, such as C/C++ or Java, based on the MapReduce paradigm. These programs process the distributed data at the record level, using custom parsers to derive the appropriate semantics from the input data, since the data are stored without a relational model. In short, the MapReduce paradigm is low-level and rigid, creating barriers for end-users to maintain and reuse each other’s existing work.
2.3 Hadoop Extensions
Regarding these imperfections of MapReduce, many extensions have been proposed in recent years. Since most of them are actually built upon the Hadoop platform, which is widely perceived as a substitute for MapReduce, they can also be called Hadoop extensions. Roughly, these extensions can be divided into the following four kinds.
The first kind of extension attempts to improve data access in the Hadoop platform. Natively, Hadoop processes the data at the record level without using any kind of index structure, hence it is much slower than parallel databases on many conventional database operations [43]. Addressing this issue, Hadoop++ [14] injects a Trojan index at the end of each data block in HDFS, so as to reduce the I/O cost without changing the MapReduce paradigm. However, pre-generating the index structure is time-consuming, and it is difficult to build indexes for a data set from all perspectives, hence it is challenging to use it as a generic system. In addition, the Map and Reduce stages block each other: no Reduce task can start before the last Map task finishes. This strategy guarantees the system’s fault tolerance, but lowers the processing performance as well [4, 31, 34]. Therefore, some studies introduce the push model into the Shuffle stage in a suitable manner. In [34], as soon as each Map task finishes, its intermediate results are hashed and pushed to the hash tables held by the Reduce tasks. The Reduce tasks can thus start to perform the aggregation within each bucket on the fly, even before all Map tasks are completed. Nevertheless, whether this kind of approach interferes with the system’s scalability is still unclear.
Secondly, the importance of describing MapReduce procedures in high-level declarative languages, especially SQL, instead of programming them in low-level procedural languages has gradually become the consensus of the whole database community. Therefore, many extensions have been proposed to improve the expressivity for describing Hadoop jobs. Early on, the Pig project [37] developed a language named Pig Latin. It provides a nested data model and corresponding operators, which can be used to describe MapReduce operations in an SQL-like style, in a so-called Pig Latin program. This program can be transformed into Hadoop jobs automatically by a corresponding compiler in the Pig system. Besides, the Hive project [50], which is used in practice at Facebook, also proposed a language named HiveQL. A HiveQL statement is similar to an SQL query and is also converted into several jobs that are carried out on the Hadoop platform. In particular, Hive views HDFS as a data warehouse and offers a meta-store to keep the schemas of all involved data, hence it is able to optimize the generated Hadoop jobs based on this information. Nevertheless, the compilers in both Pig and Hive are naïve. For example, Hive assumes that all the tables in the system are independently distributed over the cluster, hence queries like joins that involve multiple tables always push most of the processing into the Reduce stage and cause additional shuffle overhead. More recently, the distributed database F1 [46], proposed by Google itself, also includes a fully functional SQL engine. Although its implementation details are not deeply described in the existing paper, it clearly shows the importance of a high-level declarative language in future parallel systems.
The third kind of Hadoop extension enhances Hadoop’s efficiency on particular database operations, especially the join operation, which shows the biggest performance difference between Hadoop and the parallel database systems in [43]. [4] studies several Hadoop-based algorithms and preprocessing methods for the parallel equi-join operation on vast amounts of log records. On the one hand, if there is a large size difference between the two data sets, the smaller one is either delivered to the Reduce tasks before the other one in order to be fully buffered in memory, or broadcast across the cluster without repartitioning the large table. On the other hand, two large data sets can be processed with the semi-join method. One set is first filtered by the aggregated semi-joining attributes from the other set, hence the communication overhead can be reduced as much as possible. Besides these optimized parallel algorithms, other researchers propose MapReduce variants with a more flexible data flow. Map-Reduce-Merge [57] improves Hadoop on processing heterogeneous operations like the one-round join query. It adds an extra primitive function Merge to the MapReduce paradigm, in order to avoid the homogenization of both inputs. Thereby, two heterogeneous relations can be processed independently and then joined in the final Merge stage.
Most of the above Hadoop extensions tend to import existing database technologies, which have been developed for decades and are widely supported by many DBMSs, into the Hadoop platform. Nevertheless, they need to reimplement these methods in the Hadoop platform, causing some potential side effects. Therefore, hybrid systems are proposed as the last kind of Hadoop extension, in order to take the best features from both sides. They are typified by HadoopDB [1], which combines the Hadoop framework with the single-node database PostgreSQL. It depends on Hive to compile SQL-style queries into Hadoop jobs, while each task pushes its assigned sub-queries into the slaves’ high-performance databases to process the relational database operations. The Hadoop platform is then used as the task coordinator and the communication level. There are also some other Hadoop-based hybrid systems. [55] shows the integration of Hadoop with the parallel DBMS Teradata EDW. It builds a data tunnel between these two systems, in order to use either system to process different kinds of queries. DEDUCE [32] combines Hadoop and System S, since the latter has an advantage in processing stream data. It embeds the Hadoop workflow into its SPADE data flow, in order to use the analysis results, which are computed by Hadoop from a large amount of historical data, to assist the real-time processing. Finally, SQL/MapReduce [20] achieves the hybrid the other way round by introducing the MapReduce paradigm into parallel databases. In their own parallel database nCluster, MapReduce procedures are declared and processed as UDFs in SQL queries.
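As an illustration of the semi-join idea discussed earlier in this section, the following sketch filters one input by the distinct join keys of the other before joining. It is a single-process approximation, not the algorithm of [4]: the function name `semi_join` and the sample records are hypothetical, and the list `shipped` merely stands in for the data that would have to cross the network.

```python
def semi_join(left, right, key_left, key_right):
    """Join two record lists, shipping only left records that can match."""
    # Step 1: aggregate the distinct join keys of the right input.
    right_keys = {key_right(r) for r in right}

    # Step 2: filter the left input; only these records would be transferred.
    shipped = [l for l in left if key_left(l) in right_keys]

    # Step 3: hash join between the filtered left input and the right input.
    index = {}
    for r in right:
        index.setdefault(key_right(r), []).append(r)
    joined = [(l, r) for l in shipped for r in index[key_left(l)]]
    return shipped, joined


logs = [("u1", "click"), ("u2", "view"), ("u9", "click")]
users = [("u1", "Ann"), ("u2", "Bob"), ("u3", "Cy")]
shipped, joined = semi_join(logs, users, lambda l: l[0], lambda u: u[0])
print(len(shipped), len(joined))
```

In this example the log record of the unknown user `u9` is dropped before the join, so it never has to be transferred.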
2.4 SECONDO Database System
Figure 2.1: SECONDO Components (left), Architecture of Kernel (right)
We intend to build Parallel SECONDO as a hybrid system as well, by combining the Hadoop framework with a set of SECONDO databases. SECONDO [27, 28] is a “generic” single-computer database system, designed to be filled with implementations of various DBMS data models, like relational, spatial, temporal, etc. It represents different data types like spatial and moving objects and processes them with efficient algorithms [26, 19, 33].
Roughly, SECONDO consists of three modules: kernel, optimizer and user interface, as shown in Figure 2.1. The user interface provides the front-end to end-users with both text and graphical interfaces. The former is a command-line shell where both the input query and the result are represented as text. In contrast, the latter can present different types of query results graphically. For example, a moving object’s trajectory can be drawn on a real-world urban road network, while its movement is also animated, as shown in Figure 2.2.
SECONDO accepts queries in languages at two levels: executable and SQL. In the latter case, the SQL-like queries are first processed by the optimizer and transformed into optimal query plans, which are expressed in the SECONDO executable language. The query plan is chosen based on cost estimates of the involved predicates (operators), obtained with small sample relations. However, the details are not discussed further in this thesis since they are beyond the scope of our main topic.
Finally, the SECONDO kernel module provides the command manager, query processor and storage manager, to represent and process different data types. Queries expressed in the executable language are first parsed into operator trees by the command manager, where each node is a SECONDO operator. The execution of the operator trees is controlled by the query processor. The leaf operators access the source data from the storage manager, while the root operator generates the final query result. Each internal operator gets its input from its predecessor and processes it iteratively, then passes the output to the successive operator. In addition, the kernel’s capability is extended by algebra modules. An algebra module generally offers a set of type constructors to represent a kind of data objects [13], and also a set of operators to process them with efficient algorithms [33].
A Client/Server mechanism is also provided in SECONDO. It uses a server daemon named Monitor to listen to and process all requests from remote clients, based on the local database. It is possible to run several SECONDO Monitors on the same computer, where each Monitor is associated with a different database and listens on a unique TCP/IP port.
Parallel SECONDO keeps the SECONDO user interface untouched, using it as the front-end of the new system as well. Therefore, end-users are able to access both systems with the same interface, and can solve problems of different scale by selecting either system. Besides, Parallel SECONDO uses Hadoop to connect a set of SECONDO kernel modules. The Hadoop platform is only used as the communication level and the task coordinator, just as in HadoopDB, while the data is processed entirely within the SECONDO systems, in order to achieve the best performance. In the meantime, all functions like invoking the Hadoop platform and exchanging data between SECONDO databases over the network are implemented as two SECONDO algebras. More details about the Parallel SECONDO infrastructure are introduced in Chapter 3.
Figure 2.2: The Graphical User Interface for SECONDO and Parallel SECONDO
2.5 Parallel Processing on Specialized Data
SECONDO is mainly designed to process specialized data types that normally are not well supported in ordinary database systems, especially moving objects data. Likewise, Parallel SECONDO needs to inherit this capability, in order to handle these special types of data on a large scale.
To the best of our knowledge, there are not many parallel systems that can systematically process these special data types. SpatialHadoop [17] proposes such a system by extending the Hadoop platform with spatial data types and functions. It uses a two-level index structure to store a large number of spatial objects over the cluster, in order to efficiently perform spatial operations like range queries, kNN queries and spatial joins. However, it forces end-users to extend the system with new data types and operations by programming in the MapReduce paradigm, building barriers for future development. HadoopGIS [2] also attempts to combine Hadoop with their spatial processing engine RESQUE. It is integrated with Hive, hence queries can be expressed in SQL-like statements. Nevertheless, it implements the combination simply at the record level since data must be parsed at run time, causing considerable overhead as the data size increases. Besides, neither of them supports any technology for moving objects data.
There are some studies on processing certain spatial queries in parallel. In [42], the PBSM method is proposed to process the spatial join operation in parallel. It evenly divides spatial objects into disjoint partitions, hence each cluster node can process the join within one partition independently. SJMR [58] implements this method with the MapReduce paradigm and improves it by removing duplicated results in-stream. This method is also adopted in our study for processing the spatial and spatio-temporal join in Parallel SECONDO. BRACE [54] uses MapReduce to simulate agents’ behaviors in parallel. It abstracts behavioral simulations in the state-effect programming pattern, which can be processed as iterated parallel spatial join operations, with two consecutive MapReduce jobs in each iteration.
Finally, some studies process special data types with parallel platforms other than MapReduce or Hadoop. [6] specifically studies the grid-based map-matching procedure in parallel, which can be viewed as an application of the spatial join operation. It is implemented with IBM’s System S. Paradise [41] is a parallel geo-spatial DBMS based on a hierarchical infrastructure including both shared-memory and shared-nothing architectures. It is particularly prepared for dealing with a large number of satellite images, by declustering them across the cluster based on their spatial attributes. Besides using the push model, it additionally imports the pull model to fetch large image data over the network only when they are required.
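The partition-based spatial join mentioned above can be sketched on bounding rectangles alone. The code below is a minimal single-process illustration under several assumptions: a uniform grid with a hypothetical cell size, objects given as `(xmin, ymin, xmax, ymax)` tuples, and the commonly used reference-point rule for suppressing duplicate results, which is not necessarily the exact technique of [42] or [58].

```python
from collections import defaultdict
from itertools import product


def cells_of(mbr, cell):
    """Grid cells (ix, iy) overlapped by an MBR (xmin, ymin, xmax, ymax)."""
    xmin, ymin, xmax, ymax = mbr
    xs = range(int(xmin // cell), int(xmax // cell) + 1)
    ys = range(int(ymin // cell), int(ymax // cell) + 1)
    return product(xs, ys)


def overlaps(a, b):
    return a[0] <= b[2] and b[0] <= a[2] and a[1] <= b[3] and b[1] <= a[3]


def pbsm_join(left, right, cell=10.0):
    """Return sorted pairs of object ids whose MBRs overlap."""
    # Partition step: every object is copied into each cell it overlaps.
    parts = defaultdict(lambda: ([], []))
    for oid, mbr in left.items():
        for c in cells_of(mbr, cell):
            parts[c][0].append(oid)
    for oid, mbr in right.items():
        for c in cells_of(mbr, cell):
            parts[c][1].append(oid)

    # Join step: each cell could be handled by an independent task.
    result = []
    for c, (lefts, rights) in parts.items():
        for l, r in product(lefts, rights):
            a, b = left[l], right[r]
            if not overlaps(a, b):
                continue
            # Report the pair only in the cell that holds the lower-left
            # corner of the common rectangle, so no duplicates are emitted.
            ref = (int(max(a[0], b[0]) // cell), int(max(a[1], b[1]) // cell))
            if ref == c:
                result.append((l, r))
    return sorted(result)


left = {"a": (1, 1, 12, 12)}
right = {"b": (8, 8, 15, 15), "c": (30, 30, 31, 31)}
print(pbsm_join(left, right))   # [('a', 'b')]
```

Objects `a` and `b` share four grid cells in this example, yet the pair is reported once. A refinement step on the exact geometries would follow in a complete join.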
3 System Infrastructure
This chapter mainly explains the construction of Parallel SECONDO. We start in Section 3.1 by introducing all system components prepared for coupling the Hadoop framework with our extensive data processing engine SECONDO. Unlike many other Hadoop-based systems that rely on the default HDFS (Hadoop Distributed File System) to shuffle intermediate data so as to inherit the Hadoop framework’s essential features, namely balanced workload assignment and large scalability, Parallel SECONDO provides a similar but much simpler module called PSFS (Parallel SECONDO File System). Via this module, all intermediate data can be exchanged among distributed SECONDO databases directly, in order to achieve the best network transfer performance. PSFS is explained in detail in Section 3.2, where some evaluations are also given to compare the performance of PSFS and HDFS. Finally, Section 3.3 introduces the auxiliary tools that help end-users easily deploy and manage Parallel SECONDO on large-scale clusters.
To get the most out of this chapter, we recommend that the reader download and try Parallel SECONDO before reading the coming details. Parallel SECONDO has already been freely published along with SECONDO 3.3.2 and can be downloaded from our website (http://dna.fernuni-hagen.de/secondo/ParallelSecondo). The reader can download its source code and easily set it up on either a single computer or a private cluster with the installation script. More specific deployment steps can be found in Appendix A.
Besides, we also provide virtual machine images for Parallel SECONDO, so that end-users can get familiar with the system and use it to process their practical data as quickly as possible. First, the VMWare image contains a Parallel SECONDO system that is installed and configured on a single-computer Ubuntu system, with which Parallel SECONDO can be set up immediately by loading it into a VMWare virtual computer. Second, an AMI (Amazon Machine Image) with Parallel SECONDO is also provided. Based on this image, it is possible to create a large-scale cluster consisting of Amazon EC2 (Elastic Compute Cloud) instances, where Parallel SECONDO has already been automatically deployed. Both images are also freely published on our website and their specific usage is described in more detail in Appendix A.2.
Figure 3.1: The Infrastructure of Parallel SECONDO
3.1 System Components
Essentially, Parallel SECONDO is built as a hybrid system by efficiently coupling the Hadoop framework with a set of SECONDO databases. The infrastructure is shown in Figure 3.1. This system is inspired by HadoopDB [1], where the Hadoop framework mainly takes charge of assigning and scheduling parallel tasks running on a number of computers, while the tasks’ embedded procedures are pushed as much as possible to the distributed local database engines, in order to improve the system’s overall efficiency. Beyond this, there are several special modules and mechanisms in Parallel SECONDO for achieving a better performance.
First, SECONDO provides many techniques for processing spatial and moving objects (spatio-temporal) data, both of which need heavy geometric computations. Unlike common database procedures, geometric computations are CPU- and I/O-intensive. For example, as shown in Figure 3.2, two region objects A and B do not intersect although their MBRs (Minimum Bounding Rectangles) overlap. Therefore, the intersection detection needs to compare not only their MBRs but also their coordinates.
In SECONDO, detailed data like spatial objects’ precise coordinates are kept in a structure named FLOB (Faked Large OBjects), which is not read until the data are really needed by the query. This mechanism helps to reduce useless I/O access as much as possible. In the above example, if A’s and B’s MBRs do not intersect, then the comparison between their precise coordinates is not needed at all. This filter and refinement procedure [38] is generically used for processing joins on multi-dimensional objects.
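The filter and refinement procedure with deferred loading of the detailed data can be sketched as follows. This is an illustration only and does not reflect SECONDO's FLOB implementation: the class `Region`, the loader callables standing in for disk reads, and the representation of a region's exact shape as a set of grid cells are all simplifying assumptions.

```python
class Region:
    """A region with an in-memory MBR and lazily loaded detailed data."""

    def __init__(self, name, mbr, load_cells):
        self.name = name
        self.mbr = mbr                 # (xmin, ymin, xmax, ymax), always available
        self._load_cells = load_cells  # callable standing in for a disk read
        self._cells = None
        self.loads = 0                 # number of simulated disk reads

    def cells(self):
        if self._cells is None:
            self._cells = self._load_cells()
            self.loads += 1
        return self._cells


def mbr_overlap(a, b):
    return a[0] <= b[2] and b[0] <= a[2] and a[1] <= b[3] and b[1] <= a[3]


def intersects(r, s):
    # Filter step: cheap test on the bounding rectangles only.
    if not mbr_overlap(r.mbr, s.mbr):
        return False
    # Refinement step: the detailed data is read only now.
    return not r.cells().isdisjoint(s.cells())


# A is L-shaped; B touches A's MBR at cell (2, 2), which A does not cover.
a = Region("A", (0, 0, 2, 2), lambda: {(0, 0), (0, 1), (0, 2), (1, 0), (2, 0)})
b = Region("B", (2, 2, 3, 3), lambda: {(2, 2), (3, 2), (2, 3)})
c = Region("C", (10, 10, 10, 10), lambda: {(10, 10)})

print(intersects(a, b), a.loads, b.loads)   # False 1 1
print(intersects(a, c), c.loads)            # False 0
```

For the pair A and B the MBRs overlap, so both detailed representations are read before the test fails. For A and C the filter step already rejects the pair and C's detailed data is never loaded.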
Figure 3.2: Two Region Objects
In Parallel SECONDO, it is normal to process several tasks on the same computer. If all tasks need to read data from the same database, or from several databases that reside on the same disk, then the disk interference among these simultaneous tasks may considerably encumber the performance.
Regarding this issue, unlike Hadoop, which views every cluster computer as its basic processing unit, Parallel SECONDO sets the DS (Data Server) as its basic processing unit, as shown in Figure 3.1. Each DS contains at least a compact SECONDO system called Mini-SECONDO and its affiliated database. Nowadays even a low-end computer is commonly equipped with multi-core processors, large memory and several hard disks. Therefore, on computers with multiple hard disks we can set up several DSs, so that simultaneous tasks on the same computer that involve different Mini-SECONDO databases can read their data independently, and the disk interference is reduced to a minimum. Since several DSs may be set on the same computer, we designate one of them as the MS (Main Server), which contains the configuration information and management scripts that work for all DSs on that computer.
In addition, Hadoop provides two lists, masters and slaves, to indicate all computers of the current cluster that take part in the system. Parallel SECONDO provides a similar structure called the DS Catalog, since we use the DS as the basic processing unit. For every cluster node, the first of its DSs listed in the DS Catalog is denoted as the MS. The DS Catalog is duplicated on every MS, hence any DS can find the other DSs by scanning it. Each DS in the catalog is represented as one line with the format:
IP:PSFSNode:SecPort
The IP denotes the computer where the DS is set. At runtime, each Mini-SECONDO provides its service through the client/server mechanism with the Monitor daemon, as introduced in Section 2.4. The listening port of the Monitor is denoted by SecPort.
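As an illustration of the catalog format, the following Python sketch parses such lines and picks the first DS listed for each computer as its MS. The names `DataServer`, `parse_ds_catalog` and `main_servers`, as well as the addresses, directories and ports in the sample catalog, are hypothetical and do not come from the Parallel SECONDO scripts. The sketch also assumes IPv4 addresses or host names, so that the first and the last colon delimit the three fields.

```python
from typing import NamedTuple


class DataServer(NamedTuple):
    ip: str
    psfs_node: str
    sec_port: int


def parse_ds_catalog(text):
    """Parse lines of the form IP:PSFSNode:SecPort (no colons inside the IP)."""
    servers = []
    for raw in text.splitlines():
        line = raw.strip()
        if not line:
            continue
        ip, rest = line.split(":", 1)          # first field: the computer
        psfs_node, port = rest.rsplit(":", 1)  # last field: the Monitor port
        servers.append(DataServer(ip, psfs_node, int(port)))
    return servers


def main_servers(servers):
    """The first DS listed for each computer is taken as its MS."""
    ms = {}
    for ds in servers:
        ms.setdefault(ds.ip, ds)  # later DSs of the same computer are ignored
    return ms


# Hypothetical catalog: two DSs on one computer, one DS on another.
CATALOG = """\
192.168.0.1:/disk1/psfs:11234
192.168.0.1:/disk2/psfs:11244
192.168.0.2:/disk1/psfs:11234
"""

servers = parse_ds_catalog(CATALOG)
for ip, ds in main_servers(servers).items():
    print(ip, "MS ->", ds.psfs_node, ds.sec_port)
```

Because the catalog is an ordered list, the MS of a computer is determined purely by the position of its DSs in the file.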
As mentioned before, Parallel SECONDO builds up a simple distributed file system, PSFS, in order to exchange intermediate data among Mini-SECONDO databases directly. Similar to HDFS, which needs to materialize impermanent data as disk files, PSFS also caches the data as files on every DS, in a directory called PSFSNode that is usually located inside the DS. Note that there is a dependency between the PSFSNode and the SecPort: different Mini-SECONDO systems are not allowed to fetch data from the same PSFSNode, and the same Mini-SECONDO cannot keep data in different PSFSNodes.
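The dependency between PSFSNode and SecPort amounts to a one-to-one pairing on each computer. A minimal Python sketch of such a consistency check is given below. The function `find_conflicts` and the sample entries are hypothetical and only illustrate the rule. The sketch further assumes that a Mini-SECONDO is identified by the pair of computer and Monitor port.

```python
def find_conflicts(entries):
    """Check the one-to-one pairing of PSFSNode and SecPort per computer.

    entries: iterable of (ip, psfs_node, sec_port) tuples.
    Returns a list of human-readable conflict messages.
    """
    port_of_node = {}  # (ip, psfs_node) -> sec_port
    node_of_port = {}  # (ip, sec_port)  -> psfs_node
    conflicts = []
    for ip, node, port in entries:
        # A PSFSNode must not be used by different Mini-SECONDO systems.
        if port_of_node.setdefault((ip, node), port) != port:
            conflicts.append(f"{ip}: {node} is shared by several Mini-SECONDO ports")
        # A Mini-SECONDO must not keep data in different PSFSNodes.
        if node_of_port.setdefault((ip, port), node) != node:
            conflicts.append(f"{ip}: port {port} is bound to several PSFSNodes")
    return conflicts


# Hypothetical entries for one computer.
valid = [("10.0.0.1", "/d1/psfs", 11234), ("10.0.0.1", "/d2/psfs", 11244)]
shared_node = [("10.0.0.1", "/d1/psfs", 11234), ("10.0.0.1", "/d1/psfs", 11244)]
shared_port = [("10.0.0.1", "/d1/psfs", 11234), ("10.0.0.1", "/d2/psfs", 11234)]

for name, entries in [("valid", valid), ("shared node", shared_node),
                      ("shared port", shared_port)]:
    print(name, find_conflicts(entries))
```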
Just as Hadoop characterizes its cluster computers as masters and slaves, Parallel SECONDO also designates one DS as the mDS (master Data Server) and all the others as sDSs (slave Data Servers). Usually the mDS is set on the MS of the Hadoop master node. Besides, the mDS can also serve as an sDS.
Along with the distinction between mDS and sDS, we set the Mini-SECONDO database in the mDS as the master database and all the others as slave databases. The mDS contains some global data like the scale of the cluster, global index structures and also the meta data of distributed objects, which will be further explained in the next chapter. In contrast, slave databases contain only the local objects belonging to the corresponding sDSs. Therefore, the master database becomes the only entrance to the whole system, while all the other components are hidden from end-users, giving them the impression of still handling a standalone SECONDO database.
Consequently, both the text and the graphical interfaces provided for the master database are directly used as the interface for Parallel SECONDO. When a query is given that contains certain parallel operators, which will be introduced in Section 4.1, it is first transformed into Hadoop jobs by the mDS and then further divided into Map and Reduce tasks by the Hadoop framework, where each task contains certain SECONDO queries. During the Map or Reduce stage, the tasks run in parallel on all sDSs and are assigned and scheduled by the Hadoop framework in order to achieve a balanced workload on the cluster. Their encapsulated SECONDO queries, however, which can be processed sequentially, are pushed to and executed by Mini-SECONDO. Every task fetches its required data either from the local Mini-SECONDO databases or remotely from the other computers via PSFS.
In principle, the master database can still be used as a normal SECONDO database, where all sequential queries can be processed as usual. Since processing Hadoop jobs incurs considerable overhead, mainly spent on network communication and task assignment [14], Parallel SECONDO is better suited to queries involving massive amounts of data, while small queries are processed more efficiently in a standalone SECONDO. In addition, for conventional SECONDO users who have stored a lot of customized data in their personal database, it is possible to set the existing database as the master database directly during the deployment, without migrating the data at all. Therefore, via the master database, end-users can choose an appropriate system, either standalone or parallel, to process various queries according to the size of the data being handled, which makes Parallel SECONDO more flexible for all kinds of queries.
Finally, during the installation of Parallel SECONDO, the Hadoop framework is also deployed by unpacking the software and setting up HDFS nodes on all MSs. Its configuration parameters are also set automatically, according to the Parallel SECONDO preferences. Nevertheless, we keep the Hadoop framework independent from the other Parallel SECONDO components. The Hadoop core functionality is not extended, and the framework works entirely by its own mechanism without the participation of any Parallel SECONDO component. We keep Hadoop and Parallel SECONDO separated from each other so that the system stays compatible with any possible update from either side. In particular, when the SECONDO system is extended with new features such as a new type constructor or operator, all Mini-SECONDO systems can be updated with these features immediately, without affecting the Hadoop framework at all.
3.2 Parallel SECONDO File System
Compared with parallel databases like Vertica, MapReduce and Hadoop provide a simple and flexible mechanism that enables them to achieve an impressive scalability without being carefully tuned. To the best of our knowledge, no publication reports that such parallel databases have been deployed on clusters with more than one hundred computers [1, 43], while many systems built upon Hadoop, like Hive and Pig, can be easily set up on hundreds or even thousands of computers covering several data centers in different geographic regions.
The large-scale scalability is mainly owed to the MapReduce fault-tolerance mechanism. Data in Hadoop are kept in HDFS (Hadoop Distributed File System) as ⟨key, value⟩ pairs. All pairs with the same key are grouped into a split and each split is processed by one Map/Reduce task. The splits are stored in constant-size blocks, 64 MB by default, and each block is duplicated on several computers in case some HDFS nodes crash and become inaccessible. The tasks are assigned to slaves by the JobTracker and the TaskTracker of the framework. For various reasons, certain tasks may hang or fail and thus delay the whole job. In this case, they are replicated to other idle slaves and fetch their input splits from the duplicated blocks. When the first of the replicated tasks finishes, all its other task attempts are aborted. With this brute-force approach, although certain computing and network resources are wasted for assigning and processing duplicated tasks, the performance of the complete job is improved.
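The trade-off of replicated task attempts can be made concrete with a small Python sketch. It is a toy model and not Hadoop code: the function names, the task names and all start times and durations are hypothetical. Each attempt is a pair of start time and duration. A task finishes with its fastest attempt, and a job finishes with its slowest task.

```python
def task_finish(attempts):
    """A task is done as soon as its fastest attempt finishes."""
    return min(start + duration for start, duration in attempts)


def job_finish(tasks):
    """A job is done only when its slowest task is done."""
    return max(task_finish(attempts) for attempts in tasks.values())


def wasted_time(attempts):
    """Running time of the attempts that are aborted when the winner finishes."""
    done = task_finish(attempts)
    winner = min(range(len(attempts)), key=lambda i: sum(attempts[i]))
    return sum(max(0, done - start)
               for i, (start, _) in enumerate(attempts) if i != winner)


# Hypothetical job: map-1 straggles on its first slave (60 time units).
without_backup = {"map-0": [(0, 10)], "map-1": [(0, 60)]}
# A replicated attempt of map-1 starts at time 15 on an idle slave.
with_backup = {"map-0": [(0, 10)], "map-1": [(0, 60), (15, 12)]}

print("job time without replication:", job_finish(without_backup))
print("job time with replication:   ", job_finish(with_backup))
print("time wasted on aborted work: ", wasted_time(with_backup["map-1"]))
```

In this toy setting the job finishes earlier with the replicated attempt, at the price of the time that the aborted attempt had already consumed.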
In order to inherit the impressive fault-tolerance and scalability of Hadoop, many Hadoop extensions that we introduced in Chapter 2 depend on HDFS to shuffle their intermediate data. Even hybrid systems like HadoopDB [1] and Hadoop-GIS [2], which basically use distributed standalone databases to process most database queries, still rely on HDFS to shuffle t