Calhoun: The NPS Institutional Archive
Theses and Dissertations Thesis Collection
2014-09
Adding big data analytics to GCSS-MC
Bitto, Nicholas
Monterey, California: Naval Postgraduate School
NAVAL
POSTGRADUATE
SCHOOL
MONTEREY, CALIFORNIA
THESIS
ADDING BIG DATA ANALYTICS TO GCSS-MC by
Nicholas Bitto September 2014
Thesis Co-Advisors: Arijit Das Gurminder Singh
REPORT DOCUMENTATION PAGE Form Approved OMB No. 0704–0188
1. AGENCY USE ONLY (Leave Blank) 2. REPORT DATE 09-30-2014
3. REPORT TYPE AND DATES COVERED Master’s Thesis 03-25-2013 to 09-30-2014 4. TITLE AND SUBTITLE
ADDING BIG DATA ANALYTICS TO GCSS-MC
5. FUNDING NUMBERS
6. AUTHOR(S) Nicholas Bitto
7. PERFORMING ORGANIZATION NAME(S) AND ADDRESS(ES)
Naval Postgraduate School Monterey, CA 93943
8. PERFORMING ORGANIZATION REPORT NUMBER
9. SPONSORING / MONITORING AGENCY NAME(S) AND ADDRESS(ES)
N/A
10. SPONSORING / MONITORING AGENCY REPORT NUMBER
11. SUPPLEMENTARY NOTES
The views expressed in this document are those of the author and do not reflect the official policy or position of the Department of Defense or the U.S. Government. IRB Protocol Number: N/A.
12a. DISTRIBUTION / AVAILABILITY STATEMENT Approved for public release; distribution is unlimited
12b. DISTRIBUTION CODE
13. ABSTRACT (maximum 200 words)
Global Combat Support System - Marine Corps is a large logistics system designed to replace numerous legacy systems used by the Marine Corps. While it has been in existence for a while, its intended potential has not been fully realized. Therefore, various teams are working hard to develop the analytics that will benefit the community. With the growth of data, the only way these analytics (in Structured Query Language [SQL]) will run efficiently will be on proprietary hardware from Oracle. This research looks at running the same analytics on commodity hardware using Hadoop Distributed File System and Java MapReduce. The results show that while it takes longer to program in Java (over SQL), the analytics are just as powerful as SQL, or even more so, and the potential to save on hardware cost is significant.
14. SUBJECT TERMS
Big Data, Hadoop, MapReduce, GCSS-MC
15. NUMBER OF PAGES 93 16. PRICE CODE 17. SECURITY CLASSIFICATION OF REPORT Unclassified 18. SECURITY CLASSIFICATION OF THIS PAGE Unclassified 19. SECURITY CLASSIFICATION OF ABSTRACT Unclassified 20. LIMITATION OF ABSTRACT UU
Approved for public release; distribution is unlimited
ADDING BIG DATA ANALYTICS TO GCSS-MC
Nicholas Bitto
Lieutenant, United States Navy
B.S., Old Dominion University, 2009
Submitted in partial fulfillment of the requirements for the degree of
MASTER OF SCIENCE IN COMPUTER SCIENCE from the
NAVAL POSTGRADUATE SCHOOL September 2014
Author: Nicholas Bitto
Approved by: Arijit Das, Thesis Co-Advisor
Gurminder Singh, Ph.D., Thesis Co-Advisor
Peter J. Denning, Ph.D.
ABSTRACT
Global Combat Support System - Marine Corps is a large logistics system designed to replace numerous legacy systems used by the Marine Corps. While it has been in existence for a while, its intended potential has not been fully realized. Therefore, various teams are working hard to develop the analytics that will benefit the community. With the growth of data, the only way these analytics (in Structured Query Language [SQL]) will run efficiently will be on proprietary hardware from Oracle. This research looks at running the same analytics on commodity hardware using Hadoop Distributed File System and Java MapReduce. The results show that while it takes longer to program in Java (over SQL), the analytics are just as powerful as SQL, or even more so, and the potential to save on hardware cost is significant.
Table of Contents
1 Introduction
1.1 The Problem
1.2 Research Questions
1.3 Contributions
1.4 Thesis Outline
2 The Current State
2.1 GCSS-MC: The USMC ERP Solution
2.2 Big Data
2.3 MapReduce
2.4 Word Count
2.5 Hadoop
3 The Experiment Design
3.1 Why Add a Big Data Element?
3.2 Adding a Big Data Element to GCSS-MC
3.3 Building a Hadoop Cluster
4 The Experiment
4.1 The GCSS-MC Data
4.2 Top-100-NSN Program
4.3 SQL Simulation
5 Conclusion
5.1 The Outcome
5.2 Future Work
Appendices
A Hadoop Testing on Single and Two Node Clusters
A.1 Hypothesis
A.2 Experiment Architecture
A.3 Experiments Run
A.4 Results
A.5 Conclusion
B How to Setup a Single Node Hadoop Cluster
References
List of Figures
Figure 2.1 GCSS-MC Architecture
Figure 2.2 Google Execution Overview
Figure 2.3 Hadoop WordCount Program
Figure 2.4 Hadoop WordCount Program: Libraries
Figure 2.5 Hadoop WordCount Program: Map Class
Figure 2.6 Hadoop WordCount Program: Reduce Class
Figure 2.7 Hadoop WordCount Program: Main Method
Figure 2.8 Jack and Jill Nursery Rhyme
Figure 2.9 Byte Offset for Jack and Jill Nursery Rhyme
Figure 2.10 Intermediate Key/Value Pairs from the Map Method
Figure 2.11 Shuffled Intermediate Key/Value Pairs from the Map Method
Figure 2.12 Final Output of WordCount
Figure 2.13 HDFS Architecture
Figure 2.14 HDFS JobTracker Interaction
Figure 3.1 Experiment Architecture
Figure 4.1 Sample GCSS-MC data in JSON Format
Figure 4.2 Data Flow through the NSN Program
Figure 4.3 NSN: First Algorithm
Figure 4.4 NSN Output from the First Algorithm
Figure 4.5 NSN: Second Algorithm
Figure 4.7 NSN Output from the Second Algorithm
Figure 4.8 NSN: Third Algorithm
Figure 4.9 NSN Output from the Third Algorithm
Figure 4.10 NSN: Fourth Algorithm
Figure 4.11 NSN Output from the Fourth Algorithm
Figure 4.12 NSN: Fifth Algorithm
Figure 4.13 NSN Output from the Fifth Algorithm
Figure 4.14 Alpha Program Logic
Figure 4.15 Data Flow Through the Alpha Program
Figure 4.16 Alpha: First Algorithm
Figure 4.17 Alpha Output from the First Algorithm
Figure 4.18 Alpha: Second Algorithm
Figure 4.19 Alpha Output from the Second Algorithm
Figure 4.25 Alpha Output from the Fifth Algorithm
Figure 4.20 Alpha: Third Algorithm
Figure 4.21 Alpha Output from the Third Algorithm
Figure 4.22 Alpha: Fourth Algorithm
Figure 4.23 Alpha Output from the Fourth Algorithm
Figure 4.24 Alpha: Fifth Algorithm
Figure A.1 Experiment Architecture
Figure A.2 Write Benchmark Results
Figure A.3 Read Benchmark Results
List of Tables
Table 3.1 Experiment Server Specifications
Table 3.2 Experiment HDFS Node Specifications
Table 4.1 Tables Written Back to Database
Table 4.2 Sample of HDFS_R001_INVENTORY_NSN Table
Table 4.3 Sample of HDFS_ALPHA Table
List of Acronyms and Abbreviations
COTS commercial off the shelf
CSV Comma Separated Value
DOD Department of Defense
ERP enterprise resource planning
GAO Government Accountability Office
GB gigabyte
GCSS-MC Global Combat Support System - Marine Corps
GFS Google File System
HD hard drive
HDFS Hadoop Distributed File System
IDC International Data Corporation
IP Internet Protocol
JDBC Java Database Connectivity
JSON JavaScript Object Notation
NSN National Stock Number
PB petabyte
PC personal computer
PL/SQL Procedural Language/Structured Query Language
RAM random access memory
TB terabyte
USMC United States Marine Corps
VM virtual machine
YB yottabyte
ZB zettabyte
Acknowledgements
Foremost, I would like to express my sincere gratitude to my advisors Arijit Das and Gurminder Singh for the continuous support of my research. Their guidance and patience helped me tremendously during the creation, research, and writing of this thesis. I could not have imagined having better advisors for my thesis.
I would also like to thank my family: my wife, Laura Bitto, for taking care of our daughter during all of the late nights and weekends that went into completing this thesis.
CHAPTER 1:
Introduction
Big data has become a buzzword in both the government and private sectors. That does not mean that big data is not a worthwhile venture. There is no hiding from the fact that data continues to grow, and processing enormous amounts of data using conventional techniques can become unmanageable. For example, the Pentagon is attempting to expand its worldwide communications network to handle yottabytes (YB) of data (a YB is 10^24 bytes, or one trillion terabytes (TB)) [1]. As the data increases to new thresholds, current database architectures struggle to keep up. This thesis examines the Global Combat Support System - Marine Corps (GCSS-MC) database and how to apply a big data solution to it.
1.1
The Problem
The amount of data being stored in databases is increasing in both the government and public sectors. The United States Marine Corps (USMC) is no different; it too needs to access, store, and process large amounts of data. The USMC is currently working on an enterprise resource planning (ERP) system that will replace all of its legacy logistics systems. The solution that has been developed is called GCSS-MC. GCSS-MC is a complex undertaking and has seen its share of problems during development. The system is making significant achievements and is becoming a useful resource to the USMC. However, the design of this system has taken a significant amount of time to implement while the data continues to grow.
Initially, the GCSS-MC design did not include a big data element. But as the amount of data and the desire to retain it increase, the USMC needs to find ways to add a big data solution to GCSS-MC. This thesis explores a method of adding a big data element to GCSS-MC by adding a Hadoop Distributed File System (HDFS) cluster. Furthermore, it discusses a method that requires no hardware changes to the existing GCSS-MC architecture.
1.2
Research Questions
The research and work of this thesis will be focused on addressing the following questions:
• What would an architecture look like that adds a big data element to GCSS-MC?
• If an architecture can be developed, what modifications would the GCSS-MC architecture need?
• How can the data contained in GCSS-MC be imported into HDFS?
• What type of analytics can Hadoop provide for GCSS-MC data?
• How will the data get back to the GCSS-MC database?
1.3
Contributions
The GCSS-MC database contains structured data stored in a relational database. This data will be accessed, parsed, and stored in the HDFS ecosystem, where it will be used to run analytics for the GCSS-MC system. The interesting idea in this concept is that HDFS will be used to examine all of the data in the corpus, perform some calculations, and return the results in a different format to the GCSS-MC database. This becomes particularly useful when only a small amount of data needs to be derived and displayed from the entire data corpus. This is illustrated in two separate analytic programs. The programs are written to show that any analytic that can be run in a typical database operation can be accomplished in HDFS. Furthermore, HDFS can be used to perform the exact same queries that a Structured Query Language (SQL) statement can perform. This is increasingly interesting when the data exceeds the capability of standard database designs.
1.4
Thesis Outline
The rest of the chapters in this thesis will further expand on the aforementioned ideas and research questions. Below is a brief outline of what the reader can expect to find in each chapter.
Chapter 2 serves as a starting point for the technology covered in this thesis. The chapter reviews the current architecture and status of the GCSS-MC system. The chapter also discusses the start of the MapReduce paradigm through the introduction of Jeffrey Dean and Sanjay Ghemawat’s paper on MapReduce. Furthermore, an explanation and walkthrough of the canonical WordCount example is given. The chapter wraps up with an introduction of the HDFS architecture.
The experiment architecture is introduced in Chapter 3. The chapter begins with some statistics to further illustrate the need for big data solutions. The chapter then outlines how a big data element could be added to the current GCSS-MC architecture. Finally, it explains how the Hadoop cluster is created and some implementation decisions that were required.
Chapter 4 details the specifics of the experiment. First, there is a discussion of the GCSS-MC data and how it was accessed. Then the chapter delineates how the data is parsed and stored in the Hadoop ecosystem. There are also two separate analytics discussed in Chapter 4. The first analytic is the Top 100 National Stock Number (NSN) program, which is shown in detail. The second analytic is a program called Alpha, an example of SQL simulation. Alpha is also discussed and shown in detail.
The final chapter summarizes the findings and reiterates the research questions that are presented in the thesis. Chapter 5 also discusses some possible future work, such as adding big data tools (Hive, Sqoop, HBase, etc.), running the experiment code on a production-level HDFS ecosystem, and further optimizing the code.
CHAPTER 2:
The Current State
The Department of Defense (DOD) creates, monitors, and views petabytes (PB) of data every day. The current data management software and hardware make it very difficult to manage, organize, and parse large datasets. Moreover, the data resides across several databases. This infrastructure makes it difficult, or even impossible, for a user who needs to aggregate data from several databases to make intelligent deductions from that data. The DOD has significant work remaining to streamline all of its data into one space. There are several projects working on a solution to place all of the data in a cloud environment that allows all users to access the data payload. These programs vary greatly across the DOD, and no front-runner sets the standard. Even before the DOD can start using the cloud as a large data store, several steps need to take place to get the DOD ready to move to a cloud environment.
The DOD took on the challenge of bringing all of its legacy business systems into a cohesive system in the 1990s. DOD’s business systems are information systems, including financial and non-financial systems, that support DOD business operations such as civilian personnel, finance, health, logistics, military personnel, procurement, and transportation [2]. This effort has become known as ERP and entails an automated system using commercial off the shelf (COTS) software consisting of multiple, integrated functional modules that perform a variety of business-related tasks such as general ledger accounting, payroll, and supply chain management [2]. Because of their size, complexity, and significance to the DOD, the ERP efforts have been widely publicized for schedule delays and cost overruns, and ERP has been placed on the Government Accountability Office (GAO) high-risk list [2].
In total, there are nine ERP solutions being developed for the DOD. Together, these systems are intended to replace over 500 legacy systems. Replacing the legacy systems with aggregated systems will save the DOD money and give end users a more powerful system that they can use to manage their work and to run analytics that may help them improve their work methods. The ERP effort is ongoing, and significant effort has recently been put toward making it a success. The USMC solution for ERP is the GCSS-MC. The GCSS-MC will replace legacy systems and tailor a solution that is useful to both the Marine in the field and the Marine shipping the supplies.
2.1
GCSS-MC: The USMC ERP Solution
The GCSS-MC system was designed to run on COTS architecture and combine all of the USMC legacy business systems into one system. This process has been ongoing since November 2003, and it was intended to deliver integrated functionality across the logistics areas [3]. The system has been delivered incrementally through COTS architecture to bring all of the USMC logistics tools to one place. Figure 2.1 is an example of the system architecture, from [4]. The program has had its setbacks and successes. Replacing up to forty-eight legacy systems is not easily accomplished [3]. The system is an attempt to replace all legacy systems in one place, which makes GCSS-MC incredibly complex. It has to meet the needs of the USMC both at the garrison and in the field, all while maintaining real-time data updates over the entire system. This is a difficult undertaking and should not be taken lightly. The point of this thesis is not to debate what should or should not have been done during the design or implementation of GCSS-MC. Moreover, this thesis does not aim to replace or change the current architecture of the GCSS-MC system. The main aim of this thesis is to show a proof of concept that big data analytics can be added to the current GCSS-MC architecture. GCSS-MC uses proven Oracle technology to give the users a reliable relational database. The work on GCSS-MC is not done and its implementation continues, but it does not yet incorporate a big data element. A big data element would give the USMC the ability to store and analyze more data. Currently the only data that GCSS-MC addresses is structured data.
Figure 2.1: GCSS-MC Architecture
2.2
Big Data
Big data has become a buzzword in both industry and the government sector; evidence of this is President Obama's launch of a Big Data Research and Development Initiative in March 2012 [5]. Big data has helped shift how data is thought about and what can be done with it. Big data does not answer all questions, and there are limitations to what it can do. Big data can take the form of MapReduce, NoSQL, Bigtable, or something completely different. The big data industry changes rapidly, and new tools and architectures are constantly being introduced. These tools all provide some benefits and some drawbacks. No one technology provides everything an end user would want. There is currently no de facto standard, and arguments persist as to which technology is best. The simple answer is that they all provide something, and the right choice depends on the type of data, the amount of data, and the desired analytics.
2.3
MapReduce
One of the major core technologies for big data is MapReduce. This process is used heavily at Google and in Apache’s HDFS system [6], [7]. Google’s development in the MapReduce space is largely what pushed the open source community to answer with HDFS [8]. Google’s efforts were documented in Jeffrey Dean and Sanjay Ghemawat’s paper on the MapReduce paradigm [6]. This paper gave the data science community as a whole a look at how Google operated its computer cluster to perform a large number of tasks. The Google MapReduce paper was the first publication illustrating a clustered MapReduce framework. Since the paper was published, several other models have been developed from Google’s model. Apache Hadoop is an example of a cluster computing environment created after this paper was published. The paper is accepted as seminal and has been cited in thousands of other publications.
MapReduce is a programming model that was developed by Google as an answer to how to process and create large amounts of data. At Google, MapReduce jobs are run every day, and they process more than twenty PB of data per day [6]. The programming model was developed to abstract the complexity of cluster computing away from the programmer. Parallel computing is extremely difficult, and if the programmer has to handle how to parallelize the computation, how the system distributes the data around the cluster, and failures of cluster nodes, as well as the data itself, the task can be overwhelming. Google developed a system and a programming model that abstracts these concerns from the user. The programmer can then write code that is executed on the cluster through the Google File System [9]. The programmer only has to write the code that handles the calculation; the distribution of the data and how it is parallelized are handled by the underlying system. This allows the programmer to focus on the algorithms rather than the parallel features. Google found that most of their data computations involved mapping the data to a computation and then storing the resultant key/value pairs. Google further took their inspiration from the map and reduce primitives that were available in Lisp [6]. This led to the development of the MapReduce programming model. The programmer specifies both the map and reduce functions to handle the data in key/value pairs. The map function first takes the data and maps each logical record to intermediate key/value pairs. The intermediate key/value pairs are then sent to the reduce function, which applies the same function to all of the values that share the same key. The canonical example of this is the word count problem, which is discussed further in Section 2.4.
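The map and reduce roles described above can be pictured with a small, single-process Java sketch. It is illustrative only and is not the Google or Hadoop API: the class name MapReduceModelSketch, the "item,quantity" record format, and the sample data are all assumptions made for this example, and the run method merely stands in for the grouping that a real framework performs across a cluster.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: a single-process picture of the MapReduce programming model. */
final class MapReduceModelSketch {

    /** Map: turn one logical record ("item,quantity", a hypothetical format) into key/value pairs. */
    static List<Map.Entry<String, Integer>> map(String record) {
        String[] fields = record.split(",");
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        pairs.add(new SimpleEntry<String, Integer>(fields[0].trim(), Integer.parseInt(fields[1].trim())));
        return pairs;
    }

    /** Reduce: apply the same function (here, a sum) to every value that shares a key. */
    static int reduce(String key, Iterator<Integer> values) {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next();
        }
        return sum;
    }

    /** Stand-in for the framework: map every record, group the pairs by key, then reduce each group. */
    static Map<String, Integer> run(List<String> records) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String record : records) {
            for (Map.Entry<String, Integer> pair : map(record)) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
            result.put(group.getKey(), reduce(group.getKey(), group.getValue().iterator()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("bolt,4", "filter,1", "bolt,6", "gasket,2", "filter,3");
        System.out.println(run(records)); // prints {bolt=10, filter=4, gasket=2}
    }
}
```

The programmer's part is limited to map and reduce; everything in run is what the underlying system would otherwise take care of, including distribution and parallelization, which this sketch does not attempt.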
Google focused on implementing this cluster on a large number of commodity personal computers (PCs) with a Gigabit Ethernet network [9]. This allowed them to build a large-scale cluster quickly and cheaply. The use of commodity PCs is important because, before this framework was developed, the conventional thinking was that a supercomputing environment required very powerful, expensive servers. By showing that the computational power, storage, and speed could be matched with a collection of PCs, Google allowed the parallel computing community to expand into other areas and made supercomputing achievable for more people [10]. Google’s implementation also has thousands of machines, and with that many machines, failures are common. Google’s framework therefore supports fault tolerance and allows a machine to be replaced, and since the machines are PCs, the cost to replace one is significantly lower than that of a server.
The Google framework executes a MapReduce program by first splitting the input files into pieces, typically 16 to 64 MB each [6]. These pieces are then sent to worker nodes, and a map function is started on each worker node that has been assigned a data split. One of the nodes is designated as the master. The master is responsible for designating the worker nodes for both the map and reduce functions, and it attempts to assign the tasks to idle nodes in the cluster. Each worker that is assigned an input split performs the map function on the input data and parses it into intermediate key/value pairs. The pairs are written to the local disk through a buffer. The location of the intermediate data is then sent to the master so that the master can tell the reduce workers where their input data resides. A reduce worker node then makes a call to get the data from the location passed to it by the master. The reduce node reads all of the data for its partition and sorts the intermediate data by key. The reduce node then iterates over the sorted data and, for each unique key, sends the key and its values to the reduce function specified by the program. The reduce node appends its output to the output file specified by the program. When all of the map and reduce tasks report completion to the master, the master wakes up the program, and the MapReduce call returns to that program. A pictorial example of this process is shown in Figure 2.2, from [6].
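The execution steps just described can be imitated in one process as a rough sketch. The following Java is an assumption-laden stand-in, not Google's implementation: a thread pool plays the master handing splits to idle workers, in-memory lists play the local disks, Future objects play the reported locations, a single reduce task handles all keys, and splits are measured in lines rather than megabytes. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Illustrative only: split, map on workers, report locations, sort by key, reduce. */
final class ExecutionFlowSketch {

    /** Split the input into fixed-size pieces (lines here, standing in for 16 to 64 MB pieces). */
    static List<List<String>> split(List<String> lines, int linesPerSplit) {
        List<List<String>> splits = new ArrayList<>();
        for (int i = 0; i < lines.size(); i += linesPerSplit) {
            splits.add(lines.subList(i, Math.min(i + linesPerSplit, lines.size())));
        }
        return splits;
    }

    /** Map task: parse one split into intermediate (word, "1") pairs kept in a local list. */
    static List<String[]> mapTask(List<String> split) {
        List<String[]> local = new ArrayList<>();
        for (String line : split) {
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    local.add(new String[] {word, "1"});
                }
            }
        }
        return local;
    }

    /** Reduce task: read every map output, sort by key, then sum each run of equal keys. */
    static List<String> reduceTask(List<List<String[]>> mapOutputs) {
        List<String[]> all = new ArrayList<>();
        for (List<String[]> output : mapOutputs) {
            all.addAll(output);
        }
        all.sort(Comparator.comparing((String[] pair) -> pair[0]));
        List<String> result = new ArrayList<>();
        int i = 0;
        while (i < all.size()) {
            String key = all.get(i)[0];
            int sum = 0;
            while (i < all.size() && all.get(i)[0].equals(key)) {
                sum += Integer.parseInt(all.get(i)[1]);
                i++;
            }
            result.add(key + "\t" + sum); // appended to the job output
        }
        return result;
    }

    /** Master: assign splits to idle workers, collect output locations, then start the reduce. */
    static List<String> runJob(List<String> lines, int linesPerSplit, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<List<String[]>>> locations = new ArrayList<>();
            for (List<String> piece : split(lines, linesPerSplit)) {
                locations.add(pool.submit(() -> mapTask(piece)));
            }
            List<List<String[]>> mapOutputs = new ArrayList<>();
            for (Future<List<String[]>> location : locations) {
                mapOutputs.add(location.get()); // waits until that map task reports completion
            }
            return reduceTask(mapOutputs);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("map task failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a b a", "b c", "a");
        for (String line : runJob(input, 2, 2)) {
            System.out.println(line);
        }
    }
}
```

Fault tolerance, multiple reduce partitions, and real disk and network transfers are deliberately left out; the sketch only mirrors the ordering of the steps.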
Figure 2.2: Google Execution Overview
2.4
Word Count
The MapReduce programming paradigm can be used to process all kinds of data and can become very convoluted. The widely accepted "Hello World" of MapReduce is the word count program. This example has been used by Google [6] and is also used as an example by Hadoop [11]. Figure 2.3 is the source code for the WordCount program from the Hadoop web page [11]. This is the example code that we will step through to explain the MapReduce process.
    package org.myorg;

    import java.io.IOException;
    import java.util.*;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.*;

    public class WordCount {

        public static class Map extends MapReduceBase implements Mapper<LongWritable, Text,
                Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
                    Reporter reporter) throws IOException {
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                while (tokenizer.hasMoreTokens()) {
                    word.set(tokenizer.nextToken());
                    output.collect(word, one);
                }
            }
        }

        public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable,
                Text, IntWritable> {
            public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text,
                    IntWritable> output, Reporter reporter) throws IOException {
                int sum = 0;
                while (values.hasNext()) {
                    sum += values.next().get();
                }
                output.collect(key, new IntWritable(sum));
            }
        }

        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf(WordCount.class);
            conf.setJobName("wordcount");

            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);

            conf.setMapperClass(Map.class);
            conf.setCombinerClass(Reduce.class);
            conf.setReducerClass(Reduce.class);

            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);

            FileInputFormat.setInputPaths(conf, new Path(args[0]));
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));

            JobClient.runJob(conf);
        }
    }

Figure 2.3: Hadoop WordCount Program
The first section of code (Figure 2.4) gives the program access to the libraries needed to execute the code in the Hadoop environment. The first two imports are standard Java and are not specific to the Hadoop libraries. The first is java.io.IOException, the exception class declared by the map and reduce methods. It allows the program to throw the exceptions it encounters, which provides a graceful shutdown and some help in debugging. The next import is the Java utilities package, which gives the program access to the Iterator class and the StringTokenizer class. The next five imports are all Hadoop specific. The org.apache.hadoop.fs.Path class allows the program to refer to locations in the Hadoop file system. This is needed both for the input path of the files needed to run the program and for the output path of the result. The next package, org.apache.hadoop.conf.*, is needed to set the job up; this is the part of the code that is executed in the main method. The package org.apache.hadoop.io.* is needed to read data into the program and write data out of it; it supplies the Writable types (LongWritable, Text, and IntWritable) used for the keys and values, and it is additionally needed to write to logs, stdout, and stderr. The Hadoop utility package, org.apache.hadoop.util.*, is used to report progress on the program during execution. Finally, the org.apache.hadoop.mapred.* package is where the program gets access to the majority of the classes it needs. This package includes the definitions used by the map and reduce methods. It also gives the program the class definitions for the input format, the input split, how the job is configured, the output collector, and the output format.
    import java.io.IOException;
    import java.util.*;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.*;

Figure 2.4: Hadoop WordCount Program: Libraries
The map class is the next area of focus (Figure 2.5). The program first has to make the map class declaration.

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text,
            Text, IntWritable>

In this case, Map is the program-defined class name, and it inherits all of the properties of the MapReduceBase class. MapReduceBase is what allows the Map class to be called in the job. Next, the Map class must implement the Mapper interface. This is the interface that actually maps the input to the intermediate output. The program must specify the input key/value data types and the output key/value data types that will become the intermediate data. Here the input key is the byte offset of the line within the input document, and the value is the entire line of input. The map class must then declare a map method.

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
            Reporter reporter)

The method again specifies the input key/value types. It also declares an OutputCollector, sets the output key/value types, and gives the OutputCollector a name. The Reporter is what reports status back to the Hadoop system. This map method sets the output types to Text and IntWritable, both of which are Hadoop types. Finally, the output statement writes the intermediate data out.

    output.collect(word, one);

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text,
            Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

Figure 2.5: Hadoop WordCount Program: Map Class
Next the reduce class will take over (Figure 2.6). The reduce class has to be specified in the program and must accept the input data types defined by the map data type output. The reduce class declaration is the first thing that must be considered.
public static class Reduce extends MapReduceBase implements Reducer<Text,
        IntWritable, Text, IntWritable>
As with the map class, in order to run as a job the reduce class must also extend the MapReduceBase class. Then, the reduce class implements the Reducer interface and must declare the key/value input and output data types. This example shows the case where the reduce class key/value input and output are the same; this is not a constraint. The reduce class could output whatever the programmer chooses as long as it is a Hadoop data type. The reduce class must define a reduce method.
public void reduce(Text key, Iterator<IntWritable> values,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
The reduce method declaration is similar to the map declaration. The OutputCollector and Reporter declarations are the same. The difference here is the declaration of the input key/value pairs. The method must define the key type and then an iterator over the input value type. This is because before the intermediate data gets to the reduce class, it will be shuffled, and the reduce class will receive all of the intermediate data that has the same key. So the reduce method will be given the key and then iterate over all the values in the intermediate data that share that key. This is where the output key/value pairs are reduced in the manner specified by the program. In this example, the final value for each key is just the sum of all of the values. Finally, the reduce class has to set the output. This is similar to the map output statement.
output.collect(key, new IntWritable(sum));
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable,
        Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text,
            IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}
The final piece the program must implement is a main method (Figure 2.7). The main method is responsible for the setup of the MapReduce job. This is done by declaring a JobConf object and telling it where the map and reduce classes are found. In this example, the classes are found in the WordCount class.
JobConf conf = new JobConf(WordCount.class);
The JobConf typically sets the mapper, reducer, input format, and output format [12], but it also allows the programmer to manipulate the job. Here the programmer can set specific combiner classes, use the distributed cache, use custom comparators, and even change how the program executes by adjusting the memory requirements for the map and reduce tasks. The JobConf also allows the setting of the input and output paths. In this example they are set from the command line arguments.
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
  }
}
Figure 2.7: Hadoop WordCount Program: Main Method
The final piece to wrap up here is a walk through of the WordCount program execution. The idea is to use a small data set and walk through the program execution to follow the data flow and data transformation. This example will look at the nursery rhyme Jack and Jill (Figure 2.8).
Jack and Jill went up the hill
To fetch a pail of water.
Jack fell down and broke his crown,
And Jill came tumbling after
Figure 2.8: Jack and Jill Nursery Rhyme
For the purpose of this walk through, we will only consider one map task and one reduce task. The first thing that will happen is that the first line of the file will be read in by the Map class. The first line will come in with a byte offset of 0; therefore the key will be zero and the value will be the line "Jack and Jill went up the hill". Figure 2.9 shows the input key/value pairs for the whole text.
0    Jack and Jill went up the hill
31   To fetch a pail of water.
57   Jack fell down and broke his crown,
93   And Jill came tumbling after
Figure 2.9: Byte Offset for Jack and Jill Nursery Rhyme
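The offsets in Figure 2.9 can be reproduced with a few lines of plain Java. This is only a sketch of the bookkeeping, not Hadoop's input format code: the class name `ByteOffsetSketch` and the method `lineOffsets` are hypothetical, and the sketch assumes single-byte newline characters.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Toy model: the key of each input line is the byte offset at which the line starts. */
class ByteOffsetSketch {

    /** Returns the starting byte offset of every line, assuming "\n" line endings. */
    static List<Long> lineOffsets(String text) {
        List<Long> offsets = new ArrayList<>();
        long position = 0;
        for (String line : text.split("\n")) {
            offsets.add(position);
            // Advance past the bytes of the line plus the one-byte newline.
            position += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return offsets;
    }

    public static void main(String[] args) {
        String rhyme = "Jack and Jill went up the hill\n"
                + "To fetch a pail of water.\n"
                + "Jack fell down and broke his crown,\n"
                + "And Jill came tumbling after\n";
        System.out.println(lineOffsets(rhyme)); // prints [0, 31, 57, 93]
    }
}
```

The first line is 30 characters plus a newline, which is why the second line starts at offset 31.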
The line is then parsed using a StringTokenizer, which sets each word as a token. After the tokens are set, a while loop is set up on the condition that there are more tokens. This while loop will then set a Text variable to the String value of each token and then set the output to the word and one. This sends an intermediate data value of ("word", 1) for each word in the line. The intermediate value of the first line will be as follows:
Jack 1
and 1
Jill 1
went 1
up 1
the 1
hill 1
This process is repeated for each line in the input file, producing intermediate outputs of ("word", 1) for each word that occurs in the file. This produces the intermediate data that will then be sorted and shuffled prior to going to the reduce method. Figure 2.10 illustrates the intermediate values for the entire file.
Jack 1
and 1
Jill 1
went 1
up 1
the 1
hill 1
To 1
fetch 1
a 1
pail 1
of 1
water 1
Jack 1
fell 1
down 1
and 1
broke 1
his 1
crown 1
And 1
Jill 1
came 1
tumbling 1
after 1
Figure 2.10: Intermediate Key/Value Pairs from the Map Method
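The map step of the walk through can be imitated outside Hadoop. The following is a minimal sketch in plain Java: `MapPhaseSketch` and its `map` method are hypothetical stand-ins for the Map class, and a list of entries stands in for the OutputCollector.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

/** Plain-Java imitation of the WordCount map method (no Hadoop types involved). */
class MapPhaseSketch {

    /** Splits one input line on whitespace and emits a (token, 1) pair for each token. */
    static List<Map.Entry<String, Integer>> map(long byteOffset, String line) {
        // The byte offset key is accepted but unused, as in the WordCount map method.
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            emitted.add(new SimpleEntry<>(tokenizer.nextToken(), 1));
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(map(0L, "Jack and Jill went up the hill"));
        // prints [Jack=1, and=1, Jill=1, went=1, up=1, the=1, hill=1]
    }
}
```

One detail worth keeping in mind: a whitespace-only tokenizer leaves punctuation attached to a token, so a line ending in "water." yields the token "water." unless the program strips punctuation in an extra step.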
The sorting and shuffling process then takes over. The data is sorted by key; because Text keys are compared byte by byte, capitalized words sort ahead of lowercase words. The shuffling process then partitions the data into logical units and sends each unit to a reduce task. Again, there is only one reduce task, so it will get the entire data set. The input to the reduce method is illustrated in Figure 2.11.
And 1
Jack 1
Jack 1
Jill 1
Jill 1
To 1
a 1
after 1
and 1
and 1
broke 1
came 1
crown 1
down 1
fell 1
fetch 1
hill 1
his 1
of 1
pail 1
the 1
tumbling 1
up 1
water 1
went 1
Figure 2.11: Shuffled Intermediate Key/Value Pairs from the Map Method
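The grouping performed by the sort and shuffle step can be sketched with a sorted map. This is an illustration only: `ShuffleSketch` and `shuffle` are hypothetical names, and Java's natural String ordering is used as a stand-in for Hadoop's key comparison (the two agree for plain ASCII words such as these).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

/** Toy shuffle: collect the 1s emitted for each word under a sorted key. */
class ShuffleSketch {

    /** Groups one value of 1 per emitted word, with the keys kept in sorted order. */
    static TreeMap<String, List<Integer>> shuffle(List<String> emittedWords) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (String word : emittedWords) {
            grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList(
                "Jack", "and", "Jill", "went", "up", "the", "hill",
                "To", "fetch", "a", "pail", "of", "water",
                "Jack", "fell", "down", "and", "broke", "his", "crown",
                "And", "Jill", "came", "tumbling", "after");
        TreeMap<String, List<Integer>> grouped = shuffle(words);
        System.out.println(grouped.firstKey());  // prints And
        System.out.println(grouped.get("Jack")); // prints [1, 1]
    }
}
```

Each map entry corresponds to one call of the reduce method: the key, followed by an iterator over the grouped values.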
Notice this program looks at "and" and "And" as different words. This is how the map method is programmed, and if this result is undesirable, the programmer could use string manipulation or a regular expression to normalize the words. The reduce method will then take over. Here, the reduce method will take each key in and iterate over all values for that key. In this reduce method, an int variable is declared and used to sum the number of occurrences of each word. For instance, the word "Jack" is seen twice; the reducer will set the value of sum to one on the first occurrence of "Jack" by the call:
sum += values.next().get();
At this point, the sum is set to one. On the next occurrence of "Jack", the same sum call is executed, and then sum is set to two. This process occurs until there are no more values for the key "Jack". In this case, there are no more values, so the reduce class then makes the
call to write the output of the key and the sum by:
output.collect(key, new IntWritable(sum));
The reduce method will get called for each unique key in the intermediate data and write one output statement for each key until it has seen all of the keys in the intermediate data. The final result in this example is seen in Figure 2.12.
And 1
Jack 2
Jill 2
To 1
a 1
after 1
and 2
broke 1
came 1
crown 1
down 1
fell 1
fetch 1
hill 1
his 1
of 1
pail 1
the 1
tumbling 1
up 1
water 1
went 1
Figure 2.12: Final Output of WordCount
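The summing done by the reduce method, along with one possible way to merge "And" and "and", can be sketched as follows. Both `ReduceSketch` and the `normalize` helper are hypothetical; the normalization rule (lowercase, then drop anything that is not a letter or digit) is an assumption chosen for illustration and is not part of the WordCount program.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Locale;

/** Plain-Java imitation of the WordCount reduce method, plus an optional key cleanup. */
class ReduceSketch {

    /** Mirrors the reduce body: add up every value that arrived for one key. */
    static int reduce(String key, Iterator<Integer> values) {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next();
        }
        return sum;
    }

    /** Hypothetical map-side cleanup so that "And" and "and" share one key. */
    static String normalize(String token) {
        return token.toLowerCase(Locale.ROOT).replaceAll("[^a-z0-9]", "");
    }

    public static void main(String[] args) {
        System.out.println(reduce("Jack", Arrays.asList(1, 1).iterator())); // prints 2
        System.out.println(normalize("And"));                               // prints and
    }
}
```

If the mapper applied `normalize` before emitting each word, the reducer would see three values for the key "and" instead of separate keys for "And" and "and".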
2.5
Hadoop
The Hadoop Distributed File System is an open source project that was modeled after the Google architecture outlined in Jeffrey Dean and Sanjay Ghemawat's MapReduce paper [6]. Hadoop started out as part of an open source web search engine called Nutch [7]. The Nutch project was having some trouble handling the distributed nature of the computations that were needed for a web search engine. Then, drawing on the Google papers on MapReduce and the Google File System (GFS), the project began to take hold [7], [6], [9]. At that time, Yahoo! became interested in the project, and Hadoop split from the Nutch project and became what it is today. HDFS is the distributed file system that supports the MapReduce framework, and Figure 2.13 illustrates the HDFS architecture, from [11]. The distributed file system is set up to be fault tolerant and to spread data over several nodes in a cluster. The data replication factor is nominally set to three. However, this can be changed and adapted to meet the specific needs of a given cluster. The most basic cluster, a single node, has the following services running: NameNode, JobTracker, TaskTracker, Secondary NameNode, and DataNode.
Figure 2.13: HDFS Architecture
The NameNode is known as the master in the cluster. The NameNode is responsible for the file system namespace [13]. The NameNode is the node that manages the whole cluster. It maintains the file system tree and stores all the metadata for all of the directories and files within the distributed file system. The NameNode is the most important node in the cluster because it is needed to maintain all of the files in the distributed file system. If the NameNode were lost, then all of the files in the distributed file system would be lost, because the NameNode maintains the file structure needed to put the blocks back together. The NameNode stores the metadata and file tree in two files: the namespace image and an edit log [7]. Since HDFS stores all of the files in blocks, the NameNode will keep a reference to every block that exists in the file system.
The Secondary NameNode is there to back up the NameNode. However, this is not a true backup in the sense that, if the NameNode fails, the Secondary NameNode would take over. The main purpose of the Secondary NameNode is to periodically merge the namespace image with the edit log. This prevents the edit log from becoming too large [7]. The Secondary NameNode keeps a copy of the merged edit log and file system image. It is important to note that this is not real time; that is to say, if HDFS is restored from the Secondary NameNode image, the changes made between the last edit log merge and the NameNode failure will be lost. HDFS does provide a way for the NameNode to write its persistent state to several file systems to reduce the chance of all the file systems failing at once.
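The merge of the namespace image with the edit log can be pictured with a toy model. The sketch below is not HDFS code: the class `CheckpointSketch`, the operation names "CREATE" and "DELETE", and the use of a sorted set of paths as the "image" are all assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy model of a namespace image plus an edit log that is periodically merged into it. */
class CheckpointSketch {
    final TreeSet<String> image = new TreeSet<>();     // paths as of the last merge
    final List<String[]> editLog = new ArrayList<>();  // pending edits: {operation, path}

    /** Records a change in the edit log without touching the image. */
    void logEdit(String operation, String path) {
        editLog.add(new String[] {operation, path});
    }

    /** Replays every logged edit into the image, then truncates the log. */
    void checkpoint() {
        for (String[] edit : editLog) {
            if (edit[0].equals("CREATE")) {
                image.add(edit[1]);
            } else if (edit[0].equals("DELETE")) {
                image.remove(edit[1]);
            }
        }
        editLog.clear();
    }

    public static void main(String[] args) {
        CheckpointSketch nameNode = new CheckpointSketch();
        nameNode.logEdit("CREATE", "/a");
        nameNode.logEdit("CREATE", "/b");
        nameNode.checkpoint();
        nameNode.logEdit("DELETE", "/a");
        nameNode.logEdit("CREATE", "/c");
        // A restore from the merged image alone would miss the two edits still in the log.
        System.out.println(nameNode.image);          // prints [/a, /b]
        System.out.println(nameNode.editLog.size()); // prints 2
    }
}
```

The two edits left in the log after the checkpoint correspond to the data that would be lost if the file system were rebuilt from the last merged image.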
The JobTracker is responsible for accepting jobs and then dividing the jobs into tasks and assigning those tasks to DataNodes [14]. The JobTracker will try to do the best it can to maintain data locality, meaning that it will try to give the tasks to the DataNodes that physically have the blocks of data the task is to be executed on. The JobTracker will access the NameNode to be able to determine which nodes physically have the data. This is done to cut down on the amount of data that has to be transferred over the network. The JobTracker will then contact TaskTracker Nodes to determine whether the node is available to run a task. The TaskTracker is the service running on the DataNodes that will communicate with the JobTracker for processing tasks it has been assigned.
Figure 2.14: HDFS JobTracker Interaction
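The data locality preference can be sketched as a simple selection rule. This is a simplification under stated assumptions, not the JobTracker's actual scheduling logic: `LocalitySketch`, `pickNode`, and the node names are hypothetical, and rack awareness and task slots are ignored.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy version of a locality-aware task assignment. */
class LocalitySketch {

    /**
     * Prefers an idle node that already stores the block; otherwise falls back to
     * any idle node (the block must then cross the network); null if none is idle.
     */
    static String pickNode(Set<String> nodesWithBlock, List<String> idleNodes) {
        for (String node : idleNodes) {
            if (nodesWithBlock.contains(node)) {
                return node; // data-local assignment
            }
        }
        return idleNodes.isEmpty() ? null : idleNodes.get(0); // remote assignment
    }

    public static void main(String[] args) {
        // With a replication factor of three, the block lives on three nodes.
        Set<String> replicas = new HashSet<>(Arrays.asList("node2", "node5", "node7"));
        System.out.println(pickNode(replicas, Arrays.asList("node1", "node5"))); // prints node5
        System.out.println(pickNode(replicas, Arrays.asList("node1", "node3"))); // prints node1
    }
}
```

The sketch also hints at why the replication factor matters for speed: the more nodes hold a copy of a block, the more likely it is that one of them is idle when the task is scheduled.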
The DataNode is the true worker in HDFS. The DataNodes are what store and retrieve blocks of data. They inform the NameNode of the data blocks they hold, and they inform the JobTracker of the status of their currently running jobs and/or their availability. The DataNodes are also the nodes responsible for actually running the MapReduce code on their blocks of data. The DataNodes will communicate with other DataNodes when they need to send or share data. This direct access reduces the amount of traffic that would be generated if the nodes were required to go through the NameNode to communicate with another DataNode.
CHAPTER 3:
The Experiment Design
The DOD is continuing to create and store all types of data [15]. The technology for how to deal with data is continuously changing. Most current DOD solutions deal with storing data in a relational database and use SQL or Procedural Language/Structured Query Language (PL/SQL) to provide the analytics for this data. SQL and PL/SQL provide a large range of analytics and are very useful for many applications, but when the size of data to analyze becomes large, this approach hits its limitations. Adding a Big Data element to a relational database will provide additional means to store and analyze data.
3.1
Why Add a Big Data Element?
The world is producing petabytes of data daily and the amount of data being stored is increasing. A few examples of this are [13]:
• The New York Stock Exchange generates about one TB of new trade data per day.
• Facebook hosts approximately 10 billion photos, taking up one PB of storage.
• Ancestry.com, the genealogy site, stores around 2.5 PB of data.
• The Internet Archive stores around 2 PB of data, and is growing at a rate of 20 TB per month.
• The Large Hadron Collider near Geneva, Switzerland, will produce about 15 PB of data per year.
The large scale of this data makes it difficult to store and process the data in a relational database. A big data element would allow some additional flexibility with storing and analyzing data.
Specifically, the addition of an HDFS cluster would allow quick processing of the entire data set. The true power of Hadoop is that it does process the entire data set [7]. This gives the ability to quickly analyze the entire data set. Hadoop can look at all of the data in the Database and return whatever analysis the programmer desires. Hadoop truly puts the power of all of the data in your corpus at your fingertips. There is little to no concern
that one must perform a large amount of table scans to return analytics; Hadoop by its very nature performs a table scan every time a program is run. That is its true power and it places no limitations on the possible analytics that can be run. Hadoop uses the MapReduce paradigm and forces the programmer to deal with key/value pairs, but these can be chained together to find analytics for all sorts of interesting problems.
3.2
Adding a Big Data Element to GCSS-MC
The current architecture of GCSS-MC does not support big data processing. The architecture that we are proposing in this thesis shows how it can be added as an additional element and integrated with the current architecture as long as it has Internet Protocol (IP) connectivity. The work we did in this thesis shows how a separate HDFS cluster can interact with a separate database. We did not have access to the GCSS-MC Database, so we used an Oracle Database to simulate the GCSS-MC Database. Figure 3.1 shows the setup that we used for our experimentation.
This experiment architecture does abstract away some of the difficulty that would be experienced in fully integrating an HDFS cluster into the GCSS-MC system. However, we believe that it does fully support the proof of concept that it can be done, and done effectively, without disrupting the current system. During the setup and testing of this thesis, the only stipulation that we found for the cluster to work effectively was that it had to have IP connectivity to the Oracle Database.
The architecture uses all open source technologies, which are readily available to everyone. We chose to use Ubuntu 12.04 LTS for the operating system to run the Hadoop cluster and Oracle 11g as the relational database to access and write to, because that is what the GCSS-MC system is currently using for its database. The choices of the operating system and type of database can be changed and adapted to many other situations with a few modifications. That is all of the software that was needed to run the experiment. There are several other Big Data abstractions and tools that can be used, but we felt that keeping the experiment to the core HDFS was important to show this proof of concept and to limit the amount of software needed to make the experiment functional.
3.3
Building a Hadoop Cluster
We considered several possibilities for the best way forward to employ the Hadoop cluster. The initial reaction was to simply run several virtual machines on a large server to represent a cluster. But this thinking goes against the idea of using commodity machines linked together to gain additive power [6]. So we decided to run a simple experiment on a single Hadoop machine versus a virtualized two node Hadoop cluster. The experiment ran several benchmarks on the two different setups and found that the speedup was hampered by the virtualized environment. Our conclusion was that no matter how many virtual machines were running, ultimately they all had to read and write data to the same hard disk, which causes a bottleneck. We found some other interesting factors that can be attributed to slower performance in a Hadoop virtual environment as well; the full paper can be found in the Appendix. There has been a lot of research on optimizing Hadoop in the virtual computing environment that has found tunable settings in Hadoop and the hypervisor that can give the same performance as hardware [16]. Despite our findings, we decided to run a ten node cluster on two servers. This was largely decided due to the available resources and our aim to show a proof of concept (and not to focus on performance). Once we decided on our way forward, we began to build the cluster that would be used for the experiment. The cluster was built on top of two servers running Ubuntu 12.04 LTS. The virtual environment that we chose is Oracle's VirtualBox. The nodes were split up on the two servers, with seven nodes on server one and three on server two. The split was decided based on the available random access memory (RAM) on each server.
Table 3.1: Experiment Server Specifications
Specifications     Server #1               Server #2
OS                 Ubuntu 12.04 (64 Bit)   Ubuntu 12.04 (64 Bit)
CPU(s)             4                       4
CPU MHz            1500                    1600
HD (TB)            2                       1
RAM (GB)           32                      16
# of HDFS Nodes    7                       3
Once the servers and virtual environment were set up, we built the 10 virtual machines (VMs) on the servers. The next step was to create and start the Hadoop nodes on the VMs. The Hadoop build was done using Tom White's Hadoop: The Definitive Guide [7] and Michael Noll's Hadoop Tutorials [17]. Each node was built and tested separately prior to adding it to the cluster. This process could have been expedited by cloning the machines; however, we wanted to ensure the integrity of each machine and the cluster.
Table 3.2: Experiment HDFS Node Specifications
Specifications     HDFS Nodes
OS                 Ubuntu 12.04 (64 Bit)
CPU(s)             1
HD (GB)            100
RAM (GB)           4
The process of installing Hadoop on a VM is not all that different from installing any software package. The Hadoop install is downloaded as a compressed file (.tar.gz). There are a few things that you must do as a prerequisite to the Hadoop install. First, you must ensure that you have an up-to-date version of Java (our testing has found Hadoop works
with Java 6 or Java 7). You must also configure SSH on the VM in order to allow Hadoop to communicate. Once this is done you have to edit several configuration files to set up the environment and then you are off and running. The configuration files are also used to control and optimize the cluster. See the appendix for full installation instructions.
CHAPTER 4:
The Experiment
This chapter will focus on two things: first, how the data is taken from the database and loaded into the Hadoop ecosystem, and then how the data is used in two separate analytic programs. The first program is a home grown analytic tool that uses the GCSS-MC data to find the 100 most frequent NSNs in the whole data set and then builds tables based on the top 100 NSNs found. The second program is a simulated SQL program that is taken from a similar project that is asking for SQL analytics on the same data set. The program runs on the data and produces the same results that SQL would produce. The second program is done to show that Hadoop can simulate anything done in an SQL environment. We believe that this makes a strong case that Hadoop can be used to provide analytics on both structured and unstructured data sets.
4.1
The GCSS-MC Data
The GCSS-MC data that we received is a sample of actual production data. This data set was obtained from the USMC and used to assist the thesis experiment. The data was first loaded into an Oracle 11g database. From the database we are able to see fourteen tables of GCSS-MC data. The first step in working with this structured data was to write a program to pull the data out of the relational database and move the data into the Hadoop ecosystem. There were several decisions made as to how to pull the data out and its format. The final choice was made to write a Java program to access the Oracle database using the Java database connect libraries and parse each table into separate files. The program was written to parse each table in the database into JavaScript Object Notation (JSON) format. A sample of the data in JSON format can be found in Figure 4.1. The NSNs are highlighted in the JSON formatted data.
JSON format was chosen because it lends itself nicely to formatting between different databases. Also, a table can quickly be built from the format. This is important because the goal of this project is to pull all of the data from a relational database and run big data analytics and ultimately return back to a simplified relational database table.
Figure 4.1: Sample GCSS-MC data in JSON Format
There are, however, some compromises that are made when using JSON format. When JSON is used, there is going to be some increase in the space it takes to store the data. In our case the Oracle database is approximately 1.25 gigabytes (GB), and when that data is taken out of the database and parsed into JSON it becomes 5.3 GB. That is a data increase factor of 4.24. This experiment can handle that level of storage increase, but that will not be the case as the data becomes larger than 1 TB. We discussed this at length and chose to move forward with JSON with the understanding that this would have to change to increase the scale of the data. The main reason for us to keep JSON is that keeping the metadata in the data provides more flexibility in transforming and exporting the data dynamically.
Another option would have been to use a different format for the data that would reduce the size requirements. For example, the data could have been pulled from the database and parsed into Comma Separated Value (CSV) format. The CSV format would reduce the data increase factor significantly. In the data set for this thesis, the increase factor for CSV would be 1.176, which is 4.5 times less than JSON format. Additionally, the data could be reduced further by leaving out fields for which the database holds a null value. For this dataset we did not remove the null values from the data. Again, the main reason to keep the null values was to provide the most flexibility to the analytics in the future.
A great deal of time has to be spent on how to deal with the data when there is a possibility that the data will exceed TBs. The larger the data set, the more effort must be spent in minimizing the size of the data, especially because the Hadoop ecosystem will increase the storage requirement of any dataset by a factor of three. This is due to the data replication factor set for Hadoop. The data replication factor is tunable, but if it is reduced below three, the cluster will be more susceptible to faults, and programs may run slower due to the larger network overhead of having to send the data to a node to run the computation. The Hadoop JobTracker will try to schedule the computation on an idle node where the data resides. Therefore, if the replication factor is decreased, the chances of finding an idle node that has the data decrease. The JobTracker will then have to schedule the computation on another node and that node must be sent the data, resulting in the overhead of additional scheduling messages as well as the actual sending of the data over the network. Hadoop can handle large amounts of data, and the main limit on the size of the data Hadoop can handle is the physical limitations of the machines on which it is deployed. As an illustration of what a Hadoop cluster can reach, in 2010 Facebook had a Hadoop cluster of 2,000 nodes with a storage capacity of twenty-one PB [18].
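The storage arithmetic above can be written out as a short calculation. The sketch uses the figures reported for this experiment (a 1.25 GB database, a 5.3 GB JSON export, and a replication factor of three); the class and method names are hypothetical.

```java
import java.util.Locale;

/** Back-of-the-envelope storage estimate for an export loaded into HDFS. */
class StorageEstimateSketch {

    /** Ratio of exported size to source size, e.g. JSON export versus the database. */
    static double expansionFactor(double sourceGb, double exportedGb) {
        return exportedGb / sourceGb;
    }

    /** Raw disk space consumed in HDFS once every block is replicated. */
    static double hdfsFootprintGb(double exportedGb, int replicationFactor) {
        return exportedGb * replicationFactor;
    }

    public static void main(String[] args) {
        double databaseGb = 1.25; // size of the Oracle database in this experiment
        double jsonGb = 5.3;      // size of the JSON export in this experiment
        System.out.println(String.format(Locale.ROOT, "%.2f",
                expansionFactor(databaseGb, jsonGb)));  // prints 4.24
        System.out.println(String.format(Locale.ROOT, "%.1f",
                hdfsFootprintGb(jsonGb, 3)));           // prints 15.9
    }
}
```

In other words, the 1.25 GB of source data occupies roughly 15.9 GB of raw cluster storage once it is exported as JSON and replicated three times.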
Once all of the decisions were made on how to parse the GCSS-MC database, the next step was importing the data into the actual Hadoop ecosystem. We chose to simply use the Hadoop file manipulation commands to ingest the JSON feed into the Hadoop ecosystem. The ingest is ultimately accomplished with a bash script executed on the NameNode. The bash script executes the Java program to pull the data from the Oracle database and then executes the Hadoop file system command to import the data. We experimented with Sqoop to do this and were pleased with the results. Sqoop is an Apache Hadoop tool that supports importing and exporting of database data into Hadoop. However, because Sqoop did not support the JSON format we needed, we wrote a Java program that executes Java Database Connectivity (JDBC) calls to the database and then generates the JSON format. The program is designed to make the JDBC call to the database and write the JSON for each row on a separate line in a text file. Having each row of data in the database represented by one line in the text file is important because of how Hadoop reads the file in a MapReduce algorithm. For instance, if the program spread a row of data over more than one line, it would be nearly impossible to write a MapReduce program to recreate the database row. This is due to Hadoop MapReduce handling each line of input data separately, which is extremely important to ensure the program can be split up and executed in parallel. Of course, we could write another program to handle that and recreate the row, but that would incur a large overhead cost. Because we wrote the program that pulls the data, we have the power to control how the data is handled. We do not mean to say that using additional tools like Sqoop or another third party tool is a bad thing, just that in building our program we wanted to ensure we controlled the data at each stage, and writing our own program to handle that was the best solution in our experiment. Additionally, we decided to try to keep the smallest footprint of software that was needed to run this experiment. We thought that if we could create all of the programs we needed, we would better understand the data flow and thus better understand how to use the tools that abstract the lower level coding.
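The one-row-per-line requirement can be illustrated with a small serializer. This is a sketch under stated assumptions and not the export program used in the experiment: `JsonLineSketch` is a hypothetical class, the column names `NSN`, `REMARKS`, and `QTY` are invented for the example, and every column value is treated as a string. The JDBC query itself is left out so that the example runs without a database.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Serializes one database row as a single line of JSON, keeping null values. */
class JsonLineSketch {

    /** Escapes characters that would break the JSON string or split the line. */
    static String escape(String text) {
        StringBuilder out = new StringBuilder();
        for (char c : text.toCharArray()) {
            switch (c) {
                case '"':  out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n");  break; // keeps the row on one line
                case '\r': out.append("\\r");  break;
                case '\t': out.append("\\t");  break;
                default:   out.append(c);
            }
        }
        return out.toString();
    }

    /** Builds {"col":"value",...}; a null column is written as JSON null. */
    static String toJsonLine(Map<String, String> row) {
        StringBuilder json = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> column : row.entrySet()) {
            if (!first) {
                json.append(',');
            }
            first = false;
            json.append('"').append(escape(column.getKey())).append("\":");
            if (column.getValue() == null) {
                json.append("null");
            } else {
                json.append('"').append(escape(column.getValue())).append('"');
            }
        }
        return json.append('}').toString();
    }

    public static void main(String[] args) {
        Map<String, String> row = new LinkedHashMap<>(); // hypothetical columns
        row.put("NSN", "1234567890123");
        row.put("REMARKS", "line one\nline two");
        row.put("QTY", null);
        System.out.println(toJsonLine(row));
    }
}
```

Because the embedded newline in the `REMARKS` value is escaped, the whole row still occupies exactly one line of the output file, so a map task receives the complete row as a single input value.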
4.2
Top-100-NSN Program
The first program we designed and developed is the NSN program. This program scans all 13 files pulled from the database and finds the 100 most frequently occurring NSNs. Then it pulls the data associated with those NSNs and creates a SQL statement to write the data back to the database. The idea of this program was to scan a large dataset and then create smaller tables that contain only specifically defined, needed data. During the creation of this program we made the choices as to what data to return. Although we made educated guesses on what a Supply Officer would deem important, the particular choice is not what demonstrates the value of the approach. The fact that the data is returned is what is important. The choice of what subset of data is returned does not matter; that can be changed in the program, and then whatever data a customer wants can be delivered.
In order to get the desired result, the program had to be written to execute several MapReduce jobs chained together. In this program we had to write five separate MapReduce algorithms in order to achieve our desired result. The first MapReduce algorithm simply counts the occurrences of NSNs. The second algorithm sorts the NSNs in descending order of frequency. The next algorithm finds all of the data associated with the NSNs. Then the final two algorithms create the JSON and the SQL statements and write the data to the Oracle database. Each one of the algorithms was individually created and tested. Once all of the algorithms were completed, we packaged them up into a single MapReduce job to run. Figure 4.2 gives a representation of the data flow through the whole program by illustrating the data at each algorithm. The inputs are on the left side of the algorithm boxes and the outputs on the right side. The inputs to the top of the box are used by the algorithm in the configure method of the map phase.
Figure 4.2: Data Flow through the NSN Program
4.2.1
Top-100-NSN Program: First Algorithm
The first MapReduce algorithm in the NSN program finds and counts all of the NSNs in the entire corpus. This algorithm is similar to a word count algorithm.
Figure 4.3: NSN: First Algorithm
The major difference is the "word" that is counted is limited to an NSN. The map phase scans all thirteen tables and outputs only a valid NSN. The reduce phase will then sum up all of the occurrences of the NSNs and produce the output. This step is the first in the analysis process and is needed to find and output the most frequent NSNs in the data corpus. The data input to the algorithm is similar to the data in Figure 4.1 and the highlighted values are the NSNs we are searching for in this algorithm. The output sample is shown in Figure 4.4.
Figure 4.4: NSN Output from the First Algorithm
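The counting step described above can be sketched in plain Java, without any Hadoop classes, to make the map and reduce roles concrete. This is only an illustration under stated assumptions: the class name `NsnCountSketch`, the sample records, and in particular the NSN pattern (13 digits grouped 4-2-3-4 with hyphens) are hypothetical and may not match how the corpus actually encodes NSNs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Plain-Java sketch of the counting step; no Hadoop classes are used.
final class NsnCountSketch {

    // ASSUMPTION: NSNs appear as 13 digits grouped 4-2-3-4 with hyphens.
    // The real corpus may encode them differently.
    private static final Pattern NSN =
            Pattern.compile("\\b\\d{4}-\\d{2}-\\d{3}-\\d{4}\\b");

    // "Map" step: emit every NSN found in one input line.
    static List<String> map(String line) {
        List<String> keys = new ArrayList<>();
        Matcher m = NSN.matcher(line);
        while (m.find()) {
            keys.add(m.group());
        }
        return keys;
    }

    // "Reduce" step: sum the occurrences of each emitted NSN.
    // A TreeMap keeps keys in ascending order, similar to sorted reducer output.
    static Map<String, Integer> countNsns(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String nsn : map(line)) {
                counts.merge(nsn, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical sample records, not taken from the thesis data.
        List<String> lines = Arrays.asList(
                "REQ1,5820-01-234-5678,RADIO",
                "REQ2,5820-01-234-5678,RADIO",
                "REQ3,2540-00-111-2222,SEAT");
        System.out.println(countNsns(lines));
    }
}
```

In a real job the summation would be spread across reducers, each receiving all values for a given key; the single in-memory map here only stands in for that grouping.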
4.2.2 Top-100-NSN Program: Second Algorithm
The next step is to sort the NSNs by frequency. Hadoop always imposes some order on the output of the reduce phase, based on the key of each key/value pair. By default, Hadoop orders the output of the reduce phase in ascending alphabetic and numeric order.
In this algorithm we want the reduce phase to produce output ordered by the frequency of NSN occurrences, in descending order. This has to be done in a separate algorithm from the first: if we attempted to order within the first algorithm before it completed, the sums of the NSN occurrences would not yet be accurate. The output of the first algorithm therefore becomes the input to the second. There are two major obstacles to overcome to produce the desired output. First, we have to override the output key comparator class so that keys are produced in descending order. This comparison takes place during the shuffle and sort step of Hadoop. The code that performs this descending sort is shown in Figure 4.6. It overrides the compare method that Hadoop calls; the method delegates to the standard Text comparator and reverses the order by multiplying the result by negative one.
The second obstacle is that we need to transpose the key/value pairs so that the frequency of an NSN becomes the key and the NSN becomes the value. This is done to produce an output ordered in descending order of the frequency of occurrence of each NSN. The transposition is accomplished in the map phase. The shuffle and sort phase then produces the input to the reduce phase, with the NSN frequency as the key and the NSN itself as the value. A sample of the output of the algorithm can be seen in Figure 4.7.
Figure 4.5: NSN: Second Algorithm
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparator;

    // Reverses Hadoop's default ascending key order.
    static class ReverseComparator extends WritableComparator {
        private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

        public ReverseComparator() {
            super(Text.class);
        }

        // Compare the serialized Text keys, then negate the result
        // so that larger keys sort first.
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return (-1) * TEXT_COMPARATOR
                    .compare(b1, s1, l1, b2, s2, l2);
        }
    }
Figure 4.6: Descending Order Sort Class
Figure 4.7: NSN Output from the Second Algorithm
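The two steps of the second algorithm, swapping key and value and then sorting in reverse, can be sketched in plain Java as follows. This is an illustration rather than the thesis code: the class `DescendingSortSketch`, the tab-separated `nsn<TAB>count` input layout, and the sample values are assumptions. It also makes one deliberate substitution: counts are compared numerically here, whereas Figure 4.6 compares the serialized text keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java sketch of the swap-and-sort step; no Hadoop classes are used.
final class DescendingSortSketch {

    // One record after the map phase has swapped key and value.
    static final class Pair {
        final String frequency; // key: occurrence count, kept as text
        final String nsn;       // value: the NSN itself

        Pair(String frequency, String nsn) {
            this.frequency = frequency;
            this.nsn = nsn;
        }
    }

    // "Map" step. ASSUMPTION: input lines look like "nsn<TAB>count".
    static Pair swap(String line) {
        String[] parts = line.split("\t");
        return new Pair(parts[1], parts[0]);
    }

    // Reverse a base comparator by negating its result,
    // the same idea as the comparator in Figure 4.6.
    static <T> Comparator<T> negate(Comparator<T> base) {
        return (a, b) -> -base.compare(a, b);
    }

    // SUBSTITUTION: compare counts as numbers, not as text bytes.
    static final Comparator<Pair> BY_NUMERIC_FREQUENCY =
            Comparator.comparingLong(p -> Long.parseLong(p.frequency));

    // Swap every line, then sort with the negated comparator.
    static List<Pair> sortDescending(List<String> countLines) {
        List<Pair> pairs = new ArrayList<>();
        for (String line : countLines) {
            pairs.add(swap(line));
        }
        pairs.sort(negate(BY_NUMERIC_FREQUENCY));
        return pairs;
    }

    public static void main(String[] args) {
        // Hypothetical counts, not taken from the thesis data.
        List<String> lines = Arrays.asList(
                "2540-00-111-2222\t9",
                "5820-01-234-5678\t100",
                "6505-00-333-4444\t25");
        for (Pair p : sortDescending(lines)) {
            System.out.println(p.frequency + "\t" + p.nsn);
        }
    }
}
```

The numeric comparison is chosen because a byte-wise text comparison would rank "9" ahead of "100" in descending order unless the counts are padded to a fixed width. How the counts are encoded in the actual job is not stated in this section, so this point is raised only as something to check.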
4.2.3 Top-100-NSN Program: Third Algorithm
The output from the second algorithm enables us to find all of the data associated with the 100 most frequently occurring NSNs. To use that output we relied on the distributed cache functionality that is part of Hadoop, which allows the programmer to access other files on the HDFS from within a task. We then had to decide what type of data structure to build from the output of algorithm two. After some initial testing we found that a hash map provided faster comparisons than a map or a pattern. Hadoop also provides the configure method as a means to set up data structures that will be needed in the map phase. This is critical because each map instance has to build the data structure in exactly the same way in order to achieve predictable, repeatable results. In this particular configure method we create a hash map with the first 100 NSNs from the output of algorithm two.
Figure 4.8: NSN: Third Algorithm
The map phase then scans all of the data in the corpus and produces output if and only if one of those NSNs is found in the line of data. The line is output in JSON format with the NSN and the source file prepended to the JSON. The reduce phase of this algorithm is the identity reduce: it simply outputs the key/value pairs it receives from the map phase. A sample of the output can be found in Figure 4.9.
Figure 4.9: NSN Output from the Third Algorithm
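The lookup-and-filter behavior of the third algorithm can be sketched in plain Java as shown here. The sketch is written under several assumptions that are not confirmed by the source: the class `TopNsnFilterSketch`, the comma-separated record layout, the `count<TAB>nsn` layout of the sorted input, the file name `orders.csv`, and the choice to render each record as a JSON array of strings are all hypothetical. A `HashSet` stands in for the hash map, since only membership is tested.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Plain-Java sketch of the filter step; no Hadoop classes are used.
final class TopNsnFilterSketch {

    // Analogue of the configure step: load the first `limit` NSNs from
    // the sorted output. ASSUMPTION: lines look like "count<TAB>nsn".
    static Set<String> loadTopNsns(List<String> sortedLines, int limit) {
        Set<String> top = new HashSet<>();
        for (String line : sortedLines) {
            if (top.size() >= limit) {
                break;
            }
            top.add(line.split("\t")[1]);
        }
        return top;
    }

    // Render fields as a JSON array of strings. Escaping is minimal:
    // only backslashes and double quotes are handled.
    static String toJsonArray(String[] fields) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append('"')
              .append(fields[i].replace("\\", "\\\\").replace("\"", "\\\""))
              .append('"');
        }
        return sb.append(']').toString();
    }

    // Analogue of the map step: emit a record only if one of its fields is a
    // top NSN, with the NSN and source file placed in front of the JSON.
    static Optional<String> map(String sourceFile, String line, Set<String> top) {
        String[] fields = line.split(",", -1);
        for (String field : fields) {
            if (top.contains(field)) {
                return Optional.of(field + "\t" + sourceFile + "\t" + toJsonArray(fields));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Hypothetical data, not taken from the thesis.
        Set<String> top = loadTopNsns(Arrays.asList(
                "100\t5820-01-234-5678",
                "25\t6505-00-333-4444",
                "9\t2540-00-111-2222"), 2);
        map("orders.csv", "REQ1,5820-01-234-5678,RADIO", top)
                .ifPresent(System.out::println);
    }
}
```

Because every map instance builds the same set from the same sorted file, each instance filters identically, which is the repeatability property the text describes.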
4.2.4 Top-100-NSN Program: Fourth Algorithm
The next step is to use the output from the third algorithm and parse the format down to the JSON that we wish to write back to the database.
This algorithm and the next one could be combined into one that generates the JSON and creates the SQL statement to write to the database using a JDBC call. Ultimately, we decided to keep them separate so that we could use the JSON format in the future to regenerate the tables, if necessary. Furthermore, to maintain the flexibility to adapt to other tools, such as a database Hadoop connector, we felt it prudent to keep the JSON formatting code. A sample of the output of the algorithm can be seen in Figure 4.11.
At this point we made some decisions about what data to write back. Table 4.1 lists the data that we write back to the database. The selection can be modified and tailored to exactly what a customer or stakeholder desires. Our choices were made only to show a proof of concept: the data can be found and written back to a database as a smaller table of just the desired data. Any of the data that exists in the original database can be retrieved with only minor modifications to the code base. The benefit of this type of analytic is truly realized as the data size increases, because the customer can keep all of their data and still produce tables of just the pertinent data required for a particular need.
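One way to picture how the returned fields can be tailored is a small projection routine that keeps only a configurable list of columns. The following plain-Java sketch is hypothetical throughout: the class `FieldProjectionSketch`, the column names `NSN`, `QTY`, and `UNIT`, and the representation of a record as a map are assumptions for illustration and do not reflect the contents of Table 4.1.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of trimming a record down to selected fields.
final class FieldProjectionSketch {

    // Quote a string for JSON. Escaping is minimal:
    // only backslashes and double quotes are handled.
    static String quote(String s) {
        return '"' + s.replace("\\", "\\\\").replace("\"", "\\\"") + '"';
    }

    // Keep only the wanted columns, in the order requested.
    // Columns missing from the record are skipped.
    static String project(Map<String, String> record, List<String> wanted) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (String column : wanted) {
            String value = record.get(column);
            if (value == null) {
                continue;
            }
            if (!first) {
                sb.append(',');
            }
            sb.append(quote(column)).append(':').append(quote(value));
            first = false;
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        // Hypothetical record and column names.
        Map<String, String> record = new LinkedHashMap<>();
        record.put("NSN", "5820-01-234-5678");
        record.put("QTY", "4");
        record.put("UNIT", "A1");
        System.out.println(project(record, Arrays.asList("NSN", "QTY")));
    }
}
```

Changing what is delivered to a customer then amounts to changing the `wanted` list, which is the kind of minor modification the text refers to.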
Figure 4.10: NSN: Fourth Algorithm