Implementation and Analysis of Join
Algorithms to handle skew for the
Hadoop Map/Reduce Framework
Fariha Atta
MSc Informatics
School of Informatics
University of Edinburgh
Abstract
The Map/Reduce framework, a parallel processing paradigm, is widely used for large-scale distributed data processing. Map/Reduce can perform typical relational database operations such as selection, aggregation, and projection. However, binary relational operators such as join, Cartesian product, and set operations are difficult to implement with Map/Reduce. Map/Reduce can easily process homogeneous data streams but does not provide direct support for handling multiple heterogeneous input data streams. Thus the binary relational join operator does not have an efficient implementation in the Map/Reduce framework. Some implementations of the join operator exist for the Hadoop distribution of the Map/Reduce framework. However, these implementations do not perform well in the case of heavily skewed data. Skew in the input data affects the performance of the join operator in a parallel environment, where data is distributed among parallel sites that perform independent joins. Data skew can severely limit the effectiveness of parallel architectures when some processing units (PUs) are overloaded during data distribution and hence take longer to complete than the other PUs. This also wastes the resources of the idle PUs. As data skew occurs naturally in many applications, handling it is an important issue for improving the performance of the join operation. We implement a hash join algorithm that is a hybrid of Hadoop's map-side and reduce-side joins and can handle skew, and we compare its performance with that of the other Hadoop join algorithms.
Acknowledgements
My heartfelt gratitude goes to my supervisor Stratis Viglas, who provided me with guidance and support throughout the project. I especially appreciate his willingness to help at any time.
I would like to acknowledge Chris Cooke for administering the Hadoop cluster and answering my queries regarding the cluster.
Finally, I would like to thank my family and friends for their continuous moral support and encouragement.
Declaration
I declare that this thesis was composed by myself, that the work contained herein is my own except where explicitly stated otherwise in the text, and that this work has not been submitted for any other degree or professional qualification except as specified.
Table of Contents
Chapter 1. Introduction
1.1. Motivation
1.2. Related Work
1.3. Problem Statement and Aims
1.4. Thesis Outline
Chapter 2. Background
2.1. The Map/Reduce Framework
2.2. The Hadoop Distribution
2.3. Impact of parallelization on the join operation
2.4. Hash Join algorithm
2.4.1. Join algorithms on a single processor machine
2.4.2. Hash join algorithm for parallel implementation
2.5. Skew and its impact on join operation
2.5.1. Hash Partitioning and its skew sensitivity
2.5.2. Range Partitioning and its skew sensitivity
2.6. Pre-processing for joining
2.6.1. Semi-join using selection
2.6.2. Semi Join using Hash Operator
Chapter 3. Join Algorithms on Hadoop
3.1. Reduce-side joins
3.2. Map-side partitioned joins
3.3. Memory-backed joins
3.4. The hybrid Hadoop join
Chapter 4. Evaluation
4.2. Data Sets
4.3. Tests
4.3.1. Test 1: To determine time for sampling and finding the splitters
4.3.2. Test 2: To determine the size of the bloom filter for pre-processing
4.3.3. Test 3: To determine the effectiveness of pre-processing
4.3.4. Test 4: To determine the effect of increasing partitions on the join of skewed relations
4.3.5. Test 5: To determine the number of partitions for the virtual processor range partitioning
4.3.6. Test 6: Comparison of algorithms in the case of skewed and non-skewed joins
4.4. Discussion on the results of the comparison tests
Chapter 5. Conclusion and Future Works
List of Figures
Figure 1: The Map/Reduce dataflow
Figure 2: The HDFS Architecture
Figure 3: Hadoop Map/Reduce dataflow (source [43])
Figure 4: Partitioning R and S on join column R.r=S.s in three partitions using hash(k)=k%3
Figure 5: Grace Join Algorithm (source [44])
Figure 6: Joining on parallel machines
Figure 7: Example of data skew -- Patent and Cite Tables
Figure 8: A bloom filter with m=15, k=3 for a set {A, B, C}
Figure 9: Bit setting in bloom filter in case of collision
Figure 10: Construction of a Bloom Filter
Figure 11: Data flow of the reduce-side join
Figure 12: Key-Value pairs before and after group partitioning
Figure 13: Data flow of the map-side partitioned join
Figure 14: Data flow of the memory-backed join
Figure 15: Data flow of the hybrid Hadoop join
Figure 16: Custom range partitioning for Hadoop
Figure 17: Time trend for taking samples and selecting splitters
Figure 18: Execution time of join algorithms with and without pre-processing in low selectivity situations
Figure 19: Effect of pre-processing in high selectivity situations
Figure 20: Execution time of the HHH algorithm for different numbers of partitions (i.e. the reduce tasks)
Figure 21: The execution times of the reducers handling the skewed keys for different number of partitions
Figure 22: Stages of Execution for Join Algorithms
Figure 23: Join Results for Input1xInput1, output records produced: 4,001,864
Figure 24: Join Results for Input10KxInput1, output records produced: 2,000,368
Figure 25: Join Results for Input100KxInput1, output records produced: 2,002,328
Figure 26: Join Results for Input300KxInput1, output records produced: 1,799,705
Figure 27: Join Results for Input400KxInput1, output records produced: 2,000,015
Figure 28: Join Results for Input500KxInput1, output records produced: 1,999,457
Figure 29: Join Results for Input600KxInput1, output records produced: 1,798,816
Figure 30: Join Results for Input300KxInput10, output records produced: 4,503,238
Figure 31: Join Results for Input500KxInput10, output records produced: 6,501,763
Figure 32: Join Results for Input600KxInput10, output records produced: 7,199,115
Figure 33: Join Results for Input100KxInput100, output records produced: 11,899,500
Figure 34: Join Results for Input300KxInput100, output records produced: 31,499,691
Figure 35: Time taken by the memory-backed join for different number of tuples in the build relation (*DNF=Does Not Finish)
List of Tables
Table 1: Books Relation
Table 2: Authors Relation
Table 3: Filtered Book Relation
Table 4: Conventions for representing the algorithms
Table 5: Effects of increasing size of the bloom filter
Table 6: Time taken by reducers in case of skewed relation (6 partitions)
Table 7: Time taken by reducers in case of skewed relation (12 partitions)
Table 8: Time taken by reducers in case of skewed relation (18 partitions)
Table 9: Time taken by reducers in case of skewed relation (24 partitions)
Table 10: Time taken by reducers in case of skewed relation (30 partitions)
Table 11: Effect of the number of partitions on the execution time of skewed reducers for the virtual processor range partitioning
Chapter 1
Introduction
1.1 Motivation
Gone are the days when applications processed kilobytes or megabytes of data; this is the age of gigabyte-, terabyte-, and even petabyte-scale data. Applications today deal with gigantic amounts of data that neither fit in the main memory of a single machine nor lie within its processing power. For example, the data volume of the National Climatic Data Centre (NCDC) [34] is 350 gigabytes; eBay maintains 17 trillion records with a total size of 6.5 petabytes [36]; Facebook manages more than 25 terabytes of logging data per day [35]; and the Sloan Digital Sky Survey (SDSS) [33] maintains about 42 terabytes of image data and 18 terabytes of catalogue data. Processing such massive amounts of data is no easy feat. Data-intensive applications use a distributed infrastructure containing clusters of computers and employ distributed parallel algorithms to process huge volumes of data efficiently. One of the fairly recent advancements in distributed processing is the Map/Reduce paradigm [1]. Map/Reduce, a programming framework developed at Google, provides a cost-effective, scalable, flexible, and fault-tolerant platform for large-scale distributed data processing across clusters of hundreds or even thousands of nodes. It allows huge volumes of data, up to multiple petabytes, to be processed in parallel [1, 2]. Computations are moved to the machines holding the data to be processed, rather than data being moved to machines that can perform the computation (the traditional parallel processing approach). Map/Reduce makes use of a large number of cheap shared-nothing commodity machines, which makes it an inexpensive solution to large data processing problems. Replication of data on multiple nodes ensures availability and reliability despite the unreliable underlying hardware.
Map/Reduce takes care of data movement, load balancing, fault tolerance, job scheduling, and other nitty-gritty details of parallel processing. Users of the Map/Reduce framework need only concentrate on the data processing algorithms, which are implemented in the “map” and “reduce” functions of the framework. “Map” and “Reduce” are the two primitives provided by the framework for distributed data processing. The signatures of these primitives are:
Map: (k1, v1) -> [(k2, v2)]
Reduce: (k2, [v2]) -> [v3]
The map function converts input key-value pairs of data into intermediate key-value pairs which are distributed among reduce functions for further aggregation. In simple terms, data is distributed among nodes for processing during the map phase and the result is aggregated in the reduce phase. Algorithms for processing distributed data have to be supplied in these primitives of the framework. This simplifies the development of programs for parallel settings.
When it comes to processing vast amounts of data, the distinction between structured and unstructured data does not matter much for Map/Reduce. Its perceived rival, the parallel DBMS, can efficiently process large volumes of structured data because it distributes relational tables among processing nodes and supports compression, indexing, query optimization, result caching, etc. However, the parallel DBMS has some inherent limitations. Once it is deployed and the data is distributed among nodes, scaling it by adding more nodes becomes very difficult. Moreover, the parallel DBMS is not fault-tolerant [5]: if a node fails during processing, the whole processing sequence has to be restarted on every node. For these reasons, the parallel DBMS cannot cope well with the demand of processing tremendously large amounts of data. In addition, processing unstructured data is outside the scope of the parallel DBMS. This is where Map/Reduce comes to the rescue: it can process large volumes of unstructured data and can also handle the structured relations of conventional databases. A user of the Map/Reduce framework only needs to implement the processing logic in the “map” and “reduce” primitives. Users can specify algorithms for selection, projection, aggregation, grouping, joining, or other similar relational processing tasks in these primitives.
Of the available implementations of Map/Reduce, e.g. Greenplum [37], Aster Data [38], Qizmt [39], Disco [40], and Skynet [41], Apache’s implementation (called Hadoop [31]) is the most widely used for educational and research purposes because of its open-source and platform-independent nature. Hadoop makes it easy to develop robust, scalable, and efficient programs for distributed data processing. Hadoop is based on a client-server architecture. Centralized management by the master of the Hadoop cluster makes tasks far simpler and more organized. The master distributes the workload among worker (slave) nodes, which report to the master on completion of the requested task. The master then decides the next course of action. The storage of vast amounts of distributed data and its dissemination to the worker nodes in the cluster is managed by the Hadoop Distributed File System (HDFS) [32].
As mentioned earlier, different sorts of operations can conveniently be performed on structured as well as unstructured data using Map/Reduce. Among these operations, joining two heterogeneous datasets is the most important and challenging one. Many Map/Reduce applications require joining data from multiple sources. For example, a search engine maintains many databases such as crawler, log, and webgraph databases. It constructs the index database from both the crawler and webgraph databases, so it needs to join these two datasets. However, joining large heterogeneous datasets is quite challenging. Firstly, processing two massive datasets in parallel to find matches on some attribute is daunting even if a large computational cluster is available. Secondly, in a distributed setting, the datasets involved in the join are stored at distributed sites. Thirdly, although the database world offers many different techniques for the join operation, Map/Reduce itself is not built for processing multiple data streams in parallel. By its very nature, Map/Reduce processes a single stream of data at a time and hence has no efficient built-in way of joining two parallel streams. For the Hadoop implementation of the framework, some strategies have been documented for the join operation: the map-side, reduce-side, and memory-backed join techniques, each with its own inherent limitations. When it comes to joining skewed datasets, the performance of these join techniques degrades. Skew in the distribution of the join attribute’s values can limit the effectiveness of parallel execution: the variation in the processing times of parallel join tasks limits the maximum speedup that can be achieved through parallel execution. In their widely read article “MapReduce: A major step backwards”, DeWitt and Stonebraker criticize Map/Reduce on various grounds, one of which is its inability to handle skew. They state:
“One factor that Map/Reduce advocates seem to have overlooked is the issue of skew… The problem occurs in the map phase when there is wide variance in the distribution of records with the same key. This variance, in turn, causes some reduce instances to take much longer to run than others, resulting in the execution time for the computation being the running time of the slowest reduce instance. The parallel database community has studied this problem extensively and has developed solutions that the Map/Reduce community might want to adopt.”
Therefore, this project aims to develop a hash join algorithm for the Map/Reduce framework that can handle large skew in the data, and to analyze its performance relative to the join implementations provided by Hadoop.
1.2 Related Work
The join operation is one of the most fundamental, most difficult, and hence most researched query operations. It combines data from two sources on the basis of some common key. The database literature is full of discussions on the techniques, performance, and optimization of this operation. Nested loops, sort-merge, and hash join are the most commonly used join techniques. [3] and [20] provide a general discussion of join processing in relational databases on single-processor systems. They conclude that the nested loops algorithm is useful only when the datasets to be joined are relatively small. When the datasets are large, hash-based algorithms are superior to sort-merge joins, provided that the final result need not be sorted before it is presented to the user.
The optimization of join techniques for multiprocessor environments has also been widely researched. [10], [12], and [15] discuss join algorithms for multiprocessor databases. The performance of the sort-merge, Grace, simple, and hybrid hash join techniques on GAMMA, a shared-nothing multiprocessor, is presented in [4], which shows that the hybrid hash join algorithm dominates the other algorithms for all degrees of memory availability.
Different join techniques for Map/Reduce have also been researched. Hadoop implements map-side and reduce-side joins [26], [27], in which the join operation is carried out in the mappers and the reducers respectively. Set-similarity joins using Map/Reduce are discussed in [9]; they use the reduce-side join with custom partitioning to group together the most similar tuples. [11] discusses the optimization of the join operator for multi-way and star joins using the Map/Reduce framework. For a 3-way join R x S x T, the tuples of R and T are replicated across a number of reducer nodes to avoid communicating the result of the first join, and the join is performed on the reduce side. They show that a multi-way join using a single Map/Reduce job is more efficient than cascading several Map/Reduce jobs, each performing a 2-way join. A modification to the Map/Reduce framework for the join operation, called Map-Reduce-Merge, is presented in [9]. It introduces a new stage called “merge” in which matching tuples from multiple sources are joined. The modified primitives are:
Map: (k1, v1)α -> [(k2, v2)]α
Reduce: (k2, [v2])α -> (k2, [v3])α
Merge: ((k2, [v3])α, (k3, [v4])β) -> [(k4, v5)]γ
where k is a key, v is a value, and α, β, γ are lineages. A map function converts the key-value pair (k1, v1) from lineage α into intermediate key-value pairs. The reduce operation collects all intermediate values related to k2 and produces the list [v3]. Another map-reduce operation does the same with key-value pairs from lineage β, and the subsequent reduce produces a key-value pair (k3, [v4]). Depending on the values of the keys k2 and k3, the merge operator performs the join and combines the two reduced outputs into another lineage γ. A user-defined module, the “partition selector”, determines which reducers the merge operator gets its data from. Thus the join is carried out after the reduce stage, in an additional merge phase where each merger receives the corresponding data of multiple datasets from the reduce phase. Sort-merge, block nested-loop, or hash joins can be performed in the merge phase. Hence, all the processing is completed in one Map/Reduce job. However, the Map-Reduce-Merge implementation requires changes to the basic Map/Reduce framework.
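The merge step can be pictured with a small single-process Python sketch. It assumes that each reduced lineage is held as a dictionary from key to value list, uses a reduce that merely groups values, omits the partition selector, and implements the merge as a hash-based equi-join (one of the options listed above). The names `reduce_lineage` and `merge` are hypothetical and are not the Map-Reduce-Merge API.

```python
from collections import defaultdict

def reduce_lineage(pairs):
    """Group one lineage's (key, value) pairs into (key, [values]), ordered by key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(sorted(groups.items()))

def merge(reduced_alpha, reduced_beta):
    """Equi-join two reduced lineages: emit (key, (a, b)) for every pair of
    values whose keys match in both lineages."""
    output = []
    for key, alpha_values in reduced_alpha.items():
        beta_values = reduced_beta.get(key)
        if beta_values is None:
            continue  # key absent from the other lineage: no join result
        for a in alpha_values:
            for b in beta_values:
                output.append((key, (a, b)))
    return output

# Usage with two tiny hypothetical lineages keyed by department id.
employees = reduce_lineage([(1, "alice"), (2, "bob"), (1, "carol")])
departments = reduce_lineage([(1, "sales"), (3, "hr")])
print(merge(employees, departments))
# [(1, ('alice', 'sales')), (1, ('carol', 'sales'))]
```

Because the merge only sees already-reduced groups, each key is compared once per lineage rather than once per original tuple.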
Many techniques have been implemented for handling skew in parallel join operations. [18] presents a partitioning strategy for skewed data that assumes the skewed keys are known in advance, which is not the case in practical situations. Range partitioning, which assigns tuples to partitions on the basis of key ranges rather than hash values of the join key, has long been studied. [23] sorts the input datasets on the join keys. Depending on the processing capability of the system, the number of tuples T to be allocated to each partition is determined. The sorted datasets are then divided into n partitions, each containing T tuples, and the partitions are assigned to processing units (PUs) in a round-robin fashion. The tuples of the second dataset are assigned to PUs on the basis of the partition ranges of the first dataset. This approach sorts the input dataset to determine appropriate partitioning ranges, which is quite costly if the datasets are very large. [24], [28], and [29] determine the partitioning ranges after a full scan of the input data to identify the skewed keys; the scanning cost may overshadow the benefits obtained from range partitioning. [25] determines the partitioning ranges by randomly sampling the input datasets. It also presents the virtual processor partitioning approach, which divides an input dataset into more partitions than there are PUs in order to scatter the skewed keys.
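Sampling-based range partitioning with virtual processors, as described for [25], can be sketched in a few lines of Python. This is a minimal illustration, not the thesis's Hadoop implementation: the sample size, the number of ranges, the helper names, and the rule for a key that spans several ranges (tuples of one relation go to one random range, tuples of the other relation are replicated to all of them) are assumptions made for the example.

```python
import bisect
import random
from collections import defaultdict

def choose_splitters(keys, num_partitions, sample_size, rng):
    """Sort a random sample of the join keys and take evenly spaced
    sample values as the num_partitions - 1 range boundaries."""
    sample = sorted(rng.sample(keys, min(sample_size, len(keys))))
    return [sample[i * len(sample) // num_partitions] for i in range(1, num_partitions)]

def candidate_partitions(key, splitters):
    """Ranges that may hold `key`. A heavily repeated key usually occupies
    several consecutive splitters, so it maps to several ranges."""
    lo = bisect.bisect_left(splitters, key)
    hi = bisect.bisect_right(splitters, key)
    return list(range(lo, hi + 1))

def partition(keys, splitters, replicate, rng):
    """Route join keys to ranges. Without replication each tuple goes to ONE
    random candidate range; with replication it goes to ALL candidates, so
    every matching (R, S) pair still meets in exactly one range."""
    parts = defaultdict(list)
    for key in keys:
        candidates = candidate_partitions(key, splitters)
        targets = candidates if replicate else [rng.choice(candidates)]
        for p in targets:
            parts[p].append(key)
    return parts

def processing_unit(partition_id, num_pus):
    """Virtual processor partitioning: more ranges than PUs, dealt out round-robin."""
    return partition_id % num_pus

# Hypothetical skewed input: key 42 makes up 60% of R.
rng = random.Random(7)
r_keys = [42] * 600 + list(range(1000, 1400))
s_keys = [42] * 3 + list(range(1000, 1400, 2))
splitters = choose_splitters(r_keys, num_partitions=8, sample_size=200, rng=rng)
r_parts = partition(r_keys, splitters, replicate=False, rng=rng)
s_parts = partition(s_keys, splitters, replicate=True, rng=rng)
```

Since key 42 dominates the sample, it appears as several splitters and its 600 R tuples are scattered over several ranges instead of landing on one reducer, which is exactly what hash partitioning cannot do.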
To our knowledge, no work has yet been done on handling skew in the join operation for Hadoop. We attempt to add skew-handling capability to Hadoop using range partitioning.
1.3 Problem Statement and Aims
Although several join techniques are available in the database literature, implementing them in the Map/Reduce framework is far from easy, if not impossible, because of the very nature of the framework. All the join techniques for the Map/Reduce framework discussed in the Related Work section have their limitations. Map-Reduce-Merge, for example, introduces a new “merge” stage that adds implementation overhead and hence is not an efficient solution. The reduce-side join implemented by Hadoop incurs space and time overhead as a result of tagging each tuple with its source. Although the map-side join performs better than the reduce-side join, its prerequisite is that both input datasets must be pre-partitioned and properly structured before being input to the mappers. Moreover, these algorithms use hash partitioning to distribute the two datasets among the worker nodes. Hash partitioning is sensitive to skew in the input data when some values appear many times: all tuples with the same repeated value are routed to a single processing node. As a result, the worker nodes handling the repeated values are overloaded with too many records to be joined. Hence, algorithms that use hash partitioning for data distribution are prone to degraded performance when the input datasets contain skewed keys. Since all the Hadoop join algorithms use hash partitioning for data distribution, they suffer a performance hit in the case of skewed data.
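The overload caused by hash partitioning is easy to reproduce with a short Python sketch. It assumes Python's built-in `hash` (which maps small non-negative integers to themselves) as a stand-in for the key hash used by Hadoop, four reducers, and a made-up key distribution.

```python
from collections import Counter

def hash_partition(key, num_partitions):
    """Hash-style rule: hash value of the key modulo the number of partitions."""
    return hash(key) % num_partitions

# Hypothetical skewed join column: key 7 occurs 9,000 times, keys 100..1099 once each.
keys = [7] * 9000 + list(range(100, 1100))
load = Counter(hash_partition(k, 4) for k in keys)
print(sorted(load.items()))
# [(0, 250), (1, 250), (2, 250), (3, 9250)]
```

Every copy of key 7 hashes to the same partition, so one reducer receives 9,250 of the 10,000 tuples while the other three receive 250 each; adding reducers does not help because all copies of a key always share one hash value.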
Problem Statement: We consider the equi-join of two data sources R and S, with cardinalities |R| and |S|, on a single join column such that R.r=S.s. For simplicity, we do not perform any selection or projection on the data sources, although these can easily be incorporated in either the map or the reduce phase while emitting key-value pairs. To keep things simple, we perform an inner join on the datasets for evaluation. We assume that both datasets are stored in HDFS. Our major tasks in this project are:
1. We provide a detailed discussion of the various join algorithms supplied with the Hadoop implementation, i.e. the map-side, reduce-side, and memory-backed joins, and analyze the pros and cons of each.
2. We discuss our hybrid algorithm, which is a combination of the map-side and the reduce-side joins.
3. We discuss pre-processing techniques, such as the semi-join through bit-filtering and the semi-join through selection, which can reduce the sizes of the datasets to be joined by removing keys that do not take part in the join operation. We experimentally determine the performance of both techniques for filtering the input datasets.
4. We present different partitioning strategies for distributing the workload among the nodes performing the join operation. The default hash-based partitioning of Hadoop overloads some nodes in the case of skewed keys. We discuss how to avoid this by using range partitioning, which we incorporate into our algorithm to handle skew.
5. We conduct experiments to compare the performance of the map-side, reduce-side, and memory-backed join algorithms with our hybrid algorithm (in both its hash partitioning and range partitioning versions) for handling skewed data.
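The semi-join through bit-filtering mentioned in item 3 can be illustrated with a hypothetical Python sketch; it is not the thesis's code. The `BloomFilter` class, the SHA-256-based derivation of the k hash functions, and the default sizes m=1024 and k=3 are assumptions. The filter is built on the join keys of one relation and used to drop tuples of the other relation whose keys cannot match.

```python
import hashlib

class BloomFilter:
    """m-bit Bloom filter with k hash functions derived from SHA-256."""

    def __init__(self, m, k):
        self.m, self.k = m, k
        self.bits = bytearray((m + 7) // 8)

    def _positions(self, key):
        # Salt the key with the hash-function index to get k bit positions.
        for i in range(self.k):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.m

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key):
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(key))

def bloom_semi_join(r_rows, s_rows, r_key, s_key, m=1024, k=3):
    """Keep only the S tuples whose join key may appear in R."""
    bf = BloomFilter(m, k)
    for row in r_rows:
        bf.add(r_key(row))
    return [row for row in s_rows if bf.might_contain(s_key(row))]

# Hypothetical relations: authors (id, name) and books (title, author_id).
authors = [(1, "Austen"), (2, "Tolstoy")]
books = [("Emma", 1), ("War and Peace", 2), ("Dune", 9), ("Ulysses", 11)]
filtered = bloom_semi_join(authors, books, r_key=lambda a: a[0], s_key=lambda b: b[1])
```

A Bloom filter can report false positives, so a few non-joining tuples may survive the filter, but it never drops a tuple that actually joins; the subsequent join removes any survivors that do not match.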
1.4 Thesis Outline
Chapter 2 equips the reader with the background information needed to understand the problem statement. An overview of Map/Reduce, its Hadoop implementation, hash join techniques, partitioning strategies, and filtering techniques is presented in this chapter.
Chapter 3 discusses the Hadoop implementations of the map-side, reduce-side, and memory-backed joins and presents our hybrid algorithm with range and hash partitioning.
Chapter 4 presents the experiments conducted to compare the performance of these algorithms on skewed and non-skewed input datasets and discusses the results.
Chapter 5 summarizes the findings and suggests some future extensions to the project.
Chapter 2
Background
2.1 The Map/Reduce Framework
The Map/Reduce framework consists of two operations, “map” and “reduce”, which are executed on a cluster of shared-nothing commodity nodes. In a map operation, the input data, available through a distributed file system (e.g. GFS [6] or HDFS [32]), is distributed among a number of nodes in the cluster in the form of key-value pairs. Each of these mapper nodes transforms a key-value pair into a list of intermediate key-value pairs (the output keys need not be the same as the input keys). Depending on the map operation, the list may contain 0, 1, or many key-value pairs (shown in Figure 1). The signature of the map operation is:
Map(k1, v1) -> list(k2, v2)
The intermediate key-value pairs are propagated to the reducer nodes such that all the values related to one key reach the same reduce process. The values are processed and the result is written to the file system. Depending on the reduce operation, the result may contain 0, 1, or many values (a list of values). The signature of the reduce operation is:
Reduce(k2, list(v2)) -> list(v3)
An example of a map-reduce job is counting the number of times each word appears in some input data [1]. The input to this problem is a data file whose contents are distributed among the nodes of a cluster in the form of splits. Each node receives lines one by one from its input split in the form of key-value pairs; the key in this case is the byte offset of the line and the value is the line itself. A map operation is performed on each key-value pair and produces a list of intermediate key-value pairs, in this case a word and its count, e.g. (“Hadoop”, “1”), (“join”, “1”), etc. The value “1” indicates that the word appears once in a particular line. The key-value pairs from all the mappers are stored in buckets such that the values related to one key (word) are gathered in one bucket. A set of such buckets, called a partition, is then provided to each reducer. A reducer calls the reduce operation for every bucket; this operation aggregates the values to determine the total number of times the word appears in the input file. Thus each call to the reduce operation produces one output for its key. An optional “combine” phase can be employed by each mapper to minimize the traffic on the network. In this phase, each mapper locally aggregates its output for each key. For example, instead of emitting the key-value pair (“Hadoop”, “1”) thirty times, a mapper emits (“Hadoop”, “30”) only once. This reduces the amount of data the reducers have to fetch during the shuffling phase.
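The word-count job and the effect of the combiner can be simulated in plain Python. This is an illustrative single-process sketch, not Hadoop code: each inner list plays the role of one input split handled by one mapper, the byte offset is computed assuming ASCII lines separated by single newlines, and the function names are hypothetical.

```python
from collections import Counter, defaultdict

def map_words(offset, line):
    """Map: (byte offset, line) -> [(word, 1), ...]; the offset key is not used."""
    return [(word, 1) for word in line.split()]

def combine(pairs):
    """Combiner: pre-aggregate one mapper's output so each word is emitted once."""
    local = Counter()
    for word, count in pairs:
        local[word] += count
    return list(local.items())

def reduce_counts(word, counts):
    """Reduce: (word, [counts]) -> total number of occurrences of the word."""
    return sum(counts)

def word_count(splits, use_combiner):
    """Run every mapper, shuffle by word, and reduce; also report how many
    intermediate pairs were shipped to the reducers."""
    buckets = defaultdict(list)  # word -> counts received from all mappers
    pairs_shipped = 0
    for split in splits:  # one mapper per split
        output, offset = [], 0
        for line in split:
            output.extend(map_words(offset, line))
            offset += len(line) + 1  # next line's byte offset (ASCII, "\n"-separated)
        if use_combiner:
            output = combine(output)
        pairs_shipped += len(output)
        for word, count in output:
            buckets[word].append(count)
    return {word: reduce_counts(word, c) for word, c in buckets.items()}, pairs_shipped

splits = [["Hadoop join", "Hadoop Hadoop"], ["join skew"]]
print(word_count(splits, use_combiner=False))  # ({'Hadoop': 3, 'join': 2, 'skew': 1}, 6)
print(word_count(splits, use_combiner=True))   # ({'Hadoop': 3, 'join': 2, 'skew': 1}, 4)
```

The final counts are identical with and without the combiner; only the number of intermediate pairs crossing the network drops, which is the point of the optional combine phase.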
2.2 The Hadoop Distribution
The Hadoop distribution is based on HDFS, a file system with a master-slave architecture. In a Hadoop cluster of n nodes, one node is the master node, called the NameNode (NN). The other nodes are worker nodes, called DataNodes (DNs). The NN maintains metadata about the file system. Files are broken down into blocks of 64 MB (the default size) and distributed among the DNs. The larger block size, compared with that of conventional file systems, reduces the amount of metadata to be maintained for each file. By default, each block is replicated on three DNs to ensure fault tolerance. Hadoop also ensures “locality of data”, i.e. processes are scheduled on the nodes that hold the data to be processed; computation is moved to the nodes containing the data rather than data being moved to the nodes capable of doing the computation. This reduces the amount of data transferred among nodes and hence improves performance. The worker nodes constantly send “heartbeat” messages to the master along with the status of the tasks they have been assigned. If the master does not hear from a node for a threshold time, it re-schedules the task on another node that holds a replica of the data. Similarly, if all nodes have finished their tasks but a straggler node is still processing, the master re-schedules the same task on an idle node. The output of whichever node finishes first is used, and the duplicate task on the other node is killed. The general architecture of HDFS is shown in Figure 2.
A jobtracker runs on the master node and tasktrackers run on the worker nodes to handle jobs and tasks respectively. When a Map/Reduce job is submitted to the master, the jobtracker divides it into m map tasks and assigns them to the mappers. The following is the sequence of steps for converting input to output on Hadoop:
1. Mapping Phase: Each mapper works on the non-overlapping input splits assigned to it by the master. An input split consists of a number of records, whose format depends on the InputFormat of the input file. A RecordReader for that InputFormat reads every record, determines its key and value, and supplies the key-value pairs to the map function, where the actual processing takes place (Figure 3). Each mapper applies a user-defined function to the key-value pairs and converts them into intermediate key-value pairs. The intermediate results of the mappers are written to the local file system in sorted order.
2. Partitioning Phase: A “partitioner” determines which reducer an intermediate key-value pair should be directed to. The default partitioner provided by Hadoop computes a hash value for the key and assigns the partition using the function: (hash_value_of_key) mod (total_number_of_partitions).
3. Shuffling Phase: Each map process, in its heartbeat message, informs the master of the location of its partitioned data. The master informs each reducer of the locations of the mappers from which it has to fetch its partition. This process of moving data to the appropriate reducer nodes is called “shuffling”.
4. Sorting Phase: Each reducer, on receiving its partitions from all the mappers, merges them so that the tuples are sorted on their keys. Since the keys within each partition were already sorted by the mappers, the partitions only have to be merged so that identical keys are grouped together.
5. Reduce Phase: A user-defined reduce operation is applied to each group of values that share the same key, and the results are written to the file system.
Figure 3: Hadoop Map/Reduce dataflow (source [43])
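Phases 2 to 5 can be traced with a small Python simulation. It is a sketch under simplifying assumptions: all data stays in memory, `partition_for` uses Python's `hash` in place of Hadoop's `HashPartitioner` (which takes `key.hashCode()` with the sign bit masked off, modulo the number of reduce tasks), and the helper names are hypothetical. Each mapper buckets and sorts its output per partition; each reducer fetches its bucket from every mapper, merges the sorted runs, and calls the reduce function once per key group.

```python
import heapq
from itertools import groupby
from operator import itemgetter

def partition_for(key, num_reducers):
    """Phase 2: hash value of the key modulo the number of partitions."""
    return hash(key) % num_reducers

def map_output(pairs, num_reducers):
    """End of phase 1: bucket a mapper's pairs by partition, each bucket sorted by key."""
    buckets = [[] for _ in range(num_reducers)]
    for key, value in pairs:
        buckets[partition_for(key, num_reducers)].append((key, value))
    return [sorted(bucket, key=itemgetter(0)) for bucket in buckets]

def run_reducer(r, all_map_outputs, reduce_fn):
    """Phases 3-5 for reducer r: fetch bucket r from every mapper (shuffle),
    merge the already-sorted runs (sort), then reduce each key group."""
    runs = [outputs[r] for outputs in all_map_outputs]
    merged = heapq.merge(*runs, key=itemgetter(0))
    return [reduce_fn(key, [v for _, v in group])
            for key, group in groupby(merged, key=itemgetter(0))]

# Two mappers, two reducers; the reduce function just lists each key's values.
maps = [map_output([(1, "a"), (2, "b"), (1, "c")], 2),
        map_output([(2, "d"), (3, "e")], 2)]
for r in range(2):
    print(r, run_reducer(r, maps, lambda k, vs: (k, vs)))
# 0 [(2, ['b', 'd'])]
# 1 [(1, ['a', 'c']), (3, ['e'])]
```

Because every mapper's bucket is already sorted, the reducer only needs a streaming k-way merge (`heapq.merge`) rather than a full sort, mirroring the observation in step 4.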
2.3 Impact of parallelization on the join operation
The massive growth in the input data to be processed hampers the performance of applications executing on uni-processor machines. If this growth curtails the performance of single-stream operators (selection, projection, aggregation, etc.), it doubles the trouble for the join operator, which handles two data streams in parallel. Matching records from gigantic data streams is clearly overwhelming; in large data warehousing applications, it may mean joining trillions of records. Multi-processor or distributed processing is the solution to this problem and significantly improves the response time.
In a multi-processor or distributed setting, the performance of the join operation can be improved by using partition-wise joins [14]. The input data is partitioned among a number of machines such that processing on the parallel machines can be carried out independently. For parallel evaluation of the join operator, both datasets are partitioned by applying the same hash function, so that each machine handles a subset of keys that can be joined independently. The partitioning is performed on the join key so that a machine receives all tuples with the same join key from both datasets. Thus partitioning the input datasets scatters them across a number of machines where partition-wise joins are carried out (Figure 4). The partition-wise join is key to achieving scalability for massive join operations as it reduces the response time. The degree of parallelism of the partition-wise join is limited by the number of partition-wise joins that can be executed concurrently: the greater the number of concurrent partition-wise joins, the greater the degree of parallelism. If 8 partition-wise joins run in parallel, the degree of parallelism is 8.
Figure 4: Partitioning R and S on join column R.r=S.s in three partitions using hash(k)=k%3
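The partitioning shown in Figure 4 can be sketched in Java. Only the hash function hash(k) = k % 3 comes from the figure; the class name and the sample keys used to check the sketch are illustrative assumptions. `Math.floorMod` replaces `%` so that negative keys would still map to a valid partition.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the partitioning in Figure 4: the same hash function is applied to
// the join column of both R and S, so equal join keys get the same partition number.
class Figure4Partitioning {

    // hash(k) = k % 3 in the figure; floorMod keeps the result in [0, n) for negative keys.
    static int hash(long key, int n) {
        return (int) Math.floorMod(key, (long) n);
    }

    // Splits a join-key column into n partitions.
    static List<List<Long>> partition(long[] keys, int n) {
        List<List<Long>> parts = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            parts.add(new ArrayList<>());
        }
        for (long key : keys) {
            parts.get(hash(key, n)).add(key);
        }
        return parts;
    }
}
```

Take R keys {1, ..., 6} and S keys {3, 4, 7}. Partition 1 of R holds [1, 4] and partition 1 of S holds [4, 7], so the match on key 4 is found without consulting any other partition.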
Sort-merge and hash joins are the natural choices for the join operation in distributed environments, since both techniques can operate independently on subsets of the join keys. Each partition contains the same join keys from both datasets, so employing sort-merge or hash joins exploits the parallel partitioning and provides scalability and divisibility. New partitions can be added for processing without affecting the ongoing processing. However, the hash join generally performs better than the sort-merge join, since its cost is linear as long as a minimum amount of memory is available [22]. For the rest of our discussion and for the implementation of our algorithm, we consider only the hash join algorithm.
2.4 Hash Join algorithms
Several variations of the hash join algorithm exist, such as the classical hash join, the grace hash join, and the hybrid hash join. They differ only in detail. In essence, all of them build a hash table on the keys of the inner relation. The keys of the outer relation are iterated over and matched against the hash table entries, and the matching tuples are written to an output table. The variants differ in the granularity of the data used for the join operation. We briefly discuss all of these algorithms for a single-processor machine and then elaborate only the grace join algorithm for the distributed setting.
2.4.1 Join algorithms on a single processor machine
Let us consider that relations R and S have to be joined on the join column R.r = S.s. We assume S to be the smaller, inner relation and R to be the outer relation. We also assume an inner join between the relations.
1. Classical hash join
This simplest and most basic hash join consists of the following steps:
1) For each tuple ts of the inner build relation S
a. Add it in an in-memory hash table on the basis of a hash function h applied on the key i.e. h(key).
b. If no more tuples can be added to the hash table because the memory of the system is exhausted:
i. For each tuple tr of the outer probe relation R, apply the same hash function h(key) to the key. Use the result as an index into the in-memory hash table built in step 1 to find a match.
ii. If a match is found, a joined record is produced and written to the output.
iii. Reset the hash table and continue with the remaining tuples of S.
2) After all tuples of S have been processed, a final scan of R probes the hash table holding the last batch of S tuples, and the resulting join tuples are written to the output.
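The classical hash join steps above can be sketched in Java on a single machine, under several assumptions. Tuples are represented as `long[]{joinKey, payload}` and joined records as `long[]{joinKey, rPayload, sPayload}`. A `HashMap` stands in for the in-memory hash table, with its internal hashing playing the role of h. The hypothetical parameter `memoryLimit` models how many S tuples fit in memory.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the classical hash join; memoryLimit is a hypothetical stand-in for available memory.
class ClassicalHashJoin {

    static List<long[]> join(List<long[]> s, List<long[]> r, int memoryLimit) {
        List<long[]> out = new ArrayList<>();
        Map<Long, List<long[]>> table = new HashMap<>();
        int stored = 0;
        for (long[] ts : s) {
            // Step 1a: add the S tuple to the in-memory hash table.
            table.computeIfAbsent(ts[0], k -> new ArrayList<>()).add(ts);
            stored++;
            if (stored == memoryLimit) {
                // Step 1b: memory is full, so scan all of R against this batch of S ...
                probeAll(table, r, out);
                // ... then reset the hash table and continue with S.
                table.clear();
                stored = 0;
            }
        }
        // Step 2: final scan of R for the last, partially filled batch.
        if (stored > 0) {
            probeAll(table, r, out);
        }
        return out;
    }

    private static void probeAll(Map<Long, List<long[]>> table, List<long[]> r, List<long[]> out) {
        for (long[] tr : r) {
            for (long[] ts : table.getOrDefault(tr[0], Collections.emptyList())) {
                out.add(new long[] {tr[0], tr[1], ts[1]});
            }
        }
    }
}
```

R is scanned once per batch of S, so the less memory is available, the more scans of R are needed. The grace hash join below avoids this cost.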
2. Grace hash join
A refinement of the classical hash join is the grace hash join algorithm. The grace hash join algorithm partitions the relations and carries out the above hash join technique for each partition rather than for the whole relation (Figure 5). This reduces the memory requirements for keeping the in-memory hash table.
1) R and S are divided into n partitions by applying a hash function h1 on the join column of each tuple tr and ts of relations R and S respectively. Both partitioned relations are written out to the disk.
2) For the smaller relation S, a partition is loaded and an in-memory hash table is built using a hash function h2 on the join column of each tuple ts in the build phase. Hash function h2 divides the partition into a number of buckets so that matching with the probing tuples becomes efficient.
3) During the probe phase, the corresponding partition of relation R is read. The same hash function h2 is applied to the join attribute of each tuple tr, and the tuple is matched against the entries in the in-memory hash table.
4) If a match is found, a joined record is produced and written to the disk. Otherwise, the tuple tr is discarded. Steps 2 to 4 are repeated for each of the n pairs of partitions.
Figure 5: Grace Join Algorithm (source [44])
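The following is a minimal in-memory sketch of the grace hash join. Tuples use the hypothetical layout `long[]{joinKey, payload}`. Lists stand in for the partition files written to disk, h1 is assumed to be `key mod n`, and `HashMap`'s own bucketing stands in for h2.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the grace hash join on one machine; "disk" partitions are in-memory lists.
class GraceHashJoin {

    // h1: assigns a tuple to one of n partitions by its join key (assumed key mod n).
    static int h1(long key, int n) {
        return (int) Math.floorMod(key, (long) n);
    }

    // Step 1: partition a relation with h1 (the real algorithm writes these to disk).
    static List<List<long[]>> partition(List<long[]> rel, int n) {
        List<List<long[]>> parts = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            parts.add(new ArrayList<>());
        }
        for (long[] t : rel) {
            parts.get(h1(t[0], n)).add(t);
        }
        return parts;
    }

    static List<long[]> join(List<long[]> s, List<long[]> r, int n) {
        List<List<long[]>> sParts = partition(s, n);
        List<List<long[]>> rParts = partition(r, n);
        List<long[]> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            // Step 2 (build): only partition i of S is held in memory.
            Map<Long, List<long[]>> table = new HashMap<>();
            for (long[] ts : sParts.get(i)) {
                table.computeIfAbsent(ts[0], k -> new ArrayList<>()).add(ts);
            }
            // Steps 3-4 (probe): the matching partition of R; unmatched tuples are dropped.
            for (long[] tr : rParts.get(i)) {
                for (long[] ts : table.getOrDefault(tr[0], Collections.emptyList())) {
                    out.add(new long[] {tr[0], tr[1], ts[1]});
                }
            }
        }
        return out;
    }
}
```

Only one partition of S has to fit in memory at a time, which is the source of the reduced memory requirement.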
3. Hybrid Hash Join
A minor refinement of the grace join algorithm is to keep one partition in memory instead of writing it to the disk, and to use it directly for joining. This is the hybrid of the classical and grace join algorithms. The hybrid hash join follows these steps:
1) When partitioning the relation S, all partitions except the first one are written to the disk. An in-memory hash table is built for the first partition.
2) When partitioning the relation R, all partitions except the first one are written to the disk. The tuples of the first partition are used to probe the in-memory hash table of step 1 for matches. If a match is found, the joined record is written to the disk.
3) After exhaustion of the first partition, the procedure of the grace join algorithm is carried out for the rest of partitions.
The first partitions of both relations are never written to the disk and are processed on the fly. This avoids the cost of writing them out and reading them back from the disk into memory.
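The hybrid hash join differs from the grace hash join only in how the first partition is treated. The sketch below is an illustration with hypothetical names: lists stand in for disk files, tuples are `long[]{joinKey, payload}`, and h1 is assumed to be `key mod n`. It keeps partition 0 of S in a hash table while S is being partitioned, and partition 0 of R probes that table immediately.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the hybrid hash join: partition 0 is never spilled.
class HybridHashJoin {

    static int h1(long key, int n) {
        return (int) Math.floorMod(key, (long) n);
    }

    static List<long[]> join(List<long[]> s, List<long[]> r, int n) {
        List<long[]> out = new ArrayList<>();
        Map<Long, List<long[]>> first = new HashMap<>(); // in-memory table for partition 0 of S
        List<List<long[]>> sSpill = new ArrayList<>();   // stand-ins for on-disk partitions
        List<List<long[]>> rSpill = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            sSpill.add(new ArrayList<>());
            rSpill.add(new ArrayList<>());
        }
        // Step 1: partition S; tuples of the first partition go straight into the hash table.
        for (long[] ts : s) {
            int p = h1(ts[0], n);
            if (p == 0) {
                addToTable(first, ts);
            } else {
                sSpill.get(p).add(ts);
            }
        }
        // Step 2: partition R; tuples of the first partition probe immediately.
        for (long[] tr : r) {
            int p = h1(tr[0], n);
            if (p == 0) {
                probe(first, tr, out);
            } else {
                rSpill.get(p).add(tr);
            }
        }
        // Step 3: grace-style build and probe for the remaining partitions.
        for (int i = 1; i < n; i++) {
            Map<Long, List<long[]>> table = new HashMap<>();
            for (long[] ts : sSpill.get(i)) {
                addToTable(table, ts);
            }
            for (long[] tr : rSpill.get(i)) {
                probe(table, tr, out);
            }
        }
        return out;
    }

    private static void addToTable(Map<Long, List<long[]>> table, long[] ts) {
        table.computeIfAbsent(ts[0], k -> new ArrayList<>()).add(ts);
    }

    private static void probe(Map<Long, List<long[]>> table, long[] tr, List<long[]> out) {
        for (long[] ts : table.getOrDefault(tr[0], Collections.emptyList())) {
            out.add(new long[] {tr[0], tr[1], ts[1]});
        }
    }
}
```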
2.4.2 Hash join algorithm for parallel implementation
Because their partitions can be processed independently, the grace and hybrid hash join algorithms can easily be parallelized. The single-processor and multi-processor (parallel) variants differ in one respect: in the parallel variant, multiple processors handle the partitions simultaneously (Figure 6). Below are the steps of the grace hash join algorithm on a multi-processor system:
1) The input relation R is horizontally divided into n partitions such that each partition carries approximately |R|/n tuples. A hash function h1 is applied to the distribution key. Here the join key is used as the distribution key, so that tuples with the same join key are sent to the same partition. The range of this hash function is 0 to n-1, so each key is directed to one of the n nodes. The n partitions of R formed by the hash distribution are written to the disk.
2) A similar process is carried out for relation S. It is divided into n partitions, each partition carrying about |S|/n tuples, by applying the same hash function h1. This ensures that a partition x of the relation S contains the same join keys as partition x of the relation R. The partitions of S are also written to the disk.
3) Each processor reads in parallel a partition of relation S from the disk. It creates an in-memory hash table for the partition using a hash function h2.
4) Each processor also reads the corresponding partition of relation R from the disk in parallel. For each tuple of this partition, it probes the in-memory hash table for a match. For each matching tuple, a joined record is written to the disk.
The n partitions of a relation are completely independent of each other, so they can be processed in parallel. Each processor handles the corresponding partitions of both relations and writes the joined records for the matching tuples. Ideally, parallelizing the join operation therefore improves the performance by a factor equal to the number of PUs. This holds only when the partitions are of roughly equal size.
Figure 6: Joining on parallel machines
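The parallel grace hash join can be imitated on a single machine with a thread pool, where each worker thread stands in for one processing unit. This is a sketch under assumptions, not a distributed implementation. Tuples are `long[]{joinKey, payload}`, h1 is taken to be `key mod n`, the partitions stay in memory instead of on disk, and `HashMap` plays the role of h2.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch: one worker thread per partition pair stands in for one processing unit.
class ParallelGraceJoin {

    // Steps 1-2: the same h1 (range 0..n-1) distributes both R and S.
    static List<List<long[]>> partition(List<long[]> rel, int n) {
        List<List<long[]>> parts = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            parts.add(new ArrayList<>());
        }
        for (long[] t : rel) {
            parts.get((int) Math.floorMod(t[0], (long) n)).add(t);
        }
        return parts;
    }

    // Steps 3-4 on one processor: build on S_i, then probe with R_i.
    static List<long[]> localJoin(List<long[]> sPart, List<long[]> rPart) {
        Map<Long, List<long[]>> table = new HashMap<>();
        for (long[] ts : sPart) {
            table.computeIfAbsent(ts[0], k -> new ArrayList<>()).add(ts);
        }
        List<long[]> out = new ArrayList<>();
        for (long[] tr : rPart) {
            for (long[] ts : table.getOrDefault(tr[0], Collections.emptyList())) {
                out.add(new long[] {tr[0], tr[1], ts[1]});
            }
        }
        return out;
    }

    static List<long[]> join(List<long[]> s, List<long[]> r, int n) {
        List<List<long[]>> sParts = partition(s, n);
        List<List<long[]>> rParts = partition(r, n);
        ExecutorService pool = Executors.newFixedThreadPool(n);
        try {
            List<Future<List<long[]>>> pending = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                List<long[]> sPart = sParts.get(i);
                List<long[]> rPart = rParts.get(i);
                pending.add(pool.submit(() -> localJoin(sPart, rPart))); // independent partitions
            }
            List<long[]> out = new ArrayList<>();
            for (Future<List<long[]>> f : pending) {
                out.addAll(f.get()); // wait for every processor to finish
            }
            return out;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Each task touches only its own pair of partitions, so no locking is needed. The overall running time is governed by the slowest task, which is why unequal partition sizes matter.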
2.5 Skew and its impact on join operation
In databases, it is common that certain attribute values occur more frequently than others [19], [21]. This is referred to as “data skew”. Skew in the input data can limit the effectiveness of parallelization of the join query [14]. As discussed earlier, parallelizing a join operation consists of the following steps:
1) Tuples are read from disk
2) Selection and projection are carried out on the basis of the query
3) Tuples are partitioned among the parallel sites
4) The partitions are joined at each site
Skew can occur at any of these stages. It is accordingly categorized as tuple placement skew, selectivity skew, redistribution skew, and join product skew, respectively [17]. The initial placement of tuples in partitions may vary, giving rise to tuple placement skew. Selectivity skew arises because applying a selection predicate to different partitions may leave a varying number of selected tuples in each partition. Redistribution skew is caused by a varying number of tuples in the partitions after the redistribution scheme has been applied. Join product skew results from differences in join selectivity at each node. For our implementation, we do not consider tuple placement skew, since Map/Reduce creates file splits of almost even sizes. Selectivity skew is also ignored for two reasons. Firstly, it does not have any considerable impact on the performance. Secondly, our programs do not apply selection and projection predicates. Join product skew cannot be avoided because it becomes evident only after the partitions of the two relations are joined. Redistribution skew is the most important type of skew affecting the load distribution among nodes; it is caused by the selection of an inappropriate redistribution strategy for partitioning. In the rest of this discussion, skew refers only to redistribution skew, and only this skew is handled in our implementation.
After the redistribution of tuples into partitions, the hash join algorithm is applied to the partitions of the two datasets at each node. The hash join algorithm is easily divisible and scalable, but it is very sensitive to skew. Skew in the keys causes variance in the time taken by the processing nodes. Under hash partitioning, every key that appears very frequently in the input relation is sent to only one processing node. This results in an uneven distribution of the keys, since the partitions receiving the heavily used keys contain too many tuples. As a result, the nodes processing these partitions take much longer to complete and become a performance bottleneck. These heavy-hitter nodes adversely affect the performance of the whole distributed system while other nodes remain underutilized. To take full advantage of the parallel distributed environment, the redistribution strategy should therefore produce partitions of comparable size, evenly distributed, so that load imbalances are avoided. Current partitioning strategies fall into two categories: hash partitioning and range partitioning [16]. The two strategies have different sensitivities to different degrees of skew in the input keys. In the following discussion, we examine their sensitivities and conclude which partitioning strategy is most effective for skew handling and should be incorporated in our algorithm.
2.5.1 Hash Partitioning and its skew sensitivity
As discussed earlier, during the redistribution phase, a partitioning function is applied on the keys of input datasets to distribute the workload among a number of nodes for parallel join computation. The hash partitioning is the most commonly used partitioning strategy. It distributes tuples to PUs on the basis of the hash value of the redistribution key.
PU_no = h(k) mod Npu
Here, k belongs to the domain of the redistribution key attribute and Npu is the number of PUs in the system. PU_no determines the PU to which a tuple with key k should be forwarded.
However, hash partitioning may leave the redistribution phase with skewed partitions. With hash partitioning, the number of tuples that hash to a given range of values cannot be accurately determined in advance. Whenever relations are distributed among the parallel processing nodes by hashing, all tuples with a skewed key are directed to one and only one processing node. Selecting a good hash function does not solve the problem of data skew. Even a perfect hash function maps every tuple with the same join attribute value to one partition, so the partition that receives a heavily used key will be overloaded.
Let us consider an example of the skewed partitions generated by hash partitioning. Two relations, “patent” and “cite”, are to be joined (Figure 7). The patent relation lists the patent ID and its grant year. The fields of the cite table are the citing patent and the cited patent. Data in the patent relation is unique, i.e. there is one row for each patent. However, one patent may cite one or many other patents, so the cite relation may contain more than one entry for the same patent. Some patents are very popular and are cited by a large number of other patents, whereas others are cited only once or not at all. This creates the possibility of data skew. Now, for each patent cited in the cite table, we may need to determine information about the citing patent. This can be done by joining the cite and patent relations on the patent ID attribute. This is a case of single skew, since only one of the relations contains the skewed data.
Figure 7: Example of data skew -- Patent and Cite Tables
For example, patent 345634 is cited by 5,000 other patents, so the cite table would contain 5,000 entries for this patent. When the cite relation is partitioned by applying a hash function to the patent ID attribute, a single node receives at least those 5,000 tuples, irrespective of how many tuples are directed to the other nodes. The choice of hash function has negligible impact on this skew. A perfect hash function can prevent two different join attribute values from being hashed into the same partition. The imbalance described above remains even with such an ideal function, however, because hash partitioning directs identical keys to one partition. A hash function that places only patent 345634 in one partition would still overload this node with 5,000 tuples, while other nodes may not have sufficient load. No smarter hash function can avoid an imbalance caused by key repetition, since it is in the very nature of a hash function to direct equal keys to the same partition. The resulting uneven distribution nullifies the gains achievable by the parallel infrastructure.
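A short Java sketch can measure how a skewed key affects hash partitioning. It applies PU_no = h(k) mod Npu and counts the tuples each PU receives. Using `Long.hashCode` as h, and the class name, are assumptions made for illustration.

```java
// Sketch: per-PU load under hash partitioning, PU_no = h(k) mod Npu.
class HashSkewDemo {

    // h is assumed to be Long.hashCode; the sign bit is cleared to keep the result non-negative.
    static int puFor(long key, int numPus) {
        return (Long.hashCode(key) & Integer.MAX_VALUE) % numPus;
    }

    // Number of tuples each PU receives for the given join-key column.
    static int[] loadPerPu(long[] keys, int numPus) {
        int[] load = new int[numPus];
        for (long key : keys) {
            load[puFor(key, numPus)]++;
        }
        return load;
    }
}
```

Consider a hypothetical cite column holding 5,000 copies of key 345634 plus the distinct keys 1 to 1,000. Over five PUs, the loads are 200, 200, 200, 200, and 5,200 tuples. No choice of h can move the 5,000 identical keys apart.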
A more practical example of heavy data skew is the data received from sensors in a sensor network. These sensors continuously send the sensed values (which can be raw bytes, complex records, or uncompressed images etc.) to a monitoring station where the values are logged. Let us consider a log dataset L that records, for each reading, the sensor ID, a time stamp, and the sensed humidity of a monitored place. A sensor dataset S stores the sensor ID, the name of the place being monitored, and the sensor manufacturer. A monitoring station may need to join the relations L and S. In practice, some sensors report at a very high frequency and send data for logging very often, while others report far less often. A join using hash partitioning would therefore clearly overload the partitions handling a high-frequency sensor, which would eventually throttle the performance of the distributed system.
2.5.2 Range Partitioning and its skew sensitivity
As we have seen, hash functions used in the redistribution phase may result in imbalanced partitions in case of heavy skew in the input data. A good redistribution strategy should distribute the overly used keys to more than one partition. However, the overly used keys should first be determined and then this information can be used for deciding the partition boundaries. Two strategies of partitioning datasets on the basis of their key distribution are the simple range partitioning and the virtual processor partitioning.
1. Simple Range-based Partitioner
The hash partitioner must send all tuples with a given attribute value to a single partition. A range partitioner instead assigns a sub-range of join attribute values to each partition and, as explained below, can even spread the tuples of a single heavily used value over several partitions. In simple range partitioning, the number of partitions equals the number of PUs. Each partition is handled by one PU, so splitting a heavily used value across partitions reduces the burden on any one PU in the case of heavy skew. A split vector determines the boundaries for the distribution of values among partitions. The entries of the split vector need not divide the value range into equally spaced blocks. This is, in fact, an advantage, since the entries may be chosen so as to equalize the number of tuples mapped to each partition. Given p PUs, the split vector contains p-1 entries {e1, e2, e3, …, ep-1}, which determine the ranges for range partitioning. From this split vector, each PU is assigned a lower bound and an upper bound; the first PU has no lower bound and the last PU has no upper bound. All tuples whose join key falls in a particular range are sent to the PU associated with that range. Keys ≤ e1 are routed to processor 1, e1 < keys ≤ e2 are directed to processor 2, and so on, and keys > ep-1 find their way to processor p.
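Routing a key with a split vector amounts to a binary search for the first entry that is greater than or equal to the key. The sketch below assumes numeric keys and a sorted split vector `e` of p-1 entries. PUs are numbered from 0 rather than from 1 as in the text. The sketch does not yet handle repeated split values.

```java
// Simple range partitioning: p PUs, a sorted split vector e with p-1 entries.
class SimpleRangePartitioner {

    // keys <= e[0] go to PU 0, e[i-1] < keys <= e[i] go to PU i,
    // and keys > e[p-2] go to PU p-1. Implemented as a search for the
    // first entry that is >= key.
    static int puFor(long key, long[] e) {
        int lo = 0;
        int hi = e.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (e[mid] < key) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }
}
```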
How should this split vector be selected? A good split vector can be obtained by sampling the input relations, which gives an estimate of the distribution of join attribute values in the data. One solution is to sort the input dataset R with cardinality |R| and then select split values at a step size of |R|/p. However, this is inefficient, since sorting a dataset containing trillions of records takes too long. Random sampling of the input relation on the join attribute is a better alternative, because it estimates the join key distribution without parsing the whole input relation. x samples are taken randomly from the input relation, where x need not equal p, the number of processors. In principle, the more random samples are taken, the better the estimate of the distribution of join attribute values in the input data. [13] experimentally determines that O(√n) is the optimal number of samples for efficient random sampling. The resulting sample table T, containing the randomly selected join attribute values, is sorted. This sample table is many times smaller than the input dataset, so sorting T does not take significant time. The split vector is then obtained from T by collecting values at a step size of |T|/p.
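Building the split vector from a random sample can be sketched as follows. Two choices in this illustration are assumptions, not details of the thesis: sampling with replacement through `java.util.Random`, and the helper `defaultSampleSize`, which takes the ceiling of √n to follow the O(√n) guideline of [13] with a constant factor of 1.

```java
import java.util.Arrays;
import java.util.Random;

// Sketch: split vector from a random sample of the join-key column.
class SplitVectorBuilder {

    // Sample size following the O(sqrt(n)) guideline, with an assumed constant factor of 1.
    static int defaultSampleSize(int n) {
        return Math.max(1, (int) Math.ceil(Math.sqrt(n)));
    }

    // Draws sampleSize join keys at random (with replacement), sorts the sample
    // table T and keeps the value found at every step of |T|/p as a split point.
    static long[] build(long[] keys, int sampleSize, int p, Random rnd) {
        long[] t = new long[sampleSize];
        for (int i = 0; i < sampleSize; i++) {
            t[i] = keys[rnd.nextInt(keys.length)];
        }
        Arrays.sort(t); // cheap: T is far smaller than the input
        long[] split = new long[p - 1];
        for (int j = 1; j < p; j++) {
            split[j - 1] = t[(int) ((long) j * sampleSize / p)];
        }
        return split;
    }
}
```

A heavily repeated key occupies a large share of the sorted sample, so it tends to appear several times in the resulting split vector. The next paragraph explains how this repetition is exploited.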
Since the input dataset is randomly sampled, there is a fair chance that a skewed attribute value occurs more than once in the split vector. Let us consider a split vector {e1, e2, e2, e3} for a 5-node distributed system. Keys ≤ e1 are assigned to node 1, but e1 < keys ≤ e2 can be directed to either node 2 or node 3. When a key can be directed to more than one node, a node is selected at random from the candidate nodes and the tuple is sent to it. In this way, the skewed key e2 is distributed among more than one node, and no single machine is penalized for the skew.
A question arises as to which of the input relations should be sampled: the build relation or the probe relation? The build relation is more sensitive to data skew than the probe relation, since an in-memory hash table needs to be built for it. The build relation is therefore sampled and then partitioned according to the resulting split vector, and the same split vector is used for the probe relation as well. As discussed earlier, when a value is repeated in the split vector, a tuple of the build relation is sent at random to one of the nodes that serve this value (the candidate nodes). For the probe relation, a tuple whose key falls in more than one range is sent to all the candidate nodes. For the split vector {e1, e2, e2, e3} in the example above, key e2 of the probe relation is sent to both nodes 2 and 3. The procedure could also be reversed. A tuple of the build relation whose join key belongs to more than one range would then be sent to all of the candidate nodes, and the corresponding tuple of the probe relation would be sent at random to only one of them. However, this alternative is not effective, since replicating build keys on multiple nodes increases the size of the in-memory hash table on all such nodes. We want to keep the build relation as small as possible to reduce the memory requirement of the in-memory hash table. Thus, in our implementation, we duplicate the tuple of the probe relation whenever its key falls in multiple ranges.
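The routing rules for repeated split values can be sketched as below. Following the convention of the example above, when the split value that bounds a key is repeated, every PU whose upper bound equals that value is a candidate. Build tuples go to one randomly chosen candidate, and probe tuples go to all of them. PUs are numbered from 0, and the class and method names are hypothetical.

```java
import java.util.Random;

// Sketch: range routing that spreads build tuples and replicates probe tuples.
class SkewAwareRangeRouting {

    // First index i with e[i] >= key, or e.length if there is none.
    static int lowerBound(long[] e, long key) {
        int lo = 0, hi = e.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (e[mid] < key) lo = mid + 1; else hi = mid;
        }
        return lo;
    }

    // First index i with e[i] > key, or e.length if there is none.
    static int upperBound(long[] e, long key) {
        int lo = 0, hi = e.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (e[mid] <= key) lo = mid + 1; else hi = mid;
        }
        return lo;
    }

    // Candidate PUs: every PU whose upper bound is the split value that bounds the key.
    // A split value repeated c times yields c candidates; keys above the last entry go to the last PU.
    static int[] candidates(long[] e, long key) {
        int first = lowerBound(e, key);
        if (first == e.length) {
            return new int[] {first};
        }
        int last = upperBound(e, e[first]) - 1;
        int[] c = new int[last - first + 1];
        for (int i = 0; i < c.length; i++) {
            c[i] = first + i;
        }
        return c;
    }

    // Build tuples: one randomly chosen candidate, so no build tuple is replicated.
    static int routeBuild(long[] e, long key, Random rnd) {
        int[] c = candidates(e, key);
        return c[rnd.nextInt(c.length)];
    }

    // Probe tuples: replicated to every candidate, so no match is missed.
    static int[] routeProbe(long[] e, long key) {
        return candidates(e, key);
    }
}
```

Take the split vector {10, 20, 20, 30} as a stand-in for {e1, e2, e2, e3}. A probe tuple with key 20 is sent to PUs 1 and 2 (nodes 2 and 3 in the text's numbering), whereas a build tuple with key 20 goes to only one of them.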
2. Virtual processor partitioner
In the simple range partitioner, each PU handles only one partition and hence the number of partitions is the same as the number of PUs. An improvement in the range partitioning can be achieved by making the number of partitions greater than the number of PUs. Skew can be handled efficiently if we have a large number of partitions. We then assign these partitions to the processors either in a round robin fashion or we can dynamically feed the processors with partitions when they are finished with their earlier workload. The key idea behind this virtual processor approach is that if data is skewed, having a large number of partitions will spread the skewed data over a number of partitions. As a result, work gets more evenly distributed and the system does not suffer from the inefficiencies caused by skew. The number of partitions in the virtual processor approach should be a multiple of the number of PUs. So in the case of a 5-PU system, the number of partitions should be 10, 15, 20, etc. Load imbalance may occur if the number of partitions is not a multiple of number of PUs. For example, in case of 11 partitions, each PU handles two partitions except one which handles three partitions. So while this PU is processing the last partition, all other PUs remain idle.
For the example discussed earlier, if we take 2 as the factor for virtual processor partitioning, the split vector defines 10 partitions instead of 5, i.e. it contains 9 entries instead of 4: {e1, e2, e2, e2, e3, e4, e5, e6, e7}. A key falling in a particular range is allocated to the associated partition. Keys ≤ e1 are assigned to partition 1; e1 < keys ≤ e2 are assigned to partition 2, 3, or 4; e2 < keys ≤ e3 are allocated to partition 5; and so on. Join key e2 is now spread over three partitions instead of two, which reduces the accumulation of skewed keys in a single partition. The evaluation section experimentally determines the number of virtual processors that may provide optimal performance.
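The following sketch shows round-robin assignment of virtual partitions to PUs, and the imbalance that arises when the partition count is not a multiple of the PU count. Only the round-robin policy is shown; dynamic assignment to whichever PU finishes first is not. The names are hypothetical.

```java
// Sketch: round-robin assignment of virtual partitions to PUs.
class VirtualProcessorAssignment {

    // Virtual partition i is processed by PU i mod numPus.
    static int puForPartition(int partition, int numPus) {
        return partition % numPus;
    }

    // Number of partitions per PU; uneven unless numPartitions is a multiple of numPus.
    static int[] partitionsPerPu(int numPartitions, int numPus) {
        int[] count = new int[numPus];
        for (int i = 0; i < numPartitions; i++) {
            count[puForPartition(i, numPus)]++;
        }
        return count;
    }

    // Total tuples per PU, given the size of every virtual partition.
    static long[] tuplesPerPu(long[] partitionSizes, int numPus) {
        long[] load = new long[numPus];
        for (int i = 0; i < partitionSizes.length; i++) {
            load[puForPartition(i, numPus)] += partitionSizes[i];
        }
        return load;
    }
}
```

With 10 partitions on 5 PUs, every PU gets two partitions, while 11 partitions give one PU three. If partitions 2 to 4 (1-based, as in the example above) hold the skewed key, round robin places them on three different PUs.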
As this discussion shows, hash partitioning does not efficiently handle input data that is heavily skewed on a particular key. In such situations, range partitioning performs better, since it spreads the skewed keys over more than one partition. For the implementation of our algorithm, we therefore use the range partitioning strategy to handle skewed keys.
2.6 Pre-processing for joining
Sometimes a dataset may be very large, but a big portion of it may not participate in the join operation. One example of this scenario is a log table L that stores the logs of activities on Facebook for one hour. Another table U contains information about Facebook users. We have to join these two tables to associate the log of a user’s activities with some additional information about that user. Facebook currently has 400 million registered users [42], but not all of these users are active within one hour. Loading the complete user table U to join it with the log table L wastes resources: the complete U table is shuffled across the network, yet a big portion of it is never used in the join. The performance of the join can be enhanced by first filtering U down to those users whose activities are logged in L and then joining L with the reduced U table. This pre-processing significantly reduces the amount of data sent over the interconnection network as well as the sizes of the tables to be joined. The pre-processing can be achieved through semi-joins, using either selection or bit-filtering.
A semi-join is a relational operator used to reduce the processing cost of queries involving binary operators (e.g., join). A semi-join from S to R on attribute X is represented by R ⋉ S. The result of a semi-join between R and S is the subset of tuples of R for which there are matching tuples in S for the attribute X. Mathematically, a semi-join from S to R is represented as:
$$R_X = R \ltimes_X S = \{\, r \in R \mid \exists\, s \in S : r.X = s.X \,\}$$
The general computational steps are:
1) Project S on attribute X and select only the unique values of the attribute (Sx).
2) Reduce R to the tuples whose value of X occurs in Sx. This eliminates unnecessary tuples of R that would not take part in an operation between R and S.
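The two computational steps can be sketched in Java with a `HashSet` holding Sx. Representing rows as `String[]` and passing key-column indexes as parameters are illustrative assumptions. Applied to the Books and Authors relations of Tables 1 and 2 below, matching Books' Author column against Authors' Author column, the sketch returns the two rows of Table 3.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the two-step semi-join of R by S on attribute X; rows are String[].
class SemiJoin {

    static List<String[]> semiJoin(List<String[]> r, int rKeyCol, List<String[]> s, int sKeyCol) {
        // Step 1: project S on X and keep only the distinct values (Sx).
        Set<String> sx = new HashSet<>();
        for (String[] row : s) {
            sx.add(row[sKeyCol]);
        }
        // Step 2: keep the tuples of R whose X value occurs in Sx.
        List<String[]> reduced = new ArrayList<>();
        for (String[] row : r) {
            if (sx.contains(row[rKeyCol])) {
                reduced.add(row);
            }
        }
        return reduced;
    }
}
```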
Let us consider an example of semi join for two relations: Books and Authors shown in Table 1 and Table 2.
Table 1: Books Relation
Title | ISBN | Author
Pro Hadoop | 1430219424 | Jason Venner
Hadoop in Action | 1935182196 | Chuck Lam
Hadoop: The Definitive Guide | 0596521979 | Tom White
Data Intensive Text Processing with Map/Reduce | 1608453421 | Jimmy Lin
Table 2: Authors Relation
Author | Home page
Jason Venner | http://www.prohadoopbook.com/profile/JasonVenner
Tom White | http://www.lexemetech.com/
The semi join of the Authors relation to the Books relation lists only those books for which there is an author in the Authors relation. The result is shown in Table 3.
Table 3: Filtered Book Relation
Title | ISBN | Author
Pro Hadoop | 1430219424 | Jason Venner
Hadoop: The Definitive Guide | 0596521979 | Tom White
Both relations now contain only the tuples that take part in the actual join, so the data to be processed by the join operation is reduced. This will clearly not make any significant difference for the trivial example above. Consider, however, massive datasets containing billions of records of which only hundreds or thousands take part in the actual operation. There, the reduction in dataset size significantly reduces the transmission, storage, and computation overhead, and the actual operation performs better as a result. However, this pre-processing incurs a cost of its own. We determine in the evaluation section whether the improvement in the performance of the actual operation is worth the pre-processing cost.
Semi-joins can be computed through two methods: using selection or using bit-filtering. We discuss both of them here with respect to Hadoop implementation.
2.6.1 Semi-join using selection
If a relation R is semi-joined by a relation S, R is filtered so that only the tuples whose join key is present in S remain. The semi-join is completed in three stages, each of which is one Map/Reduce job:
1) Stage 1: We determine the unique keys of relation S. Each mapper in a Map/Reduce job receives an input split of S. An in-memory hash table is built as tuples are read from the input split. When a tuple with a new key arrives, we place the key in the hash table and also emit the key. If the key is already present in the hash table, we do not emit anything. Since no value is needed, we emit “null” instead of a value. In the reduce phase, we emit (key, null) for each group of identical keys. We restrict the number of reducers to 1 so that we get a single file Skeys listing all the unique keys of S.
2) Stage 2: The second stage filters relation R with the unique keys of relation S stored in Skeys. The Skeys file is broadcast to all mappers through the distributed cache. Each mapper builds an in-memory hash table from it and emits a record of R only if the record's join key is present in the hash table. The reducer here is an identity reducer, since no further operation is needed. The result is r files for relation R, written to HDFS, where r is the number of reducers. In this stage, we emit the records with the join key as the key and partition them with the same hash function that will be used to partition relation S in stage 3. This saves one Map/Reduce job that would otherwise be needed to partition relation R on the join attribute for the map-side, the reduce-side, and our hybrid join.
3) Stage 3: Any of the hash join algorithms discussed above is used to compute the actual join between the filtered relation R produced in stage 2 and the relation S.
The semi-join using selection has some inherent limitations. An in-memory hash table has to be built to record the unique keys. Each entry of the hash table is the key itself which may occupy several bytes. This can result in a huge space occupied by this data structure if keys are quite big and the input dataset contains huge number of records.
2.6.2 Semi Join using Hash Operator
The semi-join using a hash operator produces a search filter in the form of a bit array. This bit array maintains the hash projection of the unique keys of S, and the tuples of R are filtered through it. One such filter is the “Bloom filter”.
Bloom Filter: The bloom filter, named after its inventor Burton Howard Bloom, is an efficient mechanism for testing the membership of an element in a set. It is a bit array in which n elements are mapped into m bits using k hash functions. A bloom filter may produce false positives but no false negatives. An element may falsely be reported as a member of the set, but if an element is a member of the set, the membership test always returns true.
To construct a bloom filter, k hash functions are applied to each member of the set. For an element e, the k hash functions h1(e), h2(e), h3(e), …, hk(e) produce k index values in the range 1 to m, where m is the size of the filter, and these k positions in the bit array are set to 1. To test whether an element is contained in the set, the same k hash functions are applied to it to determine the indices in the bit array. If all of those bits are set, either the element is contained in the set (true positive) or the bits were set by other elements (false positive). If any of these bits is 0, the element is not contained in the set. Figure 8 depicts how the hash functions generate the indices of the bits to be set. If an index returned by a hash function is already set by an earlier element, the bit simply remains set. Figure 9 shows how the filter changes as new elements are added over time.
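A bloom filter over 64-bit join keys can be sketched with `java.util.BitSet`, the same class used by the implementation described later. The derivation of the k index values is an assumption. Instead of k separate hash functions, the sketch uses double hashing, g_i(e) = h1(e) + i·h2(e) mod m, with h1 and h2 taken from the two halves of a SplitMix64-style mix of the key. Indices run from 0 to m-1 rather than 1 to m.

```java
import java.util.BitSet;

// Sketch of a bloom filter over long join keys: m bits, k index functions.
class BloomFilter {
    private final BitSet bits;
    private final int m;
    private final int k;

    BloomFilter(int m, int k) {
        this.bits = new BitSet(m);
        this.m = m;
        this.k = k;
    }

    // i-th index via double hashing: g_i(x) = h1(x) + i * h2(x) mod m. This stands in for
    // k independent hash functions (an assumption, not necessarily the thesis's choice).
    private int index(long key, int i) {
        long h = mix(key);
        int h1 = (int) h;
        int h2 = (int) (h >>> 32) | 1; // forced odd, hence never zero
        return Math.floorMod(h1 + i * h2, m);
    }

    void add(long key) {
        for (int i = 0; i < k; i++) {
            bits.set(index(key, i)); // a bit that is already set simply stays set
        }
    }

    boolean mightContain(long key) {
        for (int i = 0; i < k; i++) {
            if (!bits.get(index(key, i))) {
                return false; // any zero bit: definitely not in the set
            }
        }
        return true; // in the set, or a false positive
    }

    // SplitMix64 finalizer: scrambles the key so its two 32-bit halves look unrelated.
    private static long mix(long z) {
        z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
        z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
        return z ^ (z >>> 31);
    }
}
```

Every added key is always reported as present, so there are no false negatives. A key that was never added is usually, but not always, reported as absent.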
Figure 8: A bloom filter with m=15, k=3 for a set {A, B, C}
Figure 9: Bit setting in bloom filter in case of collision
The size of a bloom filter is fixed at the beginning and remains constant, no matter how many elements are added to it. However, adding more elements increases the false positive rate, since each bit then represents a larger number of elements. The accuracy can be enhanced by increasing the size of the bit array: the bigger the bit array, the smaller the probability that two elements are represented by the same bit indices. However, increasing the size adds space overhead. The number of hash functions k also affects the accuracy, since these hash functions map the elements of the set to the bits. Up to a point, increasing k reduces the false positive rate, because a non-member is less likely to find all of its k bits set. Beyond an optimal value, however, each element sets so many bits that the filter fills up and the false positive rate rises again. The k hash functions should be as independent as possible, with little, if any, correlation, so that the indices to which an element is mapped are distinct. When constructing a bloom filter, the trade-off between accuracy, the size of the filter, and the number of hash functions should be taken into account.
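The standard approximation for the false positive rate p quantifies this trade-off. It applies to a filter with m bits, k hash functions, and n inserted elements, and assumes the hash functions behave independently and uniformly:

```latex
p \approx \left(1 - e^{-kn/m}\right)^{k},
\qquad
k_{\mathrm{opt}} = \frac{m}{n}\,\ln 2,
\qquad
p_{\mathrm{opt}} \approx 2^{-k_{\mathrm{opt}}}
```

As an example, suppose a filter of 50,000,000 bits held 10,000,000 keys, the sizes discussed below, assuming every key is distinct. Then m/n = 5, so k_opt ≈ 3.47 and p ≈ 0.09; k = 3 and k = 4 both give p ≈ 0.092.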
One of the major advantages of the bloom filter is its compactness. For membership testing, a bloom filter requires less space than other data structures such as linked lists, hash maps, sets, and arrays, because it stores only a few bits per element rather than the elements themselves. Therefore, compared with the semi-join through selection (which uses Java’s HashMap to store unique keys), the bit array of a bloom filter occupies little space and hence adds to the efficiency. In our test, we make a bloom filter of 50,000,000 bits, which occupies about 5.96MB of space. For the semi-join through selection, we store the unique keys in a HashMap (the size of each key is 8 bytes). For 10,000,000 records, this results in a space requirement of about 76MB (assuming that all keys are unique), counting only the raw key bytes; the per-entry overhead of a Java HashMap makes the real figure larger still. Thus the space requirement of the semi-join through selection is nearly 13 times greater than that of the semi-join through bloom filter. The gap grows in proportion to the key size, because the size of a bloom filter does not depend on the size of the keys. For example, for a key size of 100 bytes, the semi-join through selection in the above case will need about 953MB of storage, which is about 160 times more than that required for the bloom filter.
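The arithmetic behind these figures can be checked with a few lines of Java. As in the text, only the raw key bytes are counted for the HashMap; taking MB as 2^20 bytes is an assumption, but it is the convention that reproduces the 5.96MB, 76MB and 953MB figures.

```java
import java.util.Locale;

// Reproduces the space figures quoted above (MB taken as 2^20 bytes).
public class SpaceComparison {
    static final double MB = 1024.0 * 1024.0;

    // A bloom filter of m bits needs m/8 bytes, independent of the key size.
    public static double bloomFilterMB(long bits) {
        return bits / 8.0 / MB;
    }

    // Raw key bytes only; Java's per-entry HashMap overhead is ignored, as in the text.
    public static double hashMapKeysMB(long keys, int keySizeBytes) {
        return keys * (double) keySizeBytes / MB;
    }

    public static void main(String[] args) {
        double bloom = bloomFilterMB(50_000_000L);
        double keys8 = hashMapKeysMB(10_000_000L, 8);
        double keys100 = hashMapKeysMB(10_000_000L, 100);
        System.out.printf(Locale.ROOT, "bloom filter:  %.2f MB%n", bloom);                          // 5.96 MB
        System.out.printf(Locale.ROOT, "8-byte keys:   %.2f MB (%.1fx)%n", keys8, keys8 / bloom);     // 76.29 MB (12.8x)
        System.out.printf(Locale.ROOT, "100-byte keys: %.2f MB (%.1fx)%n", keys100, keys100 / bloom); // 953.67 MB (160.0x)
    }
}
```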
Bloom Filter using Map/Reduce: Since a bloom filter reduces the amount of data to be processed, we can construct a bloom filter for one of the relations (let us call it S) and then distribute this filter to all the map tasks that process the input splits of the second relation (R). In each map task, each record of the relation R is checked for containment in the bloom filter. If the record does not pass through the filter, it is discarded and hence never processed.
Applying a bloom filter is carried out in two independent stages, each being a separate Map/Reduce job:
1) Stage 1 - Construction of a bloom filter: We want to construct a bloom filter that records the unique “join keys” of the relation S (where S is the smaller of the two relations). Each mapper creates a bloom filter of 5,000,000 bits using Java’s BitSet. For each record in its input split of S, the mapper applies the k hash functions to the key to generate k indices and sets these k bits in the filter to add the key to the set. Depending on the size of the input dataset, the framework may start more than one mapper, in which case each mapper generates its own bit filter (Figure 10). To combine these filters into one, we use a single reducer so that the filters from all the mappers are processed by the same reducer. The reducer takes the UNION (bitwise OR) of all the bit filters and writes the result in binary format to HDFS.
2) Stage 2 - Filtering using a bloom filter: In the Map/Reduce job processing the relation R, each mapper needs the bloom filter constructed in stage 1. The filter file can be distributed to the mappers using the “Distributed Cache” mechanism of the framework. Each mapper reads the binary file and reconstructs the bloom filter in its configure() method, i.e. once, before map() is called for any record. In each map() call, an input record of relation R is checked for containment in the bloom filter by applying the same hash functions to the key of the record. If the key passes through the filter, the record is processed further; otherwise it is discarded and never used.
Figure 10: Construction of a Bloom Filter
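The data flow of the two stages can be simulated in memory. This is a plain-Java sketch, not Hadoop code: the per-split lists stand in for input splits, passing the BitSet object replaces writing it to HDFS and shipping it through the Distributed Cache, and the filter size, the key type (long) and the hash scheme are illustrative assumptions rather than the project's settings.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// In-memory simulation of the two bloom-filter stages (not Hadoop code).
// ASSUMPTIONS: long join keys, M and K below, and the double-hashing scheme.
public class BloomSemiJoinSketch {
    static final int M = 1 << 20; // filter size in bits (illustrative)
    static final int K = 3;       // number of hash functions

    // i-th bit index for a join key via double hashing.
    static int index(long key, int i) {
        int h1 = Long.hashCode(key);
        int h2 = Long.hashCode(key * 0x9E3779B97F4A7C15L) | 1; // odd, so never zero
        return Math.floorMod(h1 + i * h2, M);
    }

    // Stage 1, map side: one filter per input split of S.
    public static BitSet mapBuildFilter(List<Long> splitOfSKeys) {
        BitSet bits = new BitSet(M);
        for (long key : splitOfSKeys) {
            for (int i = 0; i < K; i++) bits.set(index(key, i));
        }
        return bits;
    }

    // Stage 1, single reducer: union (bitwise OR) of all per-mapper filters.
    public static BitSet reduceUnion(List<BitSet> filters) {
        BitSet union = new BitSet(M);
        for (BitSet f : filters) union.or(f);
        return union;
    }

    public static boolean mightContain(BitSet filter, long key) {
        for (int i = 0; i < K; i++) {
            if (!filter.get(index(key, i))) return false;
        }
        return true;
    }

    // Stage 2, map side: keep only records of R whose join key passes the filter.
    public static List<long[]> mapFilter(BitSet filter, List<long[]> splitOfR) {
        List<long[]> kept = new ArrayList<>();
        for (long[] record : splitOfR) { // record[0] is the join key (assumed layout)
            if (mightContain(filter, record[0])) kept.add(record);
        }
        return kept;
    }

    public static void main(String[] args) {
        BitSet filter = reduceUnion(List.of(
                mapBuildFilter(List.of(1L, 2L, 3L)),   // mapper 1's split of S
                mapBuildFilter(List.of(10L, 20L))));   // mapper 2's split of S
        List<long[]> r = List.of(new long[] {2, 100}, new long[] {7, 200}, new long[] {20, 300});
        System.out.println(mapFilter(filter, r).size()); // 2, or 3 if key 7 is a false positive
    }
}
```

Because a bloom filter has no false negatives, every record of R whose key occurs in S survives stage 2; the only error is that an occasional non-matching record also survives, to be eliminated later by the join itself.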
In the evaluation section, we determine which semi-join method provides better performance; that method is then used in the subsequent experiments.
Chapter 3
Join Algorithms on Hadoop
Hadoop provides a number of different join algorithms: map-side, reduce-side, and memory-backed joins. In this chapter, we discuss these algorithms, present our own algorithm, and provide some important implementation-specific details with respect to Hadoop.
3.1 Reduce-side joins
As the name suggests, a reduce-side join is carried out on the reducer nodes. A two-way join using the reduce-side join algorithm is completed in one Map/Reduce job. The key idea is that each mapper tags the input tuples from both datasets with their source and emits tagged key-value pairs whose key is the join key. All the tagged pairs are shuffled across the network so that, for a given join key, one reducer receives the tagged pairs from both datasets that share that key. Each reducer then joins the records of the two datasets and outputs the joined records.
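A minimal in-memory sketch of this data flow for an inner join of R(A, B) and S(B, C) on B, in plain Java rather than the Hadoop API: a TreeMap keyed on the join attribute stands in for the shuffle and sort, and the one-character source tag ('R' or 'S') is an illustrative choice. It uses a Java record, so it needs Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory simulation of a reduce-side inner join of R(A, B) and S(B, C) on B.
// Not Hadoop code: the TreeMap plays the role of the shuffle/sort phase.
public class ReduceSideJoinSketch {
    // A value tagged with the relation it came from ('R' or 'S').
    record Tagged(char source, String value) {}

    public static List<String> join(List<String[]> r, List<String[]> s) {
        // Map phase: emit (join key B, tagged tuple); the TreeMap groups pairs by key.
        Map<String, List<Tagged>> shuffled = new TreeMap<>();
        for (String[] t : r) { // t = {A, B}
            shuffled.computeIfAbsent(t[1], key -> new ArrayList<>()).add(new Tagged('R', t[0]));
        }
        for (String[] t : s) { // t = {B, C}
            shuffled.computeIfAbsent(t[0], key -> new ArrayList<>()).add(new Tagged('S', t[1]));
        }

        // Reduce phase: per join key, split the values by tag and emit their cross product.
        List<String> output = new ArrayList<>();
        for (Map.Entry<String, List<Tagged>> group : shuffled.entrySet()) {
            List<String> fromR = new ArrayList<>();
            List<String> fromS = new ArrayList<>();
            for (Tagged tv : group.getValue()) {
                if (tv.source() == 'R') fromR.add(tv.value()); else fromS.add(tv.value());
            }
            for (String a : fromR) {
                for (String c : fromS) {
                    output.add(a + "," + group.getKey() + "," + c);
                }
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String[]> r = List.of(new String[] {"a1", "b1"}, new String[] {"a2", "b2"}, new String[] {"a3", "b1"});
        List<String[]> s = List.of(new String[] {"b1", "c1"}, new String[] {"b3", "c3"});
        join(r, s).forEach(System.out::println); // prints a1,b1,c1 then a3,b1,c1
    }
}
```

Keys that appear in only one relation (b2 and b3 above) produce no output, as expected for an inner join. Since all tuples for a key meet at one reducer, a heavily repeated join key concentrates work on that reducer.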
Implementation details: Consider two relations R(A, B) and S(B, C), stored in separate files, that have to be joined on the key “B”. “Keys” in Map/Reduce are not necessarily the same as keys in relational databases: Map/Reduce keys are not unique; they are simply the attributes used to distribute data among reduce processes. To keep things simple, we consider an inner join between R and S. The steps to join R and S using Hadoop are as follows (depicted in Figure 11):
1) Splitting: The input files of both the datasets are split by Hadoop into manageable splits that are assigned to a collection of map processes.
2) Mapping: Each map process receives key-value pairs from the input datasets and generates intermediate key-value pairs such that in each pair,