

5.2 Applications on Spatial Data Storage Based on Hadoop Platform

 Hadoop-GIS (Emory University)

The goal of the system is to deliver a scalable, efficient and expressive spatial querying system that supports analytical queries on large-scale spatial data and provides a solution that can be used for daily operations. Hadoop-GIS integrates a native spatial query engine with MapReduce, where spatial queries are implicitly parallelized through MapReduce by means of space partitioning and spatial indexing. The system was presented by Emory University at the 39th International Conference on Very Large Data Bases (VLDB) on August 26th, 2013.

By integrating the framework with Hive, Hadoop-GIS provides an expressive spatial query language that extends HiveQL with spatial constructs and automates spatial query translation, optimization and execution. Hadoop-GIS supports fundamental spatial queries (selection, join, projection and aggregation) as well as complex queries such as spatial cross-matching (large-scale spatial join) and nearest-neighbor queries.

The core components (Figure 5.2.1) of Hadoop-GIS are:

 Spatial Query Translator parses and translates SQL queries into an abstract syntax tree. The authors extended the HiveQL translator to support a set of spatial query operators, spatial functions and spatial data types.

 Spatial Query Optimizer takes an operator tree as input and applies rule-based optimizations such as predicate pushdown or index-only query processing.

 Spatial Query Engine is a stand-alone spatial query engine that supports the following infrastructure operations: 1) spatial relationship comparisons, such as intersects, touches, overlaps, contains, within and disjoint; 2) spatial measurements, such as intersection, union, convexHull, distance, centroid and area; 3) spatial access methods for efficient query processing, such as building and querying R*-Tree and Voronoi diagram structures. The engine is compiled as a shared library and can easily be shared by all cluster nodes.

Figure 5.2.1: Architecture of Hadoop-GIS system (Source: Emory University)

The major difference between Hive and Hadoop-GIS lies in the logical plan generation step. If a query does not contain any spatial operations, the resulting logical query plan is exactly the same as the one generated by Hive. If the query contains spatial operations, the logical plan is regenerated with special handling of the spatial operations.

Hadoop-GIS is an interesting project that facilitates the processing of spatial data on a Hadoop system. Such processing requires a scalable architecture that can query scientific spatial data at massive scale. Another requirement is support for queries on cost-effective architectures such as commodity clusters or cloud environments.

 SpatialHadoop (University of Minnesota)

SpatialHadoop is a MapReduce framework with native support for spatial data. It is available as open source at http://spatialhadoop.cs.umn.edu/ and was presented by the University of Minnesota at the 39th International Conference on Very Large Data Bases (VLDB) on August 26th, 2013. SpatialHadoop is a comprehensive extension of Hadoop (around 12,000 lines of code inside the Hadoop code base).

As a result, SpatialHadoop works in a similar way to Hadoop: programs are written in terms of map and reduce functions, and hence existing Hadoop programs can run as is on SpatialHadoop. SpatialHadoop pushes its spatial constructs into all layers of Hadoop, namely the language, storage, MapReduce and operations layers (Figure 5.2.2):

 In the language layer, a simple high-level language is provided to simplify interaction with the system for non-technical users. This language provides built-in support for spatial data types, spatial primitive functions and spatial operations (Point, Rectangle, Distance etc.).

 In the storage layer, a two-layered spatial index structure is provided, where the global index partitions data across nodes while the local index organizes data within each node. The global index is used for preparing the MapReduce job, while the local indexes are used for processing the map tasks.

 In the MapReduce layer, two new components are added to allow MapReduce programs to access indexed files as input, namely SpatialFileSplitter and SpatialRecordReader. The SpatialFileSplitter exploits the global index by pruning partitions that do not contribute to the query answer, while the SpatialRecordReader exploits the local index to efficiently access records within each partition.

 The Operations layer contains a number of spatial operations (range query, kNN and spatial join), implemented using the indexes and new components in the MapReduce layer. Other spatial operations can be added in a similar way.

Figure 5.2.2: SpatialHadoop architecture (Source: University of Minnesota)
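The interplay between the global and the local index in a range query can be illustrated with a minimal Python sketch. It is not SpatialHadoop code: the `Rect` class and the functions `prune_partitions` and `range_query` are hypothetical names introduced here, the global index is modelled as a plain dictionary from partition boundary to records, and the local index is replaced by a linear scan.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple

Point = Tuple[float, float]


@dataclass(frozen=True)
class Rect:
    """Axis-aligned rectangle given by its lower-left and upper-right corners."""
    x1: float
    y1: float
    x2: float
    y2: float

    def intersects(self, other: "Rect") -> bool:
        return (self.x1 <= other.x2 and other.x1 <= self.x2 and
                self.y1 <= other.y2 and other.y1 <= self.y2)

    def contains(self, p: Point) -> bool:
        return self.x1 <= p[0] <= self.x2 and self.y1 <= p[1] <= self.y2


def prune_partitions(global_index: Dict[Rect, List[Point]], query: Rect) -> List[Rect]:
    """Global-index step: keep only partitions whose boundary meets the query."""
    return [mbr for mbr in global_index if mbr.intersects(query)]


def range_query(global_index: Dict[Rect, List[Point]], query: Rect) -> List[Point]:
    """Local step: filter the records of each partition that survived pruning."""
    result: List[Point] = []
    for mbr in prune_partitions(global_index, query):
        # Assumption: a real local index (e.g. an R-tree) would avoid this scan.
        result.extend(p for p in global_index[mbr] if query.contains(p))
    return sorted(result)


# Two partitions, each with its boundary rectangle and its points.
index = {
    Rect(0, 0, 5, 5): [(1, 1), (4, 4)],
    Rect(6, 0, 10, 5): [(7, 1), (9, 4)],
}
print(prune_partitions(index, Rect(7, 0, 8, 2)))  # only the second partition
print(range_query(index, Rect(3, 0, 8, 5)))       # [(4, 4), (7, 1)]
```

In this sketch the pruning step plays the role described for the SpatialFileSplitter and the per-partition filtering plays the role described for the SpatialRecordReader.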

 GIS Tools for Hadoop

On March 25, 2013, David Kaiser and his Big Data team released the GIS Tools for Hadoop project on GitHub. The goal of this effort is to bring spatial data and spatial analysis to Hadoop. The project contains:

 Esri Geometry API for Java: a generic geometry library that can be used to extend the Hadoop core with vector geometry types and operations, and that enables developers to build MapReduce applications for spatial data.

 Spatial Framework for Hadoop: extends Hive and is based on the Esri Geometry API, enabling Hive Query Language users to leverage a set of analytical functions and geometry types. It also includes some utilities for the JSON used in ArcGIS (a platform for designing and managing solutions through the application of geographic knowledge).

 Geoprocessing Tools for Hadoop: a toolkit that allows users who want to leverage the Hadoop framework to perform spatial analysis on spatial data.

The project supports the processing of simple vector data (points, lines, polygons) and basic analysis operations on that data, such as relationship analysis, running in a Hadoop distributed processing environment.

CHAPTER 6 HADOOPDB

6.1 HadoopDB main components

HadoopDB is an open-source system that connects multiple individual single-node database systems (such as PostgreSQL or MySQL) deployed in a cluster, using Hadoop as a coordination tool that manages job execution as well as network communication.

It is the first work that uses the MapReduce framework together with an RDBMS for big data analytics. HadoopDB stores data in a distributed file system (DFS) and, at query processing time, loads the data from the DFS into a local database on each node and then processes queries using the local DBMS. HadoopDB therefore resembles a shared-nothing parallel database in which Hadoop provides all the services necessary for scaling to thousands of nodes.

It was created by researchers at Yale University and was first presented on August 27th, 2009 by its co-creators Azza Abouzeid and Kamil Bajda-Pawlikowski in Lyon, France. HadoopDB has been built exclusively from open-source components, such as Hive, which provides the system with an SQL interface.

Business databases are the natural area of application for HadoopDB. Thanks to its superior fault tolerance, it scales more smoothly than existing parallel databases. Furthermore, the workload of business database systems is mostly read-oriented, consisting of analytical queries over a complex schema. In order to achieve good query performance, the dataset needs to be carefully prepared through data partitioning and through optimization of the queries that involve table joins.

HadoopDB combines the scalability of Hadoop with the high performance of databases in the analysis of structured data.

HadoopDB extends Hadoop's characteristics providing the following four components:

 Database Connector

It is the interface between the independent database systems residing on nodes in the cluster and the TaskTrackers. Each MapReduce job supplies the Connector with an SQL query and the connection parameters (such as which JDBC driver to use, the query fetch size and other query parameters). The Connector connects to the database, executes the query and returns the results as key/value pairs. From the point of view of the Hadoop framework, the databases are data sources similar to data blocks in HDFS.
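The behaviour described for the Connector (take an SQL query, execute it, hand the rows back as key/value pairs) can be sketched in a few lines of Python. This is only an illustration under stated assumptions: HadoopDB itself talks to the databases through JDBC, whereas the sketch uses Python's built-in `sqlite3` module as a stand-in, and the function name `connector_records` and the sample rows are hypothetical.

```python
import sqlite3
from typing import Iterator, Tuple


def connector_records(conn: sqlite3.Connection, sql: str,
                      fetch_size: int = 2) -> Iterator[Tuple[str, tuple]]:
    """Run `sql` and yield each row as (key, value), fetch_size rows at a time."""
    cursor = conn.execute(sql)
    while True:
        rows = cursor.fetchmany(fetch_size)
        if not rows:
            break
        for row in rows:
            # Assumption: the first column acts as the key, the rest as the value.
            yield row[0], tuple(row[1:])


# Stand-in for one single-node database of the cluster.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE grep (key1 TEXT, field TEXT)")
conn.executemany("INSERT INTO grep VALUES (?, ?)",
                 [("k1", "hello world"), ("k2", "foo"), ("k3", "two")])

pairs = list(connector_records(
    conn, "SELECT key1, field FROM grep WHERE field LIKE '%wo%' ORDER BY key1"))
print(pairs)  # [('k1', ('hello world',)), ('k3', ('two',))]
conn.close()
```

Because the records come out as key/value pairs, a map function can consume them in the same way as records read from an HDFS block.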

 Data Loader

It manages the parallel loading of data from the file system into the database systems, with regard to:

 Items related to the size and the potential repartitioning of a node.

 Repartitioning of data on all nodes based on a given partition key upon loading.

 Partitioning of data on a single node into multiple smaller chunks.

 Bulk-loading node partitions.

Data Loader consists of two components:

 Global Hasher: executes a dedicated MapReduce job via Hadoop that reads in raw data files stored in HDFS and repartitions them into as many parts as there are nodes in the cluster.

 Local Hasher: copies a partition from HDFS into the local file system of each node and then partitions the file further into smaller parts based on the maximum partition size setting.
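The two hashing stages can be sketched as follows. This is a simplified in-memory illustration, not HadoopDB code: the function names `global_hasher` and `local_hasher`, the `max_chunk_size` parameter and the use of CRC32 with two different salts are assumptions made for the example. Different salts are used so that the chunk a record lands in is not correlated with the node it was assigned to.

```python
import zlib
from typing import List, Tuple

Record = Tuple[str, str]


def _hash(key: str, salt: str) -> int:
    # crc32 is deterministic across runs, unlike Python's built-in hash() for str.
    return zlib.crc32((salt + key).encode("utf-8"))


def global_hasher(records: List[Record], num_nodes: int) -> List[List[Record]]:
    """Repartition all records into one part per cluster node."""
    parts: List[List[Record]] = [[] for _ in range(num_nodes)]
    for key, value in records:
        parts[_hash(key, "global:") % num_nodes].append((key, value))
    return parts


def local_hasher(partition: List[Record], max_chunk_size: int) -> List[List[Record]]:
    """Split one node's partition into chunks of roughly max_chunk_size records."""
    num_chunks = max(1, -(-len(partition) // max_chunk_size))  # ceiling division
    chunks: List[List[Record]] = [[] for _ in range(num_chunks)]
    for key, value in partition:
        chunks[_hash(key, "local:") % num_chunks].append((key, value))
    return chunks


records = [(f"key{i}", f"value{i}") for i in range(20)]
node_partitions = global_hasher(records, num_nodes=3)
node_chunks = [local_hasher(p, max_chunk_size=4) for p in node_partitions]
for node_id, chunks in enumerate(node_chunks):
    print(node_id, [len(c) for c in chunks])
```

Hashing only balances chunk sizes on average, so individual chunks may end up slightly above or below the configured size.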

 Catalog

The catalog maintains information about the databases and is crucial for the creation of the query plan. It includes the following:

 Connection parameters, such as database location, driver class and credentials.

 Metadata, such as data partitioning properties and replica locations.

 Data sets contained in the cluster.

 SQL MapReduce planner which extends Hive providing an SQL interface on HadoopDB.

 Local hasher Copies data partitions from HDFS to the local file system. It also partitions files into smaller chunks.

Figure 6.1.1: HadoopDB architecture

One of the biggest drawbacks of HadoopDB, which makes it unsuitable for large-scale data processing, is the lack of fault tolerance at the data layer. The partitioning of the raw data and its loading onto the individual database nodes are no longer supervised by the Hadoop framework.

Although HadoopDB combines the power of efficient DBMS technology with MapReduce, it seems impractical to employ the system for large-scale data processing. It would be a great advance if HadoopDB were improved to provide fault tolerance at the data layer too, just as Hadoop does. Moreover, HadoopDB has the following drawbacks, which arise because the DBMS that processes queries is separated from the DFS that stores the data. First, there is storage overhead due to the redundant storage of data in both the DFS and the local databases. Second, HadoopDB suffers performance degradation because it must reload DFS data into the local databases when processing queries that cannot be answered using the current snapshot of the local databases. Third, HadoopDB, being a shared-nothing architecture, does not support queries that require internode communication, and it does not integrate the relational DBMS with the DFS as a single system for big data analytics.

6.2 HadoopDB Example

HadoopDB comes with a data partitioner that can split data into a specified number of partitions. The idea is that each partition can be bulk-loaded into a separate database and indexed appropriately. The commands to load the data set into a single Postgres node are:

hadoop fs -get /data/part-001 my_file

Postgres> create database grep0;

Postgres> use grep0;

Postgres> create table grep (key1 char(10), field char(10));

Postgres> load data local infile 'my_file' into table grep fields terminated by '|' (key1, field);

The data has now been loaded into both HDFS and Postgres. Next, after generating the XML (http://hadoopdb.sourceforge.net/doc/), we search for a pattern in the data that we loaded earlier:

java -cp $CLASSPATH:hadoopdb.jar edu.yale.cs.hadoopdb.benchmark.GrepTaskDB \
  -pattern %wo% -output Padraig -hadoop.config.file HadoopDB.xml

10/09/13 18:01:41 INFO exec.DBJobBase: grep_db_job

10/09/13 18:01:41 INFO exec.DBJobBase: SELECT key1, field FROM grep WHERE field LIKE '%%wo%%';

10/09/13 18:01:41 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=

10/09/13 18:01:41 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.

10/09/13 18:01:41 INFO mapred.JobClient: Running job: job_local_0001

10/09/13 18:01:41 INFO connector.AbstractDBRecordReader: Data locality failed for hadoop1.localdomain

10/09/13 18:01:41 INFO connector.AbstractDBRecordReader: Task from hadoop1.localdomain is connecting to chunk 0 on host localhost with db url jdbc:mysql://localhost:3306/grep0

10/09/13 18:01:41 INFO connector.AbstractDBRecordReader: SELECT key1, field FROM grep WHERE field LIKE '%%wo%%';

10/09/13 18:01:41 INFO mapred.MapTask: numReduceTasks: 0

10/09/13 18:01:41 INFO connector.AbstractDBRecordReader: DB times (ms): connection = 245, query execution = 2, row retrieval = 36

10/09/13 18:01:41 INFO connector.AbstractDBRecordReader: Rows retrieved = 3

10/09/13 18:01:41 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done.

10/09/13 18:01:41 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_m_000000_0' to file:/home/hadoop/padraig

10/09/13 18:01:41 INFO mapred.LocalJobRunner:

10/09/13 18:01:41 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.

10/09/13 18:01:42 INFO mapred.JobClient: map 100% reduce 0%

10/09/13 18:01:42 INFO mapred.JobClient: Job complete: job_local_0001

10/09/13 18:01:42 INFO mapred.JobClient: Counters: 6

10/09/13 18:01:42 INFO mapred.JobClient: FileSystemCounters

10/09/13 18:01:42 INFO mapred.JobClient: FILE_BYTES_READ=115486

10/09/13 18:01:42 INFO mapred.JobClient: FILE_BYTES_WRITTEN=130574

10/09/13 18:01:42 INFO mapred.JobClient: Map-Reduce Framework

10/09/13 18:01:42 INFO mapred.JobClient: Map input records=3

10/09/13 18:01:42 INFO mapred.JobClient: Spilled Records=0

10/09/13 18:01:42 INFO mapred.JobClient: Map input bytes=3

10/09/13 18:01:42 INFO mapred.JobClient: Map output records=3

10/09/13 18:01:42 INFO exec.DBJobBase: grep_db_job JOB TIME : 1747 ms.

The result is stored in HDFS, in the output directory that was just specified.

$cd backup

Input file fragments distributed by the initial data placement algorithm might be disrupted for the following reasons: (1) new data is appended to an existing input file; (2) data blocks are deleted from an existing input file; and (3) new computing nodes are added to an existing cluster. To address this dynamic data load-balancing problem, we implemented a data redistribution algorithm that reorganizes file fragments based on computing ratios. The data redistribution procedure consists of the following steps. First, as in the initial data placement, information regarding the network topology and disk space utilization of the cluster is collected by the data distribution server. Second, the server creates two node lists: a list of nodes in which the number of local fragments exceeds the node's computing capacity, and a list of nodes that can handle more local fragments because of their high performance. The first list is called the over-utilized node list; the second is called the under-utilized node list. Third, the data distribution server repeatedly moves file fragments from an over-utilized node to an under-utilized node until the data load is evenly distributed. When migrating data between a pair of over-utilized and under-utilized nodes, the server moves file fragments from a source node in the over-utilized node list to a destination node in the under-utilized node list.

Note that the server decides the number of bytes, rather than the number of fragments, to move and then moves fragments from the source to the destination node. The above data migration process is repeated until the number of local fragments in each node matches its speed as measured by its computing ratio.
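The redistribution steps can be summarized in a minimal Python sketch. Several simplifications are assumptions of the example rather than part of the described algorithm: each node's computing ratio is modelled as an integer relative `speed` (larger means faster), the target share of a node is taken to be proportional to that speed, fragments are counted instead of bytes, and the names `target_counts` and `redistribute` are hypothetical.

```python
from typing import Dict, List, Tuple


def target_counts(fragments: Dict[str, int], speed: Dict[str, int]) -> Dict[str, int]:
    """Fragments each node should hold, proportional to its relative speed."""
    total = sum(fragments.values())
    total_speed = sum(speed[n] for n in fragments)
    targets, remainders = {}, {}
    for node in fragments:
        targets[node], remainders[node] = divmod(total * speed[node], total_speed)
    # Hand out fragments lost to rounding, largest remainder first.
    leftover = total - sum(targets.values())
    for node in sorted(fragments, key=lambda n: remainders[n], reverse=True)[:leftover]:
        targets[node] += 1
    return targets


def redistribute(fragments: Dict[str, int],
                 speed: Dict[str, int]) -> Tuple[Dict[str, int], List[Tuple[str, str, int]]]:
    """Move fragments from over-utilized to under-utilized nodes until balanced."""
    targets = target_counts(fragments, speed)
    counts = dict(fragments)
    over = [n for n in counts if counts[n] > targets[n]]    # over-utilized node list
    under = [n for n in counts if counts[n] < targets[n]]   # under-utilized node list
    moves = []
    while over and under:
        src, dst = over[0], under[0]
        amount = min(counts[src] - targets[src], targets[dst] - counts[dst])
        counts[src] -= amount
        counts[dst] += amount
        moves.append((src, dst, amount))
        if counts[src] == targets[src]:
            over.pop(0)
        if counts[dst] == targets[dst]:
            under.pop(0)
    return counts, moves


counts, moves = redistribute({"A": 10, "B": 10, "C": 10}, {"A": 3, "B": 2, "C": 1})
print(counts)  # {'A': 15, 'B': 10, 'C': 5}
print(moves)   # [('C', 'A', 5)]
```

In the example, node C is three times slower than node A, so five of its fragments migrate to A while B already holds its fair share.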

7.2 Optimizing Data Partitioning for Data-Parallel Computing

Recent developments in distributed data processing systems (MapReduce, Hadoop and Dryad) have significantly simplified the development of large-scale data-parallel applications. In these systems, data partitioning is the keystone of scaling a computation out to a large cluster. Still, the data partitioning techniques used by these systems are very primitive and cause serious performance problems.

 Partitioning data with a hash function or with a set of equally spaced range keys often produces skewed partitions, which leads to poor performance or even failures.

 A balanced workload is not the only factor that needs to be considered in achieving optimal performance. The number of partitions is another important factor. There is often a tradeoff between the amount of computation per partition and the amount of network traffic, which depends on the number of nodes, and this makes the ideal setting difficult to find.

 In multi-stage computations, data skew can appear at later stages. It is often hard to predict such skews before running the program.
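The first point can be made concrete with a small Python sketch. It compares equally spaced range boundaries with boundaries taken from the data itself (here from the full key list; a real system would more plausibly use a sample). The key set and the function names are hypothetical and chosen only to make the skew visible.

```python
from bisect import bisect_right
from typing import List, Sequence


def equal_width_boundaries(lo: float, hi: float, k: int) -> List[float]:
    """k - 1 equally spaced split points over the key range [lo, hi]."""
    step = (hi - lo) / k
    return [lo + step * i for i in range(1, k)]


def quantile_boundaries(keys: Sequence[int], k: int) -> List[int]:
    """k - 1 split points chosen so that each range holds a similar number of keys."""
    ordered = sorted(keys)
    return [ordered[len(ordered) * i // k] for i in range(1, k)]


def partition_sizes(keys: Sequence[int], boundaries: Sequence[float]) -> List[int]:
    """Number of keys that fall into each range partition."""
    sizes = [0] * (len(boundaries) + 1)
    for key in keys:
        sizes[bisect_right(boundaries, key)] += 1
    return sizes


# Skewed keys: most values are small, two are large outliers.
keys = list(range(1, 13)) + [60, 100]

print(partition_sizes(keys, equal_width_boundaries(0, 100, 4)))  # [12, 0, 1, 1]
print(partition_sizes(keys, quantile_boundaries(keys, 4)))       # [3, 4, 3, 4]
```

With equally spaced boundaries one partition receives 12 of the 14 keys, while data-driven boundaries spread the same keys almost evenly.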

Even for the same program, the input data often change and exhibit different characteristics (for example, statistics produced from log files), and they demand partitioning schemes that adapt to the changing data in order to achieve optimal performance. Clearly, the performance of a data-parallel system depends on many significant parameters, such as infrastructure configuration and tuning, as well as job scheduling.

Data partitioning, in particular, affects several aspects of the way in which a job runs on the cluster, including the parallel workload of every vertex of the job graph as well as the network traffic among vertices.

Hence, data partitioning is a crucial factor that users can exploit in order to achieve good system performance. Many MapReduce jobs fail because of data partitioning issues. Any improvement in data partitioning therefore contributes significantly to the usability of these systems.

7.3 Referential Partitioning

In general, database system developers spend a lot of time optimizing the performance of joins, which are very common and costly operations.

Repartitioned joins are especially costly in Hadoop, because they require an additional MapReduce job to repartition the data on the join key.

Joins computed within a database system typically involve far fewer disk reads and writes than joins computed across a MapReduce job on Hadoop. Hence, for performance reasons, HadoopDB prefers to compute joins entirely inside the database system deployed on each node.

In order to be performed entirely inside the database in HadoopDB, a join must be local, that is, each node must join data from tables stored locally, without any data being transferred across the network. When data needs to be sent across the cluster, Hadoop takes over query processing and the join is not performed inside the database system. If two tables are partitioned on the join attribute (for example, both the employee and the department tables are partitioned on the department id), then a local join is possible, since each single-node database can compute the join on its partitioned data without involving the data stored on other nodes.

Generally, traditional parallel database systems prefer local joins over repartitioned joins, since the former are less expensive. This gap between local and repartitioned joins is even greater in HadoopDB because of the difference in join performance between Hadoop and the database. For this reason, HadoopDB is willing to sacrifice certain performance benefits, such as quick load time, in exchange for local joins.

In order to make the desired joins local to the single-node database systems inside HadoopDB, the method chosen is "aggressive hash-partitioning". Typically, a database table is hash-partitioned on an attribute of the table itself. However, this method limits the degree of co-partitioning, since tables can be related to each other through multiple primary-key/foreign-key references. For example, in the TPC-H database, the line item table contains a foreign key referencing the order table through the order key, while the order table contains a foreign key referencing the customer table through the customer key. If the line item table could be partitioned based on the customer who made the order, then any join combination of the customer, order and line item tables could be local to each node. However, since the line item table does not contain the customer key, direct partitioning is impossible. HadoopDB has therefore been extended to support referential data partitioning. Although a similar technique was recently made available in Oracle 11g, it serves a different purpose from the partitioning scheme of HadoopDB, which facilitates joins in a shared-nothing network.
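The idea of referential partitioning can be sketched with toy versions of the three TPC-H tables. The sketch is only an illustration: the tables are reduced to key columns, a simple modulo on the customer key stands in for the hash function, and the function names `referential_partition` and `local_join` are hypothetical. The line item rows carry no customer key, so their node is found by following the order key to the order's customer.

```python
from typing import Dict, List, Tuple

Node = Dict[str, list]


def referential_partition(customers: List[int],
                          orders: List[Tuple[int, int]],      # (orderkey, custkey)
                          lineitems: List[Tuple[int, int]],   # (orderkey, linenumber)
                          num_nodes: int) -> List[Node]:
    """Place every row on the node chosen by the customer it ultimately refers to."""
    order_to_customer = {orderkey: custkey for orderkey, custkey in orders}
    nodes: List[Node] = [{"customer": [], "orders": [], "lineitem": []}
                         for _ in range(num_nodes)]
    for custkey in customers:
        nodes[custkey % num_nodes]["customer"].append(custkey)
    for orderkey, custkey in orders:
        nodes[custkey % num_nodes]["orders"].append((orderkey, custkey))
    for orderkey, linenumber in lineitems:
        # Follow the foreign key: lineitem -> order -> customer.
        custkey = order_to_customer[orderkey]
        nodes[custkey % num_nodes]["lineitem"].append((orderkey, linenumber))
    return nodes


def local_join(node: Node) -> List[Tuple[int, int, int]]:
    """Join customer, orders and lineitem using only the rows stored on one node."""
    local_customers = set(node["customer"])
    order_customer = {o: c for o, c in node["orders"] if c in local_customers}
    return [(order_customer[o], o, ln) for o, ln in node["lineitem"] if o in order_customer]


customers = [1, 2, 3]
orders = [(10, 1), (11, 2), (12, 1), (13, 3)]
lineitems = [(10, 1), (10, 2), (11, 1), (12, 1), (13, 1)]

nodes = referential_partition(customers, orders, lineitems, num_nodes=2)
for node in nodes:
    print(local_join(node))
```

Every line item finds its order and customer on its own node, so the union of the local joins equals the full three-way join without any rows crossing the network.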

For tables that are not co-partitioned, joins are generally performed using the MapReduce framework. This usually takes place in the Reduce phase of the job. The Map phase reads
