MASARYK UNIVERSITY FACULTY OF INFORMATICS
Hadoop as an Extension of the
Enterprise Data Warehouse
MASTER THESIS
Bc. Aleš Hejmalíček
I hereby declare that this thesis is my original authorial work, which I have worked out on my own. All sources, references and literature used or excerpted during the elaboration of this work are properly cited and listed in complete reference to the due source.
Bc. Aleš Hejmalíček
First of all, I would like to thank my supervisor Vlastislav Dohnal for his time, advice and feedback along the way.
Then I would like to thank my family for their continuous support throughout my university studies. I would also like to apologize to my girlfriend for waking her late at night with loud typing.
Last but not least, I would like to thank my colleagues in the AVG BI team for respecting my inflexible schedule, especially while I was finishing this thesis.
The goal of this thesis is to describe issues with processing big data and to propose and explain an enterprise data warehouse architecture that is capable of processing large volumes of structured and unstructured data. The thesis aims to explain the integration of the Hadoop framework, as a part of the proposed architecture, into existing enterprise data warehouses.
Hadoop, Data warehouse, Kimball, Analytic platform, OLAP, Hive, ETL, Analytics
Introduction
1 Analytic platforms introduction
1.1 New data sources
1.2 Data warehouse
1.2.1 Analytic platform
1.2.2 Extract, transform and load
1.2.3 Kimball's architecture
1.2.3.1 Dimensional modelling
1.2.3.2 Conformed dimensions
1.2.3.3 Surrogate keys
1.2.3.4 Fact tables
1.2.4 Business intelligence
1.2.4.1 Online analytical processing
1.2.4.2 Analytics
1.2.4.3 Reporting
1.3 Existing technologies in DW
2 Proposed architecture
2.1 Architecture overview
2.2 Multi-Platform DW environment
2.3 Hadoop
2.3.1 HDFS
2.3.2 YARN
2.3.3 MapReduce
2.3.4 Hive
2.3.4.1 HiveQL
2.3.4.2 Hive ETL
2.3.5 Impala
2.3.6 Pig
2.3.7 Kylin
2.3.8 Sqoop
3 Hadoop integration
3.1 Star schema implementation
3.1.1 Dimensions implementation
3.1.2 Facts implementation
3.1.3 Star schema performance optimization
3.2 Security
3.3 Data management
3.3.1 Master data
3.3.2 Metadata
3.3.3 Orchestration
3.3.4 Data profiling
3.3.5 Data quality
3.4 Extract, transform and load
3.4.1 Hand coding
3.4.2 Commercial tools
3.5 Business intelligence
3.5.1 Online analytical processing
3.5.2 Reporting
3.5.3 Analytics
3.6 Real time processing
3.7 Physical implementation
3.7.1 Hadoop SaaS
3.8 Hadoop commercial distributions
4 Other Hadoop use cases
4.1 Data lakes
4.2 ETL offload platform
4.3 Data archive platform
4.4 Analytic sandboxes
Introduction

Companies want to leverage emerging new data sources to gain a market advantage. However, traditional technologies are not sufficient for processing large volumes of data or streaming real-time data. Hence, in the last few years, many companies have invested in the development of new technologies capable of processing such data. These data processing technologies are generally expensive. Therefore, Hadoop, an open-source framework for distributed computing, was developed.
Lately, companies have adopted and integrated the Hadoop framework to improve their data processing capabilities, even though they already use some form of data warehouse. However, adopting Hadoop and other similar technologies brings new challenges for everyone involved in data processing, reporting and data analytics. The main issue is integration into an already running data warehouse environment: Hadoop is a relatively new technology, few business use cases and successful implementations have been published, and therefore there are no established best practices or guidelines. This is the reason why I chose this topic for my master's thesis.
The goal of my thesis is to propose a data warehouse architecture that allows processing of large amounts of data both in batch mode and as streams, and to explain techniques and processes for integrating the new system into an existing enterprise data warehouse. This includes an explanation of data integration using the best practices of Kimball's architecture, data management processes and the connection to business intelligence systems such as reporting or online analytical processing. The proposed architecture also needs to be accessible in the same way as existing data warehouses, so that data consumers can access the data in a familiar manner.
The first chapter introduces the problems of data processing and explains basic terms such as data warehousing, business intelligence and analytics. Its main goal is to present new issues, challenges and currently used technologies for data processing.
The second chapter presents requirements on a new architecture and proposes an architecture that meets them. The second chapter further describes the individual technologies and their characteristics, advantages and disadvantages.
The third chapter focuses on Hadoop integration into an existing enterprise data warehouse environment. It explains data integration in Hadoop following Kimball's best practices from general data warehousing, such as star schema implementation, and the individual parts of the data management plan and processes. The rest of the chapter describes the implementation of the extract, transform and load process and the usage of business intelligence and reporting tools, and then focuses on the physical Hadoop implementation and the location of the Hadoop cluster.
The last chapter explains other specific business use cases for Hadoop in data processing and data warehousing, as it is a versatile technology that can be used for different purposes.
1 Analytic platforms introduction

In the past few years, the amount of data available to organizations of all kinds has increased exponentially. Businesses are under pressure to retrieve information that could be leveraged to improve business performance and gain a competitive advantage. However, data processing and integration are getting more complex due to data variety, volume and velocity. This is a challenge for most organizations, as it requires internal changes in the organization, data management and infrastructure.
Due to the adoption of new technologies, companies hire people with specific experience and skills in these technologies. As most of the technologies and tools are relatively young, the skills shortage is expected to grow. The ideal candidate should have a mix of analytic skills, statistics and coding experience, and such people are difficult to find. By 2018, the United States alone is going to face a shortage of 140,000 to 190,000 people with analytical expertise and 1.5 million managers and analysts with the skills to understand and make decisions based on an analysis of big data [1].
However, the data themselves are not new for organizations. In most cases, data from mobile devices, log data or sensor data have been available for a long time. Much of this data was never processed and analyzed, even though sources of massive amounts of data existed. The major change in business perspective happened in the last several years with the development of distributed analytic platforms. Nowadays, the technologies are available to anyone at an affordable price, and many companies have started to look for business cases to build and use analytical platforms.
Due to the demand, many companies have created products to help businesses solve these issues. Companies such as Cloudera, IBM, Microsoft or Teradata offer solutions that include software, hardware and enterprise support all together. However, the initial price is too high for smaller companies, and the price of the product itself does not take into consideration the companies' existing systems, additional data sources, processing or data integration. Data integration itself is a major expense in data processing projects.
Most technological companies already integrate data in some way and build data warehouses to ensure simple and unified access to data [2]. However, these data warehouses are built to ensure precise reporting and usually use technologies such as relational database management systems (RDBM) that are not designed for processing petabytes of data. Companies build data warehouses to achieve fast, simple and unified reporting, which is mostly achieved by aggregating the data. This approach improves processing speed and reduces the required storage space. On the other hand, aggregated data do not allow complex analytics due to a lack of detail. For analytic purposes, data should have the same level of detail as the raw source data.
When building an analytic platform or a solution complementary to an already existing data warehouse, a business must decide whether it prefers a commercial third-party product with enterprise support or whether it is able to build the solution in-house. Both approaches have advantages and disadvantages. The main advantages of an in-house solution are price and modifiability. On the other hand, it can be difficult to find experts with enough experience to develop and deliver an end-to-end solution and to provide continuous maintenance. The major disadvantage of buying a complete analytical platform is the price, which can hide additional costs in data integration, maintenance and support.
1.1 New data sources
As new devices connect to the network, various new data become available. Such large amounts of data are hard to manage with common technologies and tools and to process within a tolerable time. These new data sources include mobile phones, tablets, sensors and wearable devices, whose number has grown significantly in recent years. All these devices interact with their surroundings or with web sites such as social media, and every action can be recorded and logged. New data sources have the following characteristics:
• Volume - As mentioned before, the amount of data is rapidly increasing every year. According to The Economist [3], the amount of digital information increases tenfold every five years.
• Velocity - Interaction with social media or mobile applications usually happens in real time and hence causes a continuous data flow. Processing real-time data can help a business make valuable decisions.
• Variety - Data structure can be dynamic and can change significantly with any record. Such data include XML or nested JSON and JSON arrays. Unstructured or semi-structured data are harder to process with traditional technologies, although they can contain valuable information.
• Veracity - With different data sources, it is getting more difficult to maintain data certainty, and this issue becomes more challenging with greater volume and higher velocity.
With the increasing volume and complexity of data, integration and cleansing processes become more difficult, and companies are integrating new platforms and processes to deal with data processing and analytics [4, 5].
1.2 Data warehouse
A data warehouse is a system that processes data so that they can be easily analysed and queried. Bill Inmon defined a data warehouse in 1990 as follows:
“A data warehouse is a subject-oriented, integrated, time-variant and non-volatile collection of data in support of management’s decision making process.” [6]
He defined the terms as follows:
• Subject Oriented - Data that gives information about a particular subject instead of about a company’s ongoing operations.
• Integrated - Data that is gathered into the data warehouse from a variety of sources and merged into a coherent whole.
• Time-variant - All data in the data warehouse is identified with a particular time period.
• Non-volatile - Data are stable in a data warehouse. More data can be added, but historic data are never removed or modified. This enables management to gain a consistent picture of the business.
An enterprise data warehouse integrates and unifies all the business information of an organization and makes it accessible across the whole company without compromising security or data integrity. It allows complex reporting and analytics across different systems and business processes.
1.2.1 Analytic platform
An analytic platform is a set of tools and technologies that allow storing, processing and analysis of data. Most companies want to focus on processing larger amounts of data, therefore a distributed system is the base of an analytic platform. Newer technologies such as NoSQL and NewSQL databases, advanced statistical tools and machine learning are often used. Many companies providing analytic platforms focus on providing as many integrated tools as possible in order to make the adoption of a new platform seemingly easy.
These platforms are designed for processing large amounts of data in order to make advanced analytics possible. Among others, advanced analytics include the following use cases:
• Search ranking
• Ad tracking
• User experience analysis
• Big science data analysis
• Location and proximity tracking
• Causal factor discovery
• Social CRM
• Document similarity testing
1.2.2 Extract, transform and load
Extract, transform and load (ETL) [7] is a process of moving data across systems and databases in order to make them easily analysable. ETL is mostly used in data warehousing, as data loaded into a data warehouse are often transformed and cleansed to ensure the data quality needed for analysis. ETL describes three steps of moving data:
• Extract - The process of extracting data from a source system. Extraction can be done directly from a database or through an API. It can implement complex mechanisms to extract only the changes from a database. This process is called change data capture and is one of the most efficient mechanisms for data extraction. Extraction also often includes archiving the extracts for audit purposes.
• Transform - Transformation can implement any algorithm or transformation logic. Data are transformed so that they satisfy the data model and data quality needs of the data warehouse. This can include data type conversion, data cleansing or even fuzzy lookups. Data from multiple sources are usually integrated together within this step.
• Load - The process of loading transformed data into the data warehouse. It usually includes loading the transformed dimension and fact tables.
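The three ETL steps can be illustrated with a minimal Python sketch. It assumes a hypothetical `orders` source table with an `updated_at` column and uses timestamp-based incremental extraction, which is only one simple form of change data capture (log-based capture, which reads the database transaction log, is another). In-memory SQLite databases stand in for both the source system and the warehouse stage; all table and column names are illustrative.

```python
import sqlite3

# Hypothetical source system table with a last-modified timestamp.
source = sqlite3.connect(":memory:")
source.execute("CREATE TABLE orders (id INTEGER, amount TEXT, country TEXT, updated_at TEXT)")
source.executemany(
    "INSERT INTO orders VALUES (?, ?, ?, ?)",
    [
        (1, "10.50", " cz ", "2015-01-01"),
        (2, "7", "SK", "2015-01-02"),
        (3, "n/a", "cz", "2015-01-03"),  # dirty value that cannot be converted
    ],
)

# Hypothetical warehouse side: a stage table plus a table holding the watermark.
dwh = sqlite3.connect(":memory:")
dwh.execute("CREATE TABLE stage_orders (id INTEGER, amount REAL, country TEXT)")
dwh.execute("CREATE TABLE etl_watermark (last_loaded TEXT)")
dwh.execute("INSERT INTO etl_watermark VALUES ('1900-01-01')")
dwh.commit()


def extract(since):
    """Extract only rows modified after the watermark (timestamp-based CDC)."""
    return source.execute(
        "SELECT id, amount, country, updated_at FROM orders WHERE updated_at > ?",
        (since,),
    ).fetchall()


def transform(rows):
    """Convert data types and cleanse values; collect rows that fail."""
    clean, rejected = [], []
    for row_id, amount, country, _updated_at in rows:
        try:
            clean.append((row_id, float(amount), country.strip().upper()))
        except ValueError:
            rejected.append(row_id)  # in practice written to an error table
    return clean, rejected


def load(rows, new_watermark):
    """Load the rows and advance the watermark in a single transaction."""
    with dwh:  # commits on success, rolls back on an exception
        dwh.executemany("INSERT INTO stage_orders VALUES (?, ?, ?)", rows)
        dwh.execute("UPDATE etl_watermark SET last_loaded = ?", (new_watermark,))


def run_etl():
    watermark = dwh.execute("SELECT last_loaded FROM etl_watermark").fetchone()[0]
    extracted = extract(watermark)
    if not extracted:
        return 0, []
    clean_rows, rejected_ids = transform(extracted)
    load(clean_rows, max(row[3] for row in extracted))
    return len(clean_rows), rejected_ids


first_run = run_etl()   # loads rows 1 and 2, rejects row 3
second_run = run_etl()  # nothing changed since the watermark, so nothing is loaded
```

Advancing the watermark in the same transaction as the load means a failed load is simply retried from the old watermark on the next run.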
1.2.3 Kimball’s architecture
Ralph Kimball designed individual processes and tasks within a data warehouse in order to simplify its development and maintenance [8]. Kimball's architecture includes the processes used in the end-to-end delivery of a data warehouse. The whole development process starts with gathering users' requirements. The primary goal is to gather the metrics that need to be analyzed in order to identify data sources and design the data model. The architecture then describes an incremental development method that focuses on continuous delivery, as data warehouse projects tend to be large and it is important to start delivering business value early in development. Key features of Kimball's architecture are dimensional modelling and the identification of fact tables, which are described further in this thesis. Regarding the ETL process, Kimball described the steps of extraction, data transformation, data cleansing and loading, first into a stage (temporary storage) and then into the final data warehouse dimension and fact tables. All other processes around the data warehouse, such as security, data management and data governance, are included as well. Altogether, Kimball's architecture is a fairly simple and understandable framework for data warehouse development.
1.2.3.1 Dimensional modelling
Dimensional modeling [8] is a technique for designing simple and understandable data models. As most business users tend to describe the world in terms of entities such as product, customer or date, it is reasonable to model the data the same way.
It is an intuitive implementation of a data cube whose edges are labeled, for example, product, customer and date. This implementation allows users to easily slice the data and break it down by different dimensions. The measured metrics are inside the data cube. When the cube is sliced, the metrics are shown aggregated according to the dimensions used for slicing.
A dimensional model is implemented as a star schema [8]. In a star schema, all dimensions are tied to fact tables [8]. Therefore, it is easy to see which dimensions can be used for slicing.
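A star schema and slices over it can be sketched with SQLite from Python. The tables `dim_product`, `dim_date` and `fact_sales` and their contents are hypothetical; the point is that every slice follows the same pattern of joining the fact table to its dimensions and grouping the measures by the chosen dimension attributes.

```python
import sqlite3

# Hypothetical star schema: one fact table whose foreign keys point to two
# dimension tables. SQLite is used only to keep the example self-contained.
db = sqlite3.connect(":memory:")
db.executescript("""
CREATE TABLE dim_product (product_key INTEGER PRIMARY KEY, name TEXT, category TEXT);
CREATE TABLE dim_date (date_key INTEGER PRIMARY KEY, full_date TEXT, month TEXT);
CREATE TABLE fact_sales (
    product_key INTEGER REFERENCES dim_product (product_key),
    date_key    INTEGER REFERENCES dim_date (date_key),
    quantity    INTEGER,
    sale_amount REAL
);
INSERT INTO dim_product VALUES (1, 'Laptop', 'Electronics'), (2, 'Desk', 'Furniture');
INSERT INTO dim_date VALUES (20150101, '2015-01-01', '2015-01'),
                            (20150201, '2015-02-01', '2015-02');
INSERT INTO fact_sales VALUES (1, 20150101, 3, 300.0),
                              (1, 20150201, 1, 100.0),
                              (2, 20150101, 2, 400.0);
""")


def sales_by(*attributes):
    """Aggregate the sale_amount measure by the chosen dimension attributes.

    The attribute names are interpolated into SQL, so this sketch must only
    be called with fixed, trusted column names.
    """
    columns = ", ".join(attributes)
    sql = f"""
        SELECT {columns}, SUM(f.sale_amount)
        FROM fact_sales AS f
        JOIN dim_product AS p ON p.product_key = f.product_key
        JOIN dim_date    AS d ON d.date_key = f.date_key
        GROUP BY {columns}
        ORDER BY {columns}
    """
    return db.execute(sql).fetchall()


by_category_and_month = sales_by("p.category", "d.month")
by_month = sales_by("d.month")  # the same facts, sliced by one dimension only
```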
Figure 1.1: Star schema example.
Dimensions can also contain different hierarchies and additional attributes. As a dimension may need to track history, Kimball defines several types [8] of dimensions. The most common are:
• Type one - It does not track historical changes. When a record is changed in the source system, it is overwritten in the dimension. Therefore, only one record is stored for each natural key.
• Type two - It uses additional attributes such as date effective from and date effective to in order to track different versions of a record. When a record is changed in the source system, a new record is inserted into the dimension and the date effective to attribute of the old record is updated.
• Type three - It is used to track changes in defined attributes. If history tracking is needed, two separate attributes are created: one holds the current state and the other the previous value.
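A type two dimension can be sketched as follows. This is a minimal in-memory illustration, assuming a hypothetical customer dimension whose only tracked attribute is `city`; the open-ended end date 9999-12-31 and treating `effective_to` as an exclusive boundary are common conventions chosen here, not requirements.

```python
from dataclasses import dataclass
from datetime import date
from typing import Optional

# Sentinel end date marking the current version of a record (assumed convention).
OPEN_END = date(9999, 12, 31)


@dataclass
class CustomerVersion:
    surrogate_key: int
    natural_key: str
    city: str
    effective_from: date
    effective_to: date = OPEN_END  # exclusive end of validity


class CustomerDimension:
    """Hypothetical type two slowly changing dimension kept in memory."""

    def __init__(self):
        self.rows: list[CustomerVersion] = []
        self._next_key = 1  # incremental surrogate key sequence

    def current(self, natural_key: str) -> Optional[CustomerVersion]:
        for row in self.rows:
            if row.natural_key == natural_key and row.effective_to == OPEN_END:
                return row
        return None

    def apply_change(self, natural_key: str, city: str, as_of: date) -> None:
        current = self.current(natural_key)
        if current is not None and current.city == city:
            return  # nothing changed, keep the current version
        if current is not None:
            # Type two closes the old version; type one would overwrite it instead.
            current.effective_to = as_of
        self.rows.append(CustomerVersion(self._next_key, natural_key, city, as_of))
        self._next_key += 1


dim = CustomerDimension()
dim.apply_change("C-001", "Brno", date(2014, 1, 1))
dim.apply_change("C-001", "Prague", date(2015, 3, 1))
dim.apply_change("C-001", "Prague", date(2015, 4, 1))  # no change, no new version
```

After these calls the dimension holds two versions of customer C-001: the Brno version valid until 2015-03-01 with surrogate key 1, and the current Prague version with surrogate key 2.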
1.2.3.2 Conformed dimensions
One of the key features in data integration are conformed dimensions [8]. These are dimensions that describe one entity in the same way across all integrated data sources. The main reason for implementing conformed dimensions is that CRM, ERP or billing systems can have different attributes and different ways of describing business entities such as a customer. Dimension conforming is the process of taking all information about an entity and designing the transformation process so that data about the entity from all data sources are merged into one dimension. A dimension created using this process is called a conformed dimension. Using conformed dimensions significantly simplifies business data, as all people involved in the business use the same view of the customer and the same definition. This allows simple data reconciliation.
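Dimension conforming can be illustrated with a small Python sketch that merges customer records from two hypothetical sources, a CRM and a billing system, into one conformed customer dimension. It assumes both systems store a customer e-mail that can serve as the matching key after normalization; real matching rules are usually more complex, for example fuzzy matching on names and addresses.

```python
# Hypothetical extracts from two source systems describing customers with
# different attribute names and only partially overlapping attributes.
crm_customers = [
    {"crm_id": "A1", "email": "JANE@EXAMPLE.COM", "full_name": "Jane Doe", "segment": "SMB"},
    {"crm_id": "A2", "email": "joe@example.com", "full_name": "Joe Roe", "segment": "Enterprise"},
]
billing_customers = [
    {"account_no": 501, "mail": "jane@example.com ", "country": "CZ"},
    {"account_no": 502, "mail": "ann@example.com", "country": "SK"},
]


def normalize_email(value: str) -> str:
    """Assumed matching rule: e-mails compared case-insensitively, trimmed."""
    return value.strip().lower()


def empty_customer(email: str) -> dict:
    """One conformed record with the union of attributes from all sources."""
    return {"email": email, "name": None, "segment": None, "country": None,
            "crm_id": None, "billing_account": None}


def conform_customers(crm, billing):
    """Merge both sources into one customer dimension keyed by e-mail."""
    conformed = {}
    for row in crm:
        entry = conformed.setdefault(normalize_email(row["email"]),
                                     empty_customer(normalize_email(row["email"])))
        entry.update(name=row["full_name"], segment=row["segment"], crm_id=row["crm_id"])
    for row in billing:
        entry = conformed.setdefault(normalize_email(row["mail"]),
                                     empty_customer(normalize_email(row["mail"])))
        entry.update(country=row["country"], billing_account=row["account_no"])
    return sorted(conformed.values(), key=lambda r: r["email"])


dim_customer = conform_customers(crm_customers, billing_customers)
```

Jane Doe appears in both systems but ends up as a single conformed row carrying the CRM segment and the billing country, while customers known to only one system keep empty attributes for the other.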
Figure 1.2: Dimension table example.
1.2.3.3 Surrogate keys
Surrogate keys [8] ensure the identification of individual entities in a dimension. Surrogate keys are usually implemented as an incremental sequence of integers. They are used because duplicate natural keys are expected in dimension tables when changes over time need to be tracked, so a surrogate key identifies a specific version of the record. In addition, more than one natural key would have to be used in a conformed dimension, as the data may come from several data sources. Surrogate keys are predictable and easily manageable, as they are assigned within the data warehouse.
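Surrogate key assignment can be sketched as a key map kept inside the warehouse. This hypothetical `SurrogateKeyMap` class covers only the case where several natural keys from different sources point to one entity; versioning of records over time is not modelled here.

```python
import itertools


class SurrogateKeyMap:
    """Hypothetical registry mapping (source system, natural key) pairs to
    warehouse-owned integer surrogate keys."""

    def __init__(self):
        self._sequence = itertools.count(start=1)  # incremental integer sequence
        self._map = {}

    def key_for(self, source: str, natural_key: str) -> int:
        """Return the existing surrogate key, or assign the next one."""
        pair = (source, natural_key)
        if pair not in self._map:
            self._map[pair] = next(self._sequence)
        return self._map[pair]

    def link(self, source: str, natural_key: str, surrogate_key: int) -> None:
        """Map another source's natural key onto an existing surrogate key."""
        self._map[(source, natural_key)] = surrogate_key


keys = SurrogateKeyMap()
jane = keys.key_for("crm", "A1")     # new entity gets key 1
keys.link("billing", "501", jane)    # the same customer as known to billing
joe = keys.key_for("crm", "A2")      # next entity gets key 2
```

Because the keys are generated by the warehouse itself, they stay stable and predictable regardless of how each source system formats its own identifiers.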
1.2.3.4 Fact tables
Fact tables are tables containing specific events or measures of a business process. A fact table typically has two types of columns: foreign keys of dimension tables and numeric measures. In an ETL process, a lookup on dimension tables is performed, and the values or keys describing an entity in a dimension are replaced by surrogate keys from the particular dimensions. A fact table is defined by its granularity. Fact tables should always contain only one level of granularity, as mixing granularities in one fact table could cause issues when aggregating measures. An example of fact tables with a specific granularity is a table of sales orders and another one with order items.
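The fact table load step, in which natural keys are replaced by surrogate keys, can be sketched in Python. The lookup dictionaries, source field names and the reserved key -1 for unknown members are assumptions made for illustration; mapping a failed lookup to an "unknown" dimension member instead of dropping the fact is a common Kimball-style practice.

```python
# Hypothetical dimension lookups (natural key -> surrogate key), as an ETL
# process might cache them before loading a fact table.
product_lookup = {"SKU-1": 1, "SKU-2": 2}
date_lookup = {"2015-01-01": 20150101, "2015-01-02": 20150102}

# Assumed convention: reserved surrogate key for an "unknown" dimension member,
# so that facts are not silently dropped when a lookup fails.
UNKNOWN_KEY = -1

# Incoming order items; the grain of the fact table is one row per order item.
order_items = [
    {"order_id": 10, "sku": "SKU-1", "date": "2015-01-01", "qty": 2, "amount": 20.0},
    {"order_id": 10, "sku": "SKU-2", "date": "2015-01-01", "qty": 1, "amount": 15.0},
    {"order_id": 11, "sku": "SKU-9", "date": "2015-01-02", "qty": 1, "amount": 5.0},
]


def build_fact_rows(items):
    """Replace natural keys with surrogate keys and keep the numeric measures."""
    facts = []
    for item in items:
        facts.append((
            product_lookup.get(item["sku"], UNKNOWN_KEY),
            date_lookup.get(item["date"], UNKNOWN_KEY),
            item["order_id"],  # order number kept on the fact (degenerate dimension)
            item["qty"],
            item["amount"],
        ))
    return facts


fact_order_items = build_fact_rows(order_items)
```

The third item references an unknown product, so it is loaded with product key -1 and the sales total of the fact table still matches the source.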
Figure 1.3: Fact table example.
When designing a fact table, it is important to identify the business processes that users want to analyze in order to specify the data sources needed. This is followed by the definition of measures, such as sale amount or tax amount, and of the dimensions that make sense within the business process context.
1.2.4 Business intelligence
Business intelligence (BI) is a set of tools and techniques whose goal is to simplify querying and analysing data sets. BI tools are commonly used as the top layer of a data warehouse, accessible to a wide spectrum of users, as many BI techniques do not require advanced technological skills [9].
Business intelligence uses a variety of techniques. A different tool or technique is chosen depending on the business requirements. Therefore, BI in companies usually contains various tools from different providers. Examples of BI techniques:
• Online analytical processing (OLAP)
• Reporting
• Dashboards
• Predictive analytics
• Data mining
1.2.4.1 Online analytical processing
Online analytical processing (OLAP) [10, 9] is an approach that achieves faster responses when querying multidimensional data; therefore, it is one of the key features of companies' decision support systems. The main advantage of OLAP is that it leverages the star or snowflake schema structure.
There are three types of OLAP. If an OLAP tool stores data in a special structure, such as a hash table (SSAS) or a multidimensional array, on an OLAP server, it is called multidimensional OLAP (MOLAP). MOLAP provides quick responses to operations such as slice, dice, roll-up or drill-down, as the tool is able to simply navigate through precalculated aggregations down to the lowest level.
Besides MOLAP, two other types of OLAP tools exist. The first is relational OLAP (ROLAP), which is based on querying data in a relational structure (e.g. in an RDBM). ROLAP is not very common, as it does not achieve the query response times of MOLAP. The third type is hybrid OLAP (HOLAP), which is a combination of MOLAP and ROLAP. One popular implementation precalculates aggregations into MOLAP and keeps the underlying data stored in ROLAP, so the underlying data are not queried when a precalculated aggregation is requested.
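The MOLAP and HOLAP ideas can be sketched in plain Python: aggregations for every combination of dimensions are precalculated once (the MOLAP part), aggregate queries are answered from them, and detail queries fall back to the underlying rows (the relational part). The data, dimension names and the `query` function are hypothetical; real OLAP servers store aggregations in their own formats and usually do not precalculate every combination.

```python
from collections import defaultdict
from itertools import combinations

# Hypothetical detail rows standing in for the relational store.
DIMENSIONS = ("product", "country", "month")
detail_rows = [
    {"product": "A", "country": "CZ", "month": "2015-01", "sales": 10.0},
    {"product": "A", "country": "SK", "month": "2015-01", "sales": 5.0},
    {"product": "B", "country": "CZ", "month": "2015-02", "sales": 7.0},
]


def precalculate(rows):
    """MOLAP-style step: precompute SUM(sales) for every subset of dimensions."""
    cube = {}
    for size in range(len(DIMENSIONS) + 1):
        for dims in combinations(DIMENSIONS, size):
            totals = defaultdict(float)
            for row in rows:
                totals[tuple(row[d] for d in dims)] += row["sales"]
            cube[dims] = dict(totals)
    return cube


CUBE = precalculate(detail_rows)


def query(group_by=(), detail=False):
    """HOLAP-style routing: aggregates from the cube, detail from the rows."""
    if detail:
        return list(detail_rows)  # falls back to the underlying data
    dims = tuple(d for d in DIMENSIONS if d in group_by)  # canonical order
    return CUBE[dims]


by_product = query({"product"})        # roll-up over country and month
grand_total = query(set())             # roll-up over all dimensions
by_country_month = query({"month", "country"})
```

A roll-up is then just a lookup in a coarser grouping: `by_product` gives 15.0 for product A and 7.0 for B, and `grand_total` gives 22.0, without touching the detail rows.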
OLAP is often queried via a multidimensional query language such as MDX, either directly or through an analytic tool such as Excel¹ or Tableau². The user then only works with a pivot table and is able to query or filter aggregated and underlying data, depending on the OLAP definition.
Some of the OLAP tools include:
• SSAS by Microsoft³
• Mondrian, developed by Pentaho⁴
• SAS OLAP Server⁵
• Oracle OLAP⁶

1. https://products.office.com/en-us/excel
2. http://www.tableausoftware.com/
3. http://www.microsoft.com/en-us/server-cloud/solutions/business-intelligence/analysis.aspx
4. http://community.pentaho.com/projects/mondrian
5. http://www.sas.com/resources/factsheet/sas-olap-server-factsheet.pdf
6. http://www.oracle.com/technetwork/database/options/olap/index.html
1.2.4.2 Analytics
Analytics is a process of discovering patterns in data and providing insight. As a multidisciplinary field, analytics includes methods from mathematics, statistics and predictive modeling to retrieve valuable knowledge. Regarding data requirements, analyses such as initial estimate calculations or data profiling often do not require transformed data. These analyses can be performed on slightly cleansed or even raw data. The chosen subset of data usually has a bigger impact on the result of an analysis. Therefore, data quality processes are usually less strict than for data integrated into an EDW for reporting purposes. However, in general it is better to perform analyses on cleansed and transformed data.
1.2.4.3 Reporting
Reporting is one of the last parts of a process that starts with discovering useful data sets within the company and continues through their integration into the EDW by the ETL process. This process, together with the reports, has to be well designed, tested and audited, as reports are often used for the reconciliation of manual business processes. Reports can also be used as official da[…] stock market. Therefore, the cleansing and transformation processes have to be well documented and precise. Due to high data quality requirements and increasing data volumes, ETL processing becomes significantly more complex. Having a large amount of data needed for reporting can cause significant delivery issues.
1.3 Existing technologies in DW
There are many architectural approaches to building a data warehouse. Over the last 20 years, many experts have been improving the methodologies and processes that can be followed. These methodologies are well known and applicable with traditional business intelligence and data warehouse technologies. As the methodologies are common, companies developing RDBMs, such as Oracle or Microsoft, have integrated functionality to make data warehouse development easier. There are also many ETL frameworks and data warehouse IDEs (such as WhereScape) that provide a higher level of abstraction for data warehouse development [7, 9].
Thanks to more than 20 years of continuous development and community support, the technology has matured, so developers can focus on business needs rather than on technology.
Many companies are developing or maintaining data warehouses built to integrate data from various internal systems such as ERP, CRM or back-end systems. These data sources are usually built on RDBMs, therefore the data are structured and well defined. However, the ETL process is still necessary. These data usually do not exceed a few gigabytes a day, hence processing the data transformations is feasible. After the data are transformed and loaded into the data warehouse, they are usually accessed via BI tools for easier data manipulation by data consumers. Data from different sources are integrated in the data warehouse, in data marts or in BI, depending on the specific architecture.
TDWI conducted relevant research on the architectural components used in data warehousing, with the following results.
Figure 1.4: Currently used components and plans for the next three years [2].

The following statements can be deduced from the results:
• EDW and data marts are commonly used and will be used even in the future.
• OLAP and tabular data are among the key components of BI.
• The dimensional star schema is the preferred method of data modelling.
• RDBMs are used commonly, but it is expected that they will be used less in the next few years.
• The usage of in-memory analytics, columnar databases and Hadoop is expected to grow. However, not all companies are planning to adopt these technologies, as they are used mostly for specific purposes.
Therefore it is expected that usage of new technologies will grow significantly. Nonetheless, existing basic principles of data warehousing will prevail. Hence, it is important that new technologies can be completely integrated into existing data warehouse architecture. This includes both logical and physical architecture of data warehouse.
Regarding data modelling, most data warehouses currently use some form of hybrid architecture [2] that originates either in Kimball's architectural approach, often called the bottom-up approach and based on dimensional modeling, or in Inmon's top-down approach, which prefers building the data warehouse in third normal form [6]. The modeling technique highly depends on the development method used. Inmon's third normal form or Data Vaults are preferable for agile development. Data Vault is a combination of both approaches. On the other hand, Kimball's approach is more suitable for iterative data warehouse development.
While integrating new data sources, a conventional data warehouse faces several issues:
• Scalability - Common platforms for building data warehouses are RDBMs such as Microsoft SQL Server or Oracle Database. These databases alone are not distributed, therefore adding new data sources can require adding another server with a new database instance. In addition, new processes have to be designed to support such a distributed environment.
• Price - RDBMs can run on almost any hardware. However, for processing gigabytes of data while also serving the data for reporting and analytics, it is necessary to buy a powerful server and software licenses, or to significantly optimize DW processes. Usually all three steps are necessary. Unfortunately, licenses, servers and people are all expensive resources.
• Storage space - Storage is one of the most expensive parts of a DW. For the best performance, an RDBM needs fast storage. In a data warehouse environment, data are stored more than once in the RDBM (e.g. in a persistent stage or an operational data store) and disks are set up in RAID for fault tolerance, so the storage plan needs to be designed precisely to keep costs as low as possible. Data also need to be backed up and archived regularly.
In addition, historically not many sources were able to provide data in real time or close to real time, therefore the traditional batch processing approach was very convenient. However, businesses need output with the lowest latency possible, especially for operational intelligence, where a business needs to respond and act. ETL is the most suitable process for batch processing. Nonetheless, with real-time streaming data it is more convenient to use extract, load and transform (ELT) processes in order to be able to analyze data before long-running transformations start. However, both ETL and ELT can be implemented for real-time processing with the right set of tools.
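The difference between ETL and ELT can be sketched with SQLite standing in for the target system: raw events are loaded untouched, can be queried immediately, and are transformed later with SQL inside the target itself. The table names, columns and cleansing rules are hypothetical.

```python
import sqlite3

db = sqlite3.connect(":memory:")

# 1) Load: raw events are written as they arrive, without any cleansing.
db.execute("CREATE TABLE raw_events (user_id TEXT, action TEXT, ts TEXT)")
db.executemany(
    "INSERT INTO raw_events VALUES (?, ?, ?)",
    [
        ("u1", " Install", "2015-05-01T10:00:00"),
        ("u2", "install ", "2015-05-01T10:05:00"),
        ("u1", "UNINSTALL", "2015-05-02T08:00:00"),
    ],
)

# 2) The raw data is queryable immediately, before any transformation runs.
raw_count = db.execute("SELECT COUNT(*) FROM raw_events").fetchone()[0]

# 3) Transform: done later, inside the target system, using its own SQL engine.
db.executescript("""
CREATE TABLE fact_events AS
SELECT user_id,
       LOWER(TRIM(action)) AS action,
       substr(ts, 1, 10)   AS event_date
FROM raw_events;
""")

installs_per_day = db.execute(
    "SELECT event_date, COUNT(*) FROM fact_events "
    "WHERE action = 'install' GROUP BY event_date ORDER BY event_date"
).fetchall()
```

In an ETL variant, the trimming and date extraction would run before the insert, so nothing would be queryable until the transformation had finished.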
2 Proposed architecture

In order to effectively tackle the issues and challenges of current data warehouses and the processing of new data sources, the DW architecture has to be adjusted. The main requirements on the proposed architecture are:
• Ability to process large amounts of data in batch processing as well as in real time.
• Scalability up to petabytes of data stored in the system.
• Distributed computing and linear performance scalability.
• Ability to process structured and unstructured data.
• Linear storage scalability.
• Support for ETL tools and BI tools.
• Relatively low price.
• Data accessibility similar to RDBMs.
• Support for a variety of analytical tools.
• Star schema support.
2.1 Architecture overview
The proposed architecture uses Hadoop [11] as a system complementary to the RDBM, bringing new features that are useful for specific tasks that are expensive on an RDBM. Logically, Hadoop is implemented in the same way as an RDBM; however, it should be used mainly for processing large amounts of data and streaming real-time data.
Figure 2.1: Diagram describing proposed high-level architecture.
Adopting the Hadoop framework into an existing data warehouse environment brings several benefits:
• Scalability - Hadoop supports linear scalability. By adding new nodes to a Hadoop cluster, performance and storage space can be scaled linearly [12].
• Price - As an open source framework, Hadoop is free to use. This, however, does not mean that it is inexpensive, as maintaining and developing applications on a Hadoop cluster, the hardware, and experienced people with knowledge of this specific technology are expensive. Nevertheless, Hadoop is generally significantly cheaper per terabyte of storage than an RDBM, as it runs on commodity hardware.
• Modifiability - Another advantage of an open source project is modifiability. If some significant functionality is not available, it is possible to develop it in-house.
• Community - Many companies such as Yahoo, Facebook or Google contribute significantly to the Hadoop source code, either by developing and improving new tools or by publishing their libraries.
The usage of individual Hadoop tools is described in the following diagram.
Figure 2.2: Diagram of Hadoop tools usage in EDW architecture.
Hadoop                                  | RDBM
Open source                             | Proprietary
Structured and unstructured data        | Mostly structured only
Less expensive                          | Expensive
Better for massive full data scans      | Usage of index lookups
Support for unstructured data           | Indirect support for unstructured data
No support for transaction processing   | Support for transaction processing

Table 2.1: Hadoop and RDBM comparison.
2.2 Multi-Platform DW environment
The existence of a core data warehouse is still crucial for reporting and dashboards. It is also mostly used as a data source for online analytical processing [2]. To solve the issues mentioned before, it is convenient to integrate a new data platform that supports massive volumes and a wide variety of data. What makes this task difficult is the precise integration of the new platform into the existing DW architecture.
As data warehousing focuses on data consumers, they should be able to access the new platform in the same way as the existing one and feel confident using it. Using Kimball's approach for integrating data on both platforms gives users the same view of the data and also unifies other internal data warehouse processes.
2.3 Hadoop
Hadoop is an open-source software framework for distributed processing that allows vast amounts of structured and unstructured data to be stored and processed cheaply. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage.
High availability is handled at the application layer; therefore, Hadoop does not rely on hardware to secure data and processing. Hadoop delivers its service on top of a cluster of computers, each of which may be prone to failures.
The framework itself consists of tens of different tools for various purposes, and the number of available tools is growing fast. The three major components of Hadoop 2.x are the Hadoop Distributed File System (HDFS), Yet Another Resource Negotiator (YARN) and the MapReduce programming model.
The tools most relevant to data warehousing and analytics include:
• Hive
• Pig
• Sqoop
• Impala
Some other tools, such as Storm, are not part of Hadoop but are well integrated with the Hadoop framework.
2.3.1 HDFS
HDFS was designed to be a scalable, fault-tolerant, distributed storage system that works closely with MapReduce and runs on commodity hardware.
"HDFS has many similarities with existing distributed file systems. However, the differences from other distributed file systems are significant. HDFS is highly fault-tolerant and is designed to be deployed on low-cost hardware. HDFS provides high throughput access to application data and is suitable for applications that have large data sets."[13]
An HDFS cluster consists of a NameNode, which manages metadata, and DataNodes, which store the data. Typically, each file is split into large blocks of 64 or 128 megabytes, which are then distributed to DataNodes. HDFS ensures high availability by replicating each block to other nodes. When a block is lost due to a failure, the NameNode automatically creates another replica of the block and places it on a different DataNode.
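The block splitting and re-replication described above can be illustrated with a small simulation. This is a minimal sketch, not HDFS code. It assumes a 128 MB block size and a replication factor of 3, and it places replicas round-robin instead of using the rack-aware placement policy of real HDFS. All helper names are hypothetical.

```python
BLOCK_SIZE_MB = 128  # assumption: one of the typical HDFS block sizes
REPLICATION = 3      # assumption: number of copies kept for every block


def split_into_blocks(file_size_mb: int, block_size_mb: int = BLOCK_SIZE_MB) -> list[int]:
    """Return the sizes of the blocks that a file of the given size is split into."""
    full_blocks, rest = divmod(file_size_mb, block_size_mb)
    return [block_size_mb] * full_blocks + ([rest] if rest else [])


def place_replicas(num_blocks: int, datanodes: list[str],
                   replication: int = REPLICATION) -> dict[int, list[str]]:
    """Assign every block to `replication` distinct DataNodes
    (round-robin, a simplification of the real rack-aware policy)."""
    return {
        block: [datanodes[(block + r) % len(datanodes)] for r in range(replication)]
        for block in range(num_blocks)
    }


def handle_node_failure(placement: dict[int, list[str]], failed: str,
                        datanodes: list[str]) -> dict[int, list[str]]:
    """Mimic the NameNode reacting to a lost DataNode: every block that had a
    replica on the failed node gets a new replica on a live node that does
    not hold that block yet."""
    live = [node for node in datanodes if node != failed]
    for nodes in placement.values():
        if failed in nodes:
            nodes.remove(failed)
            nodes.append(next(node for node in live if node not in nodes))
    return placement


nodes = ["dn1", "dn2", "dn3", "dn4"]
blocks = split_into_blocks(300)  # [128, 128, 44]
placement = place_replicas(len(blocks), nodes)
handle_node_failure(placement, "dn1", nodes)
print(placement)
```

After the simulated failure of dn1, every block again has three replicas on distinct live nodes. This mirrors the behaviour the text attributes to the NameNode.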
2.3.2 YARN
YARN is a cluster management technology. It combines a central resource manager, which reconciles the way applications use Hadoop resources, with node manager agents, which monitor processing on individual DataNodes. The main purpose of YARN is to allow parallel access to and usage of a Hadoop system and its resources; before Hadoop 2.x, parallel processing of queries was limited by the lack of general resource management. YARN opens Hadoop to wider usage.
2.3.3 MapReduce
MapReduce is a software framework that allows developers to write programs that process massive amounts of structured or unstructured data in parallel across a distributed cluster. A MapReduce program is divided into two major parts:
• Map - The Map function takes a series of key/value pairs, processes each, and generates zero or more output key/value pairs. The input and output types of the map can be different from each other.
• Reduce - The input for each Reduce is pulled from the machine where the Map ran and sorted using the application's sorting function. The number of reducers does not depend on the number of map tasks.
The MapReduce framework is tightly integrated with Hadoop; however, the MapReduce programming paradigm itself can be used with any programming language.
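Because the paradigm does not depend on the language, the two phases can be sketched in plain Python. This is an in-process word-count simulation for illustration only. A dictionary stands in for the framework's shuffle, which groups values by key, and sorting the keys mimics the sort before the reduce phase. Nothing is distributed.

```python
from collections import defaultdict
from typing import Iterable, Iterator


def map_fn(offset: int, line: str) -> Iterator[tuple[str, int]]:
    """Map: one input pair (offset, line) produces zero or more (word, 1) pairs."""
    for word in line.lower().split():
        yield word, 1


def reduce_fn(word: str, counts: Iterable[int]) -> Iterator[tuple[str, int]]:
    """Reduce: all values emitted for one key are combined into one result."""
    yield word, sum(counts)


def run_word_count(lines: list[str]) -> dict[str, int]:
    # Map phase, followed by the shuffle that groups intermediate values by key.
    grouped: dict[str, list[int]] = defaultdict(list)
    for offset, line in enumerate(lines):
        for key, value in map_fn(offset, line):
            grouped[key].append(value)
    # Reduce phase: keys are processed in sorted order, as after the framework's sort.
    result: dict[str, int] = {}
    for key in sorted(grouped):
        for out_key, out_value in reduce_fn(key, grouped[key]):
            result[out_key] = out_value
    return result


print(run_word_count(["Hive Pig Hive", "pig sqoop"]))
# {'hive': 2, 'pig': 2, 'sqoop': 1}
```

In a real cluster, map_fn and reduce_fn would run as many parallel tasks on different nodes. Only the grouping contract between the two phases is the same.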
2.3.4 Hive
Hive [11] is data warehouse software for querying and managing large data sets stored in HDFS. Developers can define the structure of tables the same way as in an RDBM and then query the underlying data using the SQL-like language HiveQL [14]. Hive lets a developer create tables over data in HDFS or over external data sources and specify how these tables are stored.
Hive metadata are stored in HCatalog, specifically in the metastore database. A major advantage of the metastore is that its database can reside outside the Hadoop cluster. A metastore database located this way can be used by other services, and it survives a cluster failure. One of the most distinguishing features of Hive is that the schema is validated on read, not on write as in RDBMs. Due to this behaviour, it is not possible to enforce referential integrity using foreign keys or even to enforce uniqueness.
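The difference between schema on write and schema on read can be illustrated with a small simulation. In this sketch, the write path stores raw delimited lines without any checks. The schema, a hypothetical two-column email dimension, is applied only when the data are read. A value that does not match the column type comes back as None, similarly to Hive returning NULL for such fields in text files. Duplicate keys are accepted without complaint.

```python
from typing import Callable, Optional

# Hypothetical table schema: (column name, type converter)
SCHEMA: list[tuple[str, Callable[[str], object]]] = [("id_dim_email", int), ("email", str)]


def write_raw(storage: list[str], line: str) -> None:
    """Write path: nothing is validated, the line is stored as it is."""
    storage.append(line)


def read_with_schema(storage: list[str], schema=SCHEMA, sep: str = ",") -> list[dict]:
    """Read path: split every stored line and convert its fields according to
    the schema. Missing fields and values that cannot be converted become None."""
    rows = []
    for line in storage:
        fields = line.split(sep)
        row: dict[str, Optional[object]] = {}
        for position, (name, convert) in enumerate(schema):
            raw = fields[position] if position < len(fields) else None
            try:
                row[name] = convert(raw) if raw is not None else None
            except ValueError:
                row[name] = None
        rows.append(row)
    return rows


storage: list[str] = []
for line in ["1,a@example.com", "abc,b@example.com", "1,c@example.com", "2"]:
    write_raw(storage, line)  # duplicate key 1 and a malformed id are accepted
print(read_with_schema(storage))
```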
Clients can connect to Hive using two different drivers, ODBC and JDBC. ODBC is a standard API written in C and C++ and is supported by the majority of tools and client applications. JDBC, on the other hand, is based on the Java programming language. Therefore, some technologies lack native support for it, especially those from companies that do not develop in Java. However, many tools have separate JDBC drivers that can be installed. For example, Microsoft SQL Server has a downloadable JDBC driver that supports SQL Server 2005 and later. Oracle and MySQL databases have native support for JDBC drivers. Driver performance depends highly on the implementation of the driver used to connect to Hive [15, 16].
Figure 2.3: Hive connection diagram [11].
Tables in Hive can be internal or external. An internal table is completely managed by Hive. An external table, however, can be located elsewhere, and only its metadata are stored in the Hive metastore. For querying data outside Hadoop, Hive uses storage handlers. Currently, a JDBC storage handler is not included in the official Hive release, but the code can be downloaded and compiled from an open-source project [17]. This feature gives Hive the ability to query data stored in databases through JDBC drivers. However, such external tables cannot be modified from Hive. Besides JDBC, Hive supports external tables for HBase, Cassandra, BigTable and others.
Hive uses the HiveQL language to query the data [14]. Every query is first translated into Java MapReduce jobs, which are then executed. In general, Hive has not been built for quick iterative analysis but mostly for long-running jobs. The translation and distribution of a query alone take around 20 seconds. This is a disadvantage when using Hive in customer-facing BI tools such as reporting services, dashboards or OLAP with data stored in Hadoop. In these tools, every interaction, such as a refresh or a change of parameters, generates a new query to Hive and forces users to wait at least 20 seconds for any results.
Another Hive feature used in data warehousing and analytics is support for different file types and compressed files. This includes text files, binary sequence files, the columnar storage format Parquet, and JSON objects. Another feature related to file storage is compression. Compressing files can save a significant amount of storage space, trading read and write time for CPU time. In some cases, compression can improve both disk usage and query performance [18]. Text files containing CSV, XML or JSON can be parsed using different SerDe functions, which are serialisation and deserialisation functions. Natively, Hive offers several basic SerDe functions, such as the RegEx SerDe for regular expressions. Open-source libraries for parsing JSON exist, although they are not included in official Hive releases. However, these libraries often have issues with nested JSON or JSON arrays.
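The role of a SerDe can be sketched as a pair of functions that map between a stored text record and a row of table columns. The sketch below is hypothetical and much simpler than any real SerDe. It only maps the comma-separated top-level keys listed in a "paths"-like property to columns, in the spirit of the JSON SerDe used in chapter 2.3.4.2. Nested objects or arrays would be returned unchanged as Python values.

```python
import json


def _columns(paths: str) -> list[str]:
    """Turn a property such as 'email, country' into ['email', 'country']."""
    return [path.strip() for path in paths.split(",")]


def deserialize(line: str, paths: str) -> list:
    """Read path: parse one JSON line and return the values of the listed
    top-level keys in column order; a missing key becomes None (NULL)."""
    record = json.loads(line)
    return [record.get(column) for column in _columns(paths)]


def serialize(row: list, paths: str) -> str:
    """Write path: turn a row back into one JSON line."""
    return json.dumps(dict(zip(_columns(paths), row)))


PATHS = "email, country, browser, logtime"
line = '{"email": "a@example.com", "country": "CZ", "logtime": "2014-10-25 10:00:00"}'
row = deserialize(line, PATHS)
print(row)  # ['a@example.com', 'CZ', None, '2014-10-25 10:00:00']
```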
In data warehousing, data are mostly stored as a time series. Typically, every hour, day or week, new data for that period are exported from a source system. To make appending data to Hive tables easy, Hive supports table partitioning. Each partition is defined by a meta column that is not part of the data files. When a new partition is added to an existing table, all queries automatically include the new partition. For performance, specific partitions can be specified in the WHERE clause of a HiveQL statement to reduce the number of redundant reads, as Hive reads only the partitions that are needed. This is a useful feature in an ETL process, as an ETL usually processes only a small set of data. Typically, a table would be partitioned by a date or a datetime, depending on the period of data exports.
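Partition pruning can be illustrated with a toy in-memory model. This is a sketch under simplifying assumptions, not Hive's implementation. It uses a hypothetical PartitionedTable class with one partition per ETL date. Partitions whose key does not satisfy the filter are skipped without reading their rows, and a newly added partition is picked up automatically by unfiltered queries.

```python
from datetime import date
from typing import Optional


class PartitionedTable:
    """Toy model of a Hive table partitioned by the meta column etl_date.
    Every partition holds its own rows, like a separate directory in HDFS;
    the partition value is not stored inside the rows themselves."""

    def __init__(self) -> None:
        self.partitions: dict[date, list[dict]] = {}
        self.partitions_read = 0

    def add_partition(self, etl_date: date, rows: list[dict]) -> None:
        self.partitions[etl_date] = list(rows)

    def select(self, date_from: Optional[date] = None,
               date_to: Optional[date] = None) -> list[dict]:
        """Return rows of the partitions within [date_from, date_to]; other
        partitions are skipped without being read (partition pruning)."""
        self.partitions_read = 0
        result = []
        for etl_date, rows in self.partitions.items():
            if date_from is not None and etl_date < date_from:
                continue
            if date_to is not None and etl_date > date_to:
                continue
            self.partitions_read += 1
            # The partition value is exposed as a column when the data are read.
            result.extend({**row, "etl_date": etl_date} for row in rows)
        return result


table = PartitionedTable()
table.add_partition(date(2014, 10, 24), [{"email": "a@example.com"}])
table.add_partition(date(2014, 10, 25), [{"email": "b@example.com"}, {"email": "c@example.com"}])
one_day = table.select(date(2014, 10, 25), date(2014, 10, 25))
print(len(one_day), table.partitions_read)  # 2 1
```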
Hive also supports indices, which are similar to indices in RDBMs. An index is a sorted structure that increases read performance. With the right query conditions and index, it is possible to decrease the number of reads, as only a portion of the table or partition needs to be loaded and processed. Hive supports rebuilding an index on a whole table or on individual partitions.
2.3.4.1 HiveQL
HiveQL is the Hive query language, developed by Facebook to simplify querying data in Hadoop. As most developers and analysts are used to SQL, developing a similar language for Hadoop was very reasonable. HiveQL gives users SQL-like access to data stored in Hadoop. It does not follow the full SQL standard; however, its syntax is familiar to SQL users. Among others, HiveQL supports the following features:
• Advanced SQL features such as window functions (e.g. RANK, LAG, LEAD).
• Querying JSON objects (e.g. using the get_json_object() function).
• User-defined functions.
• Indexes.
2.3.4.2 Hive ETL
This is an example of a hand-coded ETL in Hive. The browser and country dimensions are not included, as they are built in the same way as the email dimension. Individual parts of the code are commented.
1 -- PREPARATION
2 -- add third party JSON SerDe library
3 ADD JAR s3://elasticmapreduce/samples/hive-ads/libs/jsonserde.jar;
4
5 -- stage for storing temporary data, partitioned like the fact table to allow a partition swap
6 CREATE TABLE stage_visit_log
7 (
8   id_dim_email int,
9   id_dim_country int,
10   id_dim_browser int,
11   logtime timestamp
12 ) PARTITIONED BY (etl_timestamp timestamp) STORED AS SEQUENCEFILE;
13
14 -- final fact table
15 CREATE TABLE fact_visit_log
16 (
17   id_dim_email int COMMENT 'Surrogate key to Email dimension',
18   id_dim_country int COMMENT 'Surrogate key to Country dimension',
19   id_dim_browser int COMMENT 'Surrogate key to Browser dimension',
20   logtime timestamp COMMENT 'Time when user logged into our service'
21 )
22 COMMENT 'Fact table containing all logins to our service'
23 PARTITIONED BY (etl_timestamp timestamp)
24 -- individual partition for each ETL iteration
25 STORED AS SEQUENCEFILE
26 -- storing as a sequence file to decrease file size
27 ;
28
29 -- final email dimension table
30 CREATE TABLE dim_email
31 (
32   id_dim_email int COMMENT 'Surrogate key',
33   email string COMMENT 'Full email address'
34 )
35 COMMENT 'Email dimension table'
36 STORED AS SEQUENCEFILE
37 -- storing as a sequence file to decrease file size
38 ;
39
40 -- stage email union table
41 CREATE TABLE stage_email_all
42 (
43   id_dim_email int COMMENT 'Surrogate key',
44   email string COMMENT 'Full email address'
45 )
46 COMMENT 'Email dimension table'
47 STORED AS SEQUENCEFILE
48 -- storing as a sequence file to decrease file size
49 ;
50 CREATE TABLE stage_email LIKE stage_email_all; -- stage for new email records
51 -- DATA LOADING
52 -- EXAMPLE OF ONE ITERATION
53
54 -- create external table over the S3 folder
55 -- all files in the folder and underlying folders are parsed and queried
56 -- allows simple ETL job rerun
57 DROP TABLE IF EXISTS source_visit_log_20141025;
58 CREATE EXTERNAL TABLE source_visit_log_20141025
59 (
60   email string,
61   country string,
62   browser string,
63   logtime string
64 )
65 ROW FORMAT SERDE 'com.amazon.elasticmapreduce.JsonSerde'
66 WITH SERDEPROPERTIES ('paths' = 'email, country, browser, logtime')
67 LOCATION 's3://Incoming-data/visit-log/2014-10-25/'
68 ;
69
70 -- new surrogate key = maximum id from email dimension + row number
71 -- fill stage table with email addresses not yet in the dimension
72 INSERT OVERWRITE TABLE stage_email
73 SELECT
74   ROW_NUMBER() OVER (ORDER BY dist_em.email) + COALESCE(max_id_tab.max_id, 0) AS id_dim_email
75   , dist_em.email
76 FROM
77 (
78   SELECT
79     DISTINCT log.email AS email
80   FROM source_visit_log_20141025 log
81   LEFT JOIN dim_email em
82     ON log.email = em.email
83   WHERE em.email IS NULL
84 ) dist_em
85 CROSS JOIN
86 (SELECT max(id_dim_email) AS max_id FROM dim_email) max_id_tab
87 ;
88
89 -- union stage and dimension data
90 INSERT OVERWRITE TABLE stage_email_all
91 SELECT id_dim_email, email FROM dim_email
92 UNION ALL
93 SELECT id_dim_email, email FROM stage_email
94 ;
95
96 -- switch stage and dimension table
97 ALTER TABLE dim_email RENAME TO dim_email_old;
98 ALTER TABLE stage_email_all RENAME TO dim_email;
99 ALTER TABLE dim_email_old RENAME TO stage_email_all;
100
101 -- performs dimension lookups and loads transformed data into stage table
102 -- allows simple ETL job rerun
103 INSERT OVERWRITE TABLE stage_visit_log PARTITION (etl_timestamp = '2014-10-25')
104 SELECT
105   COALESCE(em.id_dim_email, -1) AS id_dim_email
106   , COALESCE(cnt.id_dim_country, -1) AS id_dim_country
107   , COALESCE(br.id_dim_browser, -1) AS id_dim_browser
108   , CAST(log.logtime AS timestamp) AS logtime
109 FROM source_visit_log_20141025 log
110 LEFT JOIN dim_email em
111   ON COALESCE(log.email, 'unknown') = em.email
112 LEFT JOIN dim_country cnt
113   ON COALESCE(log.country, 'unknown') = cnt.country
114 LEFT JOIN dim_browser br
115   ON COALESCE(log.browser, 'unknown') = br.browser
116 ;
117
118 -- partition swap into fact table
119 ALTER TABLE fact_visit_log DROP IF EXISTS PARTITION (etl_timestamp = '2014-10-25');
120
121 ALTER TABLE fact_visit_log EXCHANGE PARTITION (etl_timestamp = '2014-10-25') WITH TABLE stage_visit_log;
2.3.5 Impala
Impala is an open-source distributed processing framework developed by Cloudera. Impala provides low-latency SQL queries on data stored in Hadoop. Low latency is achieved with in-memory computing. Impala works best with data stored in the columnar Parquet format; therefore, it is more suitable for querying large fact tables when a query uses only a small number of columns. Another feature that distinguishes Impala from Hive is its query engine: Impala does not compile queries into MapReduce jobs but uses its own execution model.
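Why a columnar format suits queries that touch only a few columns of a fact table can be shown by counting the values that have to be read. The sketch below is a simplified model for illustration. It ignores compression, encoding and the actual Parquet file layout. A row store reads whole rows, while a column store reads only the requested column.

```python
def to_columnar(rows: list[dict]) -> dict[str, list]:
    """Convert row-oriented records into a column-oriented layout."""
    columns: dict[str, list] = {}
    for row in rows:
        for name, value in row.items():
            columns.setdefault(name, []).append(value)
    return columns


def sum_from_row_store(rows: list[dict], column: str) -> tuple[int, int]:
    """Return (sum, number of values read); a row store reads every column."""
    total, values_read = 0, 0
    for row in rows:
        values_read += len(row)
        total += row[column]
    return total, values_read


def sum_from_column_store(columns: dict[str, list], column: str) -> tuple[int, int]:
    """Return (sum, number of values read); only one column is read."""
    values = columns[column]
    return sum(values), len(values)


facts = [{"id_dim_email": i, "id_dim_country": i % 2, "amount": i * 10} for i in range(4)]
print(sum_from_row_store(facts, "amount"))                  # (60, 12)
print(sum_from_column_store(to_columnar(facts), "amount"))  # (60, 4)
```

With wider fact tables, the gap grows with the number of columns that the query does not need.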
Impala is closely tied to Hive because it uses the Hive metastore for storing table metadata. Impala can run on a Hadoop cluster without Hive; however, the Hive metastore has to be installed. Impala accepts queries sent via the impala shell, JDBC, ODBC or the Hue console.
2.3.6 Pig
Pig [19] is a simple scripting language for data processing and transformations. It was developed mainly by Yahoo. The capabilities of Pig and Hive are similar; the main difference is the audience for which each tool was developed. Pig is more intuitive for developers with experience in procedural languages, whereas Hive leverages knowledge of SQL. Pig's advantages:
• Supports structured, semi-structured and unstructured data sets.
• Procedural, simple, easy to learn.
• Supports user-defined functions.
One of its powerful features is the capability to work with a dynamic schema, meaning that Pig can parse data such as JSON without a predefined schema. This is helpful when working with dynamic nested JSON objects or JSON arrays.
DataJson = LOAD 's3://pig-test/test-data.json'
    USING com.twitter.elephantbird.pig.load.JsonLoader('-nestedLoad');
Figure 2.4: Example of Pig script for loading complex JSON structure.
The choice between Pig and Hive depends highly on the audience. As most data warehouses are built on RDBMs and the people involved in their development know SQL, Hive is the preferable tool for data transformation and data querying.
2.3.7 Kylin
A relatively new project that implements OLAP on Hadoop is Kylin [20]. Kylin originally started as an in-house project at eBay, and it was published as open source a few months ago.
Kylin works in the following manner. Aggregations are calculated from Hive using HiveQL and stored in HBase. When data are queried, the Kylin query engine checks whether the requested data are precalculated in HBase. If so, it returns the data from HBase with sub-second latency. Otherwise, the Kylin query engine routes the query to Hive. Kylin supports JDBC, ODBC and a REST API for client tools; therefore, it is possible to connect from analytic tools such as Tableau, SAS or Excel.
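The routing described above can be sketched as a lookup in precomputed aggregates, with a fallback to computing the result from the detailed data. This is a hypothetical illustration, not Kylin's API. A dictionary stands in for the cube stored in HBase, and a plain aggregation function stands in for the query routed to Hive.

```python
class CubeRouter:
    """Hypothetical router: answer from precomputed aggregates when the
    requested grouping was precalculated, otherwise compute it (slow path)."""

    def __init__(self, fact_rows: list[dict], measure: str) -> None:
        self.fact_rows = fact_rows
        self.measure = measure
        self.cube: dict[tuple[str, ...], dict[tuple, float]] = {}
        self.fallbacks = 0  # number of queries that could not use the cube

    def _aggregate(self, dims: tuple[str, ...]) -> dict[tuple, float]:
        """SUM(measure) GROUP BY dims over the detailed rows."""
        result: dict[tuple, float] = {}
        for row in self.fact_rows:
            key = tuple(row[d] for d in dims)
            result[key] = result.get(key, 0) + row[self.measure]
        return result

    def precompute(self, dims: list[str]) -> None:
        """Build step: store the aggregate for one combination of dimensions."""
        self.cube[tuple(dims)] = self._aggregate(tuple(dims))

    def query(self, dims: list[str]) -> dict[tuple, float]:
        key = tuple(dims)
        if key in self.cube:   # fast path: precalculated data
            return self.cube[key]
        self.fallbacks += 1    # slow path: compute from detailed data
        return self._aggregate(key)


facts = [{"country": "CZ", "browser": "firefox", "visits": 1},
         {"country": "CZ", "browser": "chrome", "visits": 2},
         {"country": "US", "browser": "firefox", "visits": 3}]
router = CubeRouter(facts, measure="visits")
router.precompute(["country"])
print(router.query(["country"]))                    # {('CZ',): 3, ('US',): 3}
print(router.query(["browser"]), router.fallbacks)  # {('firefox',): 4, ('chrome',): 2} 1
```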
Figure 2.5: Kylin high-level architecture [20].
2.3.8 Sqoop
Sqoop [11] is a tool from the Hadoop ecosystem used for importing and exporting data. Different transfer modes are supported, such as a full transfer, an incremental transfer, or a transfer limited by a WHERE clause. Data can be exported from HDFS, Hive or any RDBM that supports JDBC. Sqoop works bi-directionally and therefore also supports importing into HDFS, Hive or an RDBM. Data can also be exported into a delimited text file with specific field and row terminators, or into a sequence file. As a tool that is part of a distributed system, Sqoop also supports distributed and parallel processing. Sqoop includes additional functionality for Hive export and import in order to simplify data transfers with RDBMs. Sqoop's additional Hive support includes:
• Incremental imports.
• CREATE TABLE statements within imports.
• Data import into a specific table partition.
• Data compression.
The following chapters explain how to integrate Hadoop into an enterprise data warehouse, including the implementation of a star schema, data management and the physical implementation.
3.1 Star schema implementation
A logical star schema model captures the business point of view and is largely independent of the physical implementation platform. Hive provides almost the same SQL-like interface as common RDBMs; therefore, a data model built for an RDBM data warehouse can be implemented in Hive with some adjustments. This thesis therefore focuses on the physical rather than the logical implementation.
Main advantages of implementing a star schema in Hive:
• Simple and understandable view of the data.
• Easy support for master data management.
• Dimensions support user-defined hierarchies.
• Performance improvement [18].
• Conformed dimensions can be shared across different platforms.
3.1.1 Dimensions implementation
One of the most challenging parts of implementing dimension tables is that Hive does not support the row-level DML operations UPDATE and DELETE. Even though an append operation is available, it creates a new file with every append. This can cause significant issues with NameNode performance, as the number of small files can grow significantly and the NameNode can run out of memory. Also, only the first type of slowly changing dimension can be implemented without support for UPDATE and DELETE.
Because a dimension needs to keep the same surrogate keys and UPDATE is not available, the table has to be recreated with the same keys in every ETL run. Although auto-increment functionality is not available yet, other ways to generate keys exist.
When inserting new rows into a table, we need to merge the existing dimension data with the new data. In this step, it is necessary to keep the old keys and generate new keys for the new records. One way to generate a sequence is to get the maximal key value from the existing data, use the function UDFRowSequence.java to generate a sequence starting with number one, and then add the maximum key value to all generated keys. The same result can be achieved using the ROW_NUMBER() window function. Due to the lack of transactions in Hive, it is recommended to stage data from multiple data sources first and load them into the final dimension table at once. Alternatively, other techniques can be used to prevent concurrent runs that fill the same dimension table. An example of filling a dimension table is at rows 70-100 in chapter 2.3.4.2.
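The key-generation step can be sketched outside Hive to make the logic explicit. In this minimal sketch, a dictionary from natural key (email) to surrogate key stands in for the dimension table, which is an assumption for illustration. Existing keys are kept, and new members receive the maximal existing key plus their row number, mirroring the combination of MAX() and ROW_NUMBER().

```python
def assign_surrogate_keys(dimension: dict[str, int], incoming: list[str]) -> dict[str, int]:
    """Return the rebuilt dimension (natural key -> surrogate key).

    Existing members keep their keys; new distinct members get
    max_id + row_number, numbered in sorted order of the natural key.
    """
    max_id = max(dimension.values(), default=0)              # like COALESCE(MAX(id), 0)
    new_members = sorted({value for value in incoming if value not in dimension})
    rebuilt = dict(dimension)                                 # old rows are kept unchanged
    for row_number, member in enumerate(new_members, start=1):
        rebuilt[member] = max_id + row_number                 # like ROW_NUMBER() + max_id
    return rebuilt


dim_email = {"a@example.com": 1, "b@example.com": 2}
print(assign_surrogate_keys(dim_email, ["c@example.com", "a@example.com", "d@example.com", "c@example.com"]))
# {'a@example.com': 1, 'b@example.com': 2, 'c@example.com': 3, 'd@example.com': 4}
```

The function returns a new mapping instead of modifying the old one, which matches the rebuild-and-swap approach Hive requires.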
However, as Hive does not enforce primary keys, it is better to develop a job that validates the uniqueness of surrogate keys in dimension tables, because duplicate surrogate keys lead to duplicated records when fact tables are joined to the dimension.
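Such a validation job essentially groups by the surrogate key and reports keys that occur more than once, which is the equivalent of GROUP BY with HAVING COUNT(*) > 1. The Python sketch below uses hypothetical in-memory rows. It also shows why a duplicate matters: every fact row referencing that key is multiplied by the join.

```python
from collections import Counter


def duplicate_keys(dimension_rows: list[tuple[int, str]]) -> dict[int, int]:
    """Return {surrogate key: occurrences} for keys that are not unique."""
    counts = Counter(key for key, _ in dimension_rows)
    return {key: n for key, n in counts.items() if n > 1}


def join_facts(fact_keys: list[int], dimension_rows: list[tuple[int, str]]) -> list[tuple[int, str]]:
    """Inner join of fact keys with the dimension (nested loop for clarity)."""
    return [(key, value) for key in fact_keys
            for dim_key, value in dimension_rows if dim_key == key]


dim_email = [(1, "a@example.com"), (2, "b@example.com"), (2, "c@example.com")]
print(duplicate_keys(dim_email))           # {2: 2}
print(len(join_facts([1, 2], dim_email)))  # 3 rows instead of 2
```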
If a dimension is conformed, or at least shared across different data sources on different platforms, a more suitable solution is to generate keys on a platform such as an RDBM that has auto-increment functionality, supports transactions and ensures uniqueness. Basically, the RDBM keeps the original dimension table and Hive has only a read-only copy. One way to load a dimension using an additional RDBM is to take the records that should be inserted into the dimension and export them into a staging table on the RDBM. The Sqoop tool can be used for the export from Hive.
Figure 3.1: Diagram describing using RDBM to assign surrogate keys.
sqoop import \
  --connect 'jdbc:sqlserver://edw.server.com;database=dw;username=sqoop;password=sqoop' \
  --table dim_email --hive-import \
  --incremental append --check-column id_dim_email --last-value 500 \
  -- --schema dbo
Figure 3.2: Sqoop imports all records from dim_email with id_dim_email greater than 500.
Loading into a dimension in an RDBM runs in a transaction; therefore, concurrent inserts are excluded. When the dimension is loaded, it can be exported back to Hive. Depending on the size of the table, an incremental or a full export can be chosen. This export needs to be performed after each insertion into the dimension, as Hive has only a copy that needs to be synchronized with the original.
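The incremental import used in Figure 3.2 can be described by its semantics. Only rows whose check column is greater than the last imported value are transferred, and the new maximum becomes the last value for the next run. The sketch below models that behaviour on in-memory rows. It illustrates the idea under these assumptions and is not Sqoop code.

```python
def incremental_append(source_rows: list[dict], target_rows: list[dict],
                       check_column: str, last_value: int) -> int:
    """Append rows with check_column > last_value to target_rows and return
    the value to pass as the last value of the next run."""
    new_rows = [row for row in source_rows if row[check_column] > last_value]
    target_rows.extend(new_rows)
    return max((row[check_column] for row in new_rows), default=last_value)


source = [{"id_dim_email": i, "email": f"user{i}@example.com"} for i in range(499, 503)]
hive_copy: list[dict] = []
last = incremental_append(source, hive_copy, "id_dim_email", 500)
print(last, [row["id_dim_email"] for row in hive_copy])  # 502 [501, 502]
print(incremental_append(source, hive_copy, "id_dim_email", last))  # 502 (no new rows are appended)
```

Keeping the returned value between runs is what makes the synchronization of the Hive copy cheap, as only newly keyed rows travel.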
Slowly changing dimensions can be implemented the same way as in an RDBM. Type 2 uses two additional columns of date or timestamp format that specify the time period in which a row is valid, and type 3 can be implemented using additional columns that hold previous values [8].
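Because rows cannot be updated in place, a dimension that keeps history with two validity columns has to be rebuilt. The current version of a changed member is closed and a new version is appended. The sketch below illustrates this for a hypothetical email dimension with one tracked attribute (country). The column names, the open-ended date 9999-12-31 and the in-memory representation are assumptions.

```python
from datetime import date

OPEN_END = date(9999, 12, 31)  # assumed valid_to value of the current version


def rebuild_scd2(dimension: list[dict], changes: dict[str, str], load_date: date) -> list[dict]:
    """Return a rebuilt dimension; rows have keys sk, email, country,
    valid_from and valid_to. `changes` maps email -> country from this load."""
    current = {row["email"]: row for row in dimension if row["valid_to"] == OPEN_END}
    next_sk = max((row["sk"] for row in dimension), default=0) + 1
    rebuilt = []
    for row in dimension:
        new_country = changes.get(row["email"])
        if row["valid_to"] == OPEN_END and new_country not in (None, row["country"]):
            rebuilt.append({**row, "valid_to": load_date})  # close the old version
        else:
            rebuilt.append(dict(row))                        # copy unchanged history
    for email, country in sorted(changes.items()):
        old = current.get(email)
        if old is None or old["country"] != country:        # new or changed member
            rebuilt.append({"sk": next_sk, "email": email, "country": country,
                            "valid_from": load_date, "valid_to": OPEN_END})
            next_sk += 1
    return rebuilt


dim = [{"sk": 1, "email": "a@example.com", "country": "CZ", "valid_from": date(2014, 1, 1), "valid_to": OPEN_END},
       {"sk": 2, "email": "b@example.com", "country": "US", "valid_from": date(2014, 1, 1), "valid_to": OPEN_END}]
result = rebuild_scd2(dim, {"a@example.com": "SK", "b@example.com": "US", "c@example.com": "DE"},
                      date(2014, 10, 25))
for row in result:
    print(row["sk"], row["email"], row["country"], row["valid_to"])
```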
If data are in JSON or another unstructured format, it is recommended to split the JSON into a structured table. Even though a JSON record can contain dynamic arrays, it is important to decide whether the data in such a dynamic structure are necessary. Splitting JSON into a structured table has four main advantages:
• Performance.
• Simple synchronization with an RDBM.
• Dimensions are simple and understandable for business users.
• Simple usage of dimensions in additional tools such as ETL and OLAP.
Otherwise, the JSON needs to be parsed with every query, and JSON parsing is an expensive operation [21].
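Splitting a JSON record into a structured table means flattening nested objects into columns and turning array elements into separate rows. The sketch below shows this transformation on a single hypothetical record. The column naming scheme (parent_child) and the choice to keep a record whose array is empty are assumptions made for illustration.

```python
import json


def flatten(obj: dict, prefix: str = "") -> dict:
    """Flatten nested objects: {"user": {"email": "x"}} -> {"user_email": "x"}."""
    flat = {}
    for key, value in obj.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{name}_"))
        else:
            flat[name] = value
    return flat


def json_to_rows(line: str, array_field: str) -> list[dict]:
    """Turn one JSON record into structured rows: one row per element of
    `array_field` (elements are assumed to be objects), with the flattened
    parent fields repeated in every row."""
    record = json.loads(line)
    items = record.pop(array_field, None) or []
    parent = flatten(record)
    if not items:
        return [parent]  # keep the record even without array elements
    return [{**parent, **flatten(item, prefix=f"{array_field}_")} for item in items]


line = ('{"user": {"email": "a@example.com", "country": "CZ"}, '
        '"logins": [{"browser": "firefox"}, {"browser": "chrome"}]}')
for row in json_to_rows(line, "logins"):
    print(row)
```

Once stored in this shape, the rows can be queried and joined like any other structured table, without parsing JSON at query time.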
3.1.2 Facts implementation
Fact tables contain snapshots of data or log data. Naturally, data stored in a data warehouse are tied to the day the snapshot was taken or the event occurred. This creates a time series defined mostly by the time period between individual data exports (such as hourly, daily or weekly).
Crucial to a fact table implementation is the ability to work effectively with individual dates as well as with the whole history. Due to the limited DML operations, only a few options are available.
The first and least efficient way is to load new data into the same non-partitioned fact table. This causes a read and write of the whole table, as the delete operation is not supported. A delete operation is needed to ensure the ability to rerun an ETL job: when incorrect data are loaded into a fact table, there has to be an easy way to fix the issue and rerun the ETL process. This is usually achieved by deleting the data from the current ETL iteration.
The second option is to create an individual table for each hour, day or week, depending on how often an ETL iteration runs. Using separate tables and a view that combines them with UNION ALL, we can query individual tables as well as the whole history.
The most effective option is to use the partitioning available in Hive. Each ETL iteration works with one partition that is then appended to the existing table. A partition in Hive provides benefits similar to those of a clustered index. A partition is defined by an ETL iteration ID, which can be a timestamp or an int. With partitioning, it is possible to query the whole table or to filter data by the partition attribute, in which case only the specific partitions of the table are read. An example of a partitioned fact table is at rows 103-121 in chapter 2.3.4.2. It uses a partition swap to move data into the fact table more efficiently, as the swap only affects metadata.
For unstructured data such as JSON, the same rule applies as for dimensions. It is better to split the data for easier querying, better data understanding, easier access via ETL and BI tools, and better performance. Natural keys can be replaced with surrogate keys by a dimension lookup using a join operation.
3.1.3 Star schema performance optimization
Several ways to improve star schema performance exist. However, in the end, performance depends highly on the specific Hadoop cluster and data characteristics [18]. Generally, queries to a data warehouse cause a lot of reads and need computational power, as a large fact table is joined to several dimension tables. Implementing a star schema improves performance by reducing the size of fact tables and reduces query time for most queries [18].
Performance can be improved by using sequence files. A sequence file is a flat file format consisting of key-value pairs. A sequence file can be compressed, which decreases the file size, and it can still be split into blocks on HDFS, which improves workload distribution. Sequence files are stored in binary form, which decreases the size of the files on disk and improves read performance [18, 22]. Another format to consider is Parquet, a columnar storage format for the Hadoop ecosystem. Columnar storage is most beneficial if a fact table is used in aggregations of large amounts of data and for queries that load many rows but use a limited number of columns. An example of a table stored as a sequence file is at rows 14-27 in chapter 2.3.4.2.
For dimensions that are small enough to fit into memory, Hive has map join functionality [21]. Map joins are processed by loading the smaller table into an in-memory hash map and matching keys with the larger table as its rows are streamed through. A similar approach is to use the Hadoop distributed cache [11].
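The map join described above is essentially a broadcast hash join. The following is a minimal sketch that uses in-memory lists of dictionaries in place of tables, which is an assumption for illustration. The small dimension is loaded into a hash map once, and fact rows are streamed and probed one by one, so the large table is never sorted or shuffled. Unmatched fact rows get -1, mirroring the unknown-member convention of the ETL example in chapter 2.3.4.2.

```python
from typing import Iterable, Iterator


def map_join(facts: Iterable[dict], dimension: list[dict],
             natural_key: str, surrogate_key: str) -> Iterator[dict]:
    """Replace the natural key of every streamed fact row with the surrogate
    key looked up in the in-memory hash map built from the small dimension."""
    lookup = {row[natural_key]: row[surrogate_key] for row in dimension}  # build side
    for fact in facts:                                                      # probe side
        out = {k: v for k, v in fact.items() if k != natural_key}
        out[surrogate_key] = lookup.get(fact[natural_key], -1)  # -1 = unknown member
        yield out


dim_email = [{"email": "a@example.com", "id_dim_email": 1},
             {"email": "b@example.com", "id_dim_email": 2}]
log = ({"email": e, "logtime": t} for e, t in [("b@example.com", "10:00"), ("x@example.com", "10:05")])
print(list(map_join(log, dim_email, "email", "id_dim_email")))
# [{'logtime': '10:00', 'id_dim_email': 2}, {'logtime': '10:05', 'id_dim_email': -1}]
```

The fact rows are consumed from a generator, which reflects that only the dimension has to fit in memory.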
Hive can benefit from building indexes on fact tables. However, due to the differences between Hive and RDBMs, indexes are not necessarily as beneficial as in RDBMs.
For further query optimization, Hive supports the EXPLAIN statement, which shows the generated MapReduce execution plan.
3.2 Security
By default Hadoop runs in non-secure mode in which no actual authentication is required. By configuring Hadoop to run in secure mode, each user and service needs to be authenticated by Kerberos in order to be able to use Hadoop services [11].
Organizations that already use Active Directory to manage user accounts are not keen on managing another set of user accounts separately in MIT Kerberos. Nonetheless, Kerberos can be configured to connect to a company LDAP. On the other hand, each user still needs a Kerberos principal, and the principals then have to be linked to users in Active Directory.
HDFS, resource-level access control for YARN and MapReduce, and coarser-grained access control at a service level. Hive provides Grant and Revoke access control on tables similarly to RDBMs.
3.3 Data management
3.3.1 Master data
As an example, it is typically the case with master data that the attribute names and data definitions used to describe master data entities are likely to be the standard data names and data definitions for the whole enterprise. Master data are closely tied to conformed dimensions in a data warehouse, as some conformed dimensions are based on enterprise master data.
It is important to be able to match new records with records that already exist in conformed dimensions in the data warehouse. There are several approaches to matching dimensional records:
• Simple matching - Using a GROUP BY or DISTINCT statement and then performing a lookup on the dimension table. This is easy to implement in Hive, as both operations are supported. This approach supports auditability natively, as we know exactly how records are matched. An example of simple matching is at rows 70-87 in chapter 2.3.4.2.
• Complex matching - Often implemented in commercial master data management systems. These are usually complex matching algorithms that are hard to scale or impossible to distribute. One of the disciplines used is machine learning, which can be used to predict matches. One of the Hadoop libraries focused on machine learning is Mahout. However, complex matching significantly increases ETL development complexity and reduces auditability [23].
In an existing data warehouse, it can be efficient to use the existing master data management system. However, loading a lot of data from Hadoop into an existing system can easily become the bottleneck of the whole ETL process. When considering this option, it is important to transfer the smallest possible amount of data and to keep as much data processing in Hadoop as possible.
3.3.2 Metadata
Metadata contain detailed information about individual steps in an ETL process, in reporting or in data sources. Without metadata, it is difficult to keep track of data flow and data transformations.
Hadoop does not contain any tool that directly supports metadata collection and management. Therefore metadata repository and metadata collection have to be implemented manually.
Data metadata can be stored in the Hive metastore using the HiveQL features TBLPROPERTIES, which allows defining key-value pairs of commentaries, and COMMENT, which can be added to columns or tables [11]. For regularly run queries, such as ETL, a problem occurs. Commercial tools such as Informatica support ETL documentation and metadata creation, but if ETL is hand-coded in Hive, metadata have to be stored outside the Hive metastore, as Hive does not support stored procedures. The design in the orchestration chapter can be extended with a metadata collection process, which can be automated based on query parsing. An example of Hive table metadata is at rows 14-27 in chapter 2.3.4.2.
Metadata stored in a database can easily be visualised and displayed using any reporting tool. Reporting tools usually have their own system for storing metadata.
Therefore, metadata for individual parts of the data warehouse can be stored as follows:
• Table details - stored in Hive table properties.
• Procedure details and query dependencies - a manually created data structure for storing queries in an RDBM.
• ETL orchestration and data flow - the orchestration tool.
• Reporting and BI tools - their own databases.
3.3.3 Orchestration
One of the things Hadoop does not do well at this moment is an orchestration. An orchestration is a central function of whole data warehouse environment and the source of operational information. EDW environment can have several thousands of unique jobs running every day. With near real time and real time processing the number of jobs ran every day can be tens of thousands. In such an environment it is crucial to have ability to define individual tasks, its dependencies and runtime environment. What is required from EDW orchestration:
• Agent support - An EDW usually uses multiple servers and different environments such as development, production or UAT, so it must be easy to move a job to a different environment or server.
• Complex dependency support - Dependencies of individual jobs need to be set properly. A job often waits for other whole projects to finish, or for just one job in a project. The ability to set dependencies precisely can significantly decrease the wait times of EDW processes. Dependencies also ensure the correctness of data.
• Simple operability - With thousands of jobs running every day, a simple-to-operate orchestration tool leads to quicker problem resolution and helps meet SLAs.
• Simple manageability - Planning EDW processes should not take a significant amount of time, as the processes change often.
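The dependency requirement can be illustrated with a small sketch that orders jobs into waves. Every job in a wave waits only for jobs in earlier waves, so jobs within one wave could run in parallel. The sketch uses `graphlib.TopologicalSorter` from the Python standard library (Python 3.9 or newer), which also rejects circular dependencies. The job names are hypothetical. A real scheduler would mark a job as done only when it actually finishes, not immediately as here.

```python
from graphlib import TopologicalSorter  # Python 3.9+


def execution_waves(dependencies):
    """Group jobs into waves; each job depends only on jobs in earlier waves.

    `dependencies` maps a job name to the set of jobs it waits for.
    """
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError for circular dependencies
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)  # a real scheduler would call this as jobs finish
    return waves


# Hypothetical job names for a nightly load.
dependencies = {
    "load_dim_customer": {"extract_crm"},
    "load_dim_product": {"extract_erp"},
    "load_fact_sales": {"load_dim_customer", "load_dim_product"},
    "refresh_olap_cube": {"load_fact_sales"},
}
for number, wave in enumerate(execution_waves(dependencies), start=1):
    print(number, wave)
```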
For orchestration purposes, Hadoop offers Oozie, a simple Hadoop job scheduler. Oozie itself does not meet most of the requirements mentioned above. The main problem is that it is accessible only through a command line interface or a simple console, and jobs are defined in XML. Therefore, any change to a job definition has to be made via the CLI, and the job listing in the console does not provide enough information to simplify operational tasks. Another disadvantage is the lack of support for non-Hadoop technologies. In general, it is better to use a general-purpose commercial scheduling system such as Cisco Tidal¹ or Control-M². This applies especially if the data warehouse is already running and such a scheduling system is already in use. Running automated queries on Hive is a matter of connecting to the Hadoop master node via an SSH tunnel, or directly to Hive using one of the supported drivers. This can easily be implemented in most scripting languages and then scheduled in any job scheduler.
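As an example of such scripting, the following Python sketch builds a call to Beeline, the HiveServer2 command line client, and runs a HiveQL file with it. The JDBC URL, user name, script name and the `run_date` variable are hypothetical. The sketch assumes two things: Beeline is on the PATH of the scheduler agent, and it exits with a non-zero code when the script fails. With `check=True`, that non-zero code becomes an exception, which the scheduler can report as a failed job.

```python
import subprocess


def beeline_command(jdbc_url, script_path, user=None, hivevars=None):
    """Build a Beeline call that runs a HiveQL file against HiveServer2."""
    command = ["beeline", "-u", jdbc_url]
    if user:
        command += ["-n", user]
    # Variables can be referenced in the script as ${hivevar:name}.
    for name, value in sorted((hivevars or {}).items()):
        command += ["--hivevar", f"{name}={value}"]
    command += ["-f", script_path]
    return command


def run_hive_script(jdbc_url, script_path, user=None, hivevars=None):
    """Run the script; check=True turns a non-zero exit code into
    subprocess.CalledProcessError, so the job fails in the scheduler."""
    subprocess.run(beeline_command(jdbc_url, script_path, user, hivevars), check=True)


# Hypothetical host, user and script; only the command is printed here.
print(beeline_command("jdbc:hive2://hadoop-master:10000/default",
                      "load_fact_sales.hql", user="etl",
                      hivevars={"run_date": "2015-01-01"}))
```

Keeping the command construction separate from its execution makes the call easy to log and review in the scheduler, independently of the cluster.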
To make Oozie a more suitable tool, an open source project called Cloumon Oozie [24] has been started. It aims to implement a job definition and workflow designer and a job management system.
3.3.4 Data profiling
Data profiling [25] is the process of examining the data in available data sets and collecting information and statistics about it. The main goal is to discover the characteristics of the data and whether it can be used for other purposes.
Data profiling consists of different kinds of basic statistics such as minimum, maximum, frequency and standard deviation, and of aggregates such as count or sum. Additional information about data attributes can include data type, uniqueness or length [26].
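The statistics listed above can be computed for a single column as in the following Python sketch. It is an illustration only: `profile_column` is a hypothetical helper, None stands for a NULL value, and the choice of population standard deviation is an assumption. On Hadoop, the same figures would normally be computed by the cluster rather than by a local script.

```python
import statistics
from collections import Counter


def profile_column(values):
    """Basic profile of one column; None stands for a NULL value."""
    present = [v for v in values if v is not None]
    counts = Counter(present)
    profile = {
        "count": len(values),
        "nulls": len(values) - len(present),
        "distinct": len(counts),
        "unique": len(counts) == len(present),
        "most_frequent": counts.most_common(1)[0] if counts else None,
    }
    if present and all(isinstance(v, (int, float)) for v in present):
        # Numeric column: range, aggregates and spread.
        profile.update(min=min(present), max=max(present), sum=sum(present),
                       mean=statistics.fmean(present),
                       stddev=statistics.pstdev(present))
    else:
        # Non-numeric column: length of the textual representation.
        lengths = [len(str(v)) for v in present]
        profile.update(min_length=min(lengths, default=0),
                       max_length=max(lengths, default=0))
    return profile


print(profile_column([4, 8, None, 4, 6]))
print(profile_column(["CZ", "SK", None, "CZE"]))
```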
Data profiling in Hadoop can be performed using:
• A commercial data profiling tool such as SAS or Informatica.
• R or another statistical language.
• A native Hadoop tool such as Hive or Pig.
With Hive already running on Hadoop, using Hive and HiveQL for data profiling is the easiest choice. Like SQL, HiveQL supports all the basic statistical functions, e.g. min, max and avg, as well as aggregation functions such as count and sum. Hive also contains a variety of built-in user-defined functions [27] that calculate standard deviation, the Pearson correlation coefficient, percentiles and many others.
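For a single numeric column, these functions can be combined into one profiling query, so the table is scanned only once. The Python sketch below only generates the HiveQL text, which can then be executed and scheduled like any other job. The function `profiling_query` and the table, column and partition filter are hypothetical. `STDDEV_POP` and `PERCENTILE_APPROX` are built-in Hive functions, and `corr` would add the Pearson coefficient for a pair of columns. The sketch profiles one column per query to keep the statement simple.

```python
def profiling_query(table, column, partition_filter=None):
    """Build one HiveQL statement that profiles a numeric column in a single scan."""
    expressions = [
        "COUNT(*) AS row_count",
        f"COUNT({column}) AS non_null_count",
        f"COUNT(DISTINCT {column}) AS distinct_count",
        f"MIN({column}) AS min_value",
        f"MAX({column}) AS max_value",
        f"SUM({column}) AS sum_value",
        f"AVG({column}) AS avg_value",
        f"STDDEV_POP({column}) AS stddev_value",
        f"PERCENTILE_APPROX({column}, 0.5) AS approx_median",
    ]
    query = "SELECT\n  " + ",\n  ".join(expressions) + f"\nFROM {table}"
    if partition_filter:
        # Profiling only the newly loaded partition keeps the scan small.
        query += f"\nWHERE {partition_filter}"
    return query


print(profiling_query("dw.fact_sales", "amount", "dt = '2015-01-01'"))
```

Storing the result row of each run in a table would give the history of profiles that the following paragraph suggests using as a basic data quality indicator.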
Therefore, once the basic table structure is set up in Hive, data profiling can be done using the basic built-in functions. The data profiling process can easily be automated and used as a basic indicator of data quality and emerging data issues. However, some data profiling tasks can be difficult to implement in HiveQL, as they are difficult to implement even in SQL.
1. http://www.cisco.com/c/en/us/products/cloud-systems-management/tidal-enterprise-scheduler/index.html
3.3.5 Data quality
Cleansing and conforming data to ensure data quality [28] is a challenging process, especially in Hadoop. In RDBMSs, many basic data quality validations are enforced by the table structure and constraints, and are checked when data is inserted into a table. These basic constraints include data types, NULL and NOT NULL, or a requirement that a value be greater than, equal to or less than a predefined value. More complex constraints can be defined as well, such as uniqueness, foreign keys or functional dependencies on other attributes. However, constraints such as foreign keys can significantly decrease ETL performance.
Hadoop, however, validates the data structure on read instead of on write (schema on read).
With Hive, all these constraints have to be validated during transformations and while loading data into the star schema. This can significantly affect ETL performance on Hadoop, especially with large log fact tables or dimensions. Therefore, it is better to identify the important data quality validations and issues, and to consider combining several validations into one query to reduce the number of disk read operations.
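Combining several validations into one query can be sketched as follows. Each check is a condition that is true for a bad row, and all checks are counted with `SUM(CASE WHEN ... THEN 1 ELSE 0 END)` in a single scan of the table or partition. The Python helpers, the check names, the tolerance ratio and the example result row are hypothetical. A NULL value makes a comparison such as `amount < 0` evaluate to NULL, so such a row is not counted; NULLs therefore need their own `IS NULL` check.

```python
def quality_check_query(table, checks, partition_filter=None):
    """Build one HiveQL query that evaluates several row-level checks in one scan.

    `checks` maps a check name to a HiveQL condition that is TRUE for a bad row.
    """
    expressions = ["COUNT(*) AS total_rows"]
    for name, bad_row_condition in checks.items():
        expressions.append(
            f"SUM(CASE WHEN {bad_row_condition} THEN 1 ELSE 0 END) AS {name}")
    query = "SELECT\n  " + ",\n  ".join(expressions) + f"\nFROM {table}"
    if partition_filter:
        query += f"\nWHERE {partition_filter}"
    return query


def failed_checks(result, max_bad_ratio=0.0):
    """Names of checks whose share of bad rows exceeds the tolerated ratio."""
    total = result["total_rows"]
    return sorted(name for name, bad_rows in result.items()
                  if name != "total_rows" and total > 0
                  and bad_rows / total > max_bad_ratio)


# Hypothetical checks on a staging table before it is loaded into the star schema.
checks = {
    "null_customer_id": "customer_id IS NULL",
    "negative_amount": "amount < 0",
    "unknown_currency": "currency NOT IN ('CZK', 'EUR', 'USD')",
}
print(quality_check_query("staging.sales", checks, "dt = '2015-01-01'"))

# Illustrative result row, as it might be fetched from Hive (values are made up).
result = {"total_rows": 1000, "null_customer_id": 0,
          "negative_amount": 3, "unknown_currency": 0}
print(failed_checks(result))                      # ['negative_amount']
print(failed_checks(result, max_bad_ratio=0.01))  # []
```

The per-check tolerance makes it possible to be strict for conformed dimensions and sensitive reporting data while allowing a small share of bad rows elsewhere.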
Data loaded into conformed dimensions or used for sensitive business reporting, such as accounting information or information that is later published, should have higher priority and should be validated very carefully.
Many different methods that regularly validate data quality can be implemented in Hive as part of an ETL process. One method that is easy to implement in HiveQL is Bollinger bands [29], which can automatically validate the data distribution and capture unexpected changes in it. Other methods can use the built-in Hive UDFs, such as standard deviation.
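Bollinger bands [29] consist of a moving average surrounded by a band of k standard deviations, commonly computed over 20 periods with k = 2. A minimal Python sketch can flag values that fall outside the band, assuming the monitored series is something like the daily row count of a fact table. The sketch deliberately deviates from the classic definition: the band is computed only from the preceding values, so each new load is compared with its recent history. The function name and the data are hypothetical.

```python
import statistics


def bollinger_outliers(series, window=20, k=2.0):
    """Indices of values outside mean +/- k * stddev of the preceding `window` values.

    Unlike the classic definition, the band is computed from earlier values only,
    so each new value is tested against the recent history.
    """
    outliers = []
    for i in range(window, len(series)):
        history = series[i - window:i]
        mean = statistics.fmean(history)
        width = k * statistics.pstdev(history)
        if not (mean - width <= series[i] <= mean + width):
            outliers.append(i)
    return outliers


# Hypothetical daily row counts of a log fact table; the last load doubled.
daily_rows = [1000, 1010, 990, 1005, 995, 1002, 998, 2000]
print(bollinger_outliers(daily_rows, window=5))  # [7]
```

A completely constant history gives a band of zero width, so any change would be flagged. In practice, a minimum band width may be needed for very stable series.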
At this point, there are no open source data quality tools that support Hadoop or Hive. On the other hand, commercial tools such as Informatica are gradually adding Hadoop support, which allows managing data quality in both Hadoop and RDBMSs with one tool. Other commercial tools, from companies such as Talend or Ataccama, focus only on Hadoop.
3.3.6 Data archiving and preservation
One part of data management is a plan that describes when and how data is archived and which data is preserved in the data warehouse. One of the major issues of an RDBMS data warehouse is expensive storage space. Therefore, historical data is either aggregated, or archived and deleted from the data warehouse. When data is moved out of the data warehouse, it is usually moved to tapes as to a