Map (key k, value p) Function
Auxiliary Inputs
- S: space partitioning strategy
- r: distance threshold of the outlier algorithm
1- Begin
2- Ci ← Compute the grid cell in which p is a core point;
3- output (Ci, “0-p”);
4- For (each grid cell Cj in which p is a supporting point) Loop
5- output (Cj, “1-p”);
6- End Loop
7- End
Reduce (key cell-Id, v-list [p1, p2, …, pm]) Function
Auxiliary Inputs
- k: count threshold of the outlier algorithm
- r: distance threshold of the outlier algorithm
1- Begin
2- core-list ← the set of points in v-list with prefix tag = “0”
3- support-list ← the set of points in v-list with prefix tag = “1”
4- outlier-list ← Execute OutlierDetection(v-list, r, k);
5- For (each outlier o in outlier-list) Loop
6- If (o exists in core-list) Then
7- output (null, o) // Report Outliers
8- End Loop
9- End
Figure 13.2: DOD: MapReduce Pseudocode of the DOD Framework.
13.3
MapReduce Implementation of DOD
We sketch one MapReduce implementation of the supporting-area partitioning strategy in Figure 13.1(b). For ease of implementation, instead of directly applying the supporting-area definition in Def. 18.1, we utilize the simplified definition in Def. 13.4.
Definition 13.4 Given a d-dimensional grid cell Ci, the supporting area of Ci is an r-extension to the boundaries of Ci in each dimension. That is, $C_i.\mathit{suppArea} = \{(low_{x_i} - r,\ high_{x_i} + r) \mid 1 \le x \le d\}$, where $(low_{x_i}, high_{x_i})$ are the boundaries in the $x$-th dimension of $C_i$ ($1 \le x \le d$).
Since the supporting area defined in Def. 13.4 is a superset of the supporting area in Def. 18.1, it is guaranteed to be sufficient for each grid cell to be processed independently, without relying on the points in any other cell.
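To make Def. 13.4 concrete, the following Python sketch builds the r-extended box of an axis-aligned cell and tests whether a point is a support point of that cell. It is a minimal illustration under our own assumptions: a cell is given by hypothetical lower/upper corner tuples, cells are half-open ([low, high)) so that every point has exactly one core cell, points inside the cell itself count as core rather than support points, and the helper names supp_area and is_support_point are illustrative, not part of DOD.

```python
from typing import Sequence, Tuple

Box = Tuple[Tuple[float, ...], Tuple[float, ...]]  # (low corner, high corner)


def supp_area(low: Sequence[float], high: Sequence[float], r: float) -> Box:
    """Def. 13.4: extend every boundary of the cell by r (hypothetical helper)."""
    return tuple(lo - r for lo in low), tuple(hi + r for hi in high)


def is_support_point(p: Sequence[float], low: Sequence[float],
                     high: Sequence[float], r: float) -> bool:
    """True if p lies in the open r-extended box of the cell but not in the cell itself."""
    ext_low, ext_high = supp_area(low, high, r)
    in_ext = all(a < x < b for x, a, b in zip(p, ext_low, ext_high))
    # Assumption: cells are half-open [low, high), so each point has one core cell.
    in_cell = all(lo <= x < hi for x, lo, hi in zip(p, low, high))
    return in_ext and not in_cell


cell_low, cell_high = (0.0, 0.0), (10.0, 10.0)
print(supp_area(cell_low, cell_high, 1.0))                       # ((-1.0, -1.0), (11.0, 11.0))
print(is_support_point((10.8, 10.8), cell_low, cell_high, 1.0))  # True
```

The corner point (10.8, 10.8) is accepted although its Euclidean distance to the cell is about 1.13 > r, so no core point of the cell can be its r-neighbor. The box shape over-approximates the truly needed area; that extra duplication is the price paid for the simpler definition.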
The pseudocode of the map and reduce functions is presented in Figure 13.2. The input dataset, which resides in HDFS, has no prior partitioning properties, i.e., the data points are randomly distributed over the HDFS blocks. Each map function retrieves one data block as well as the space partitioning strategy (Figure 13.1(b)). Then, for each data point pi, the map function produces two types of output records, i.e., core-related and supporting-related records.
The core-related record is a single key-value record of the form (K = Ci, V = “0-pi”), where the key is the ID of the grid cell for which pi is a core point, i.e., pi ∈ Ci. The prefixed flag “0” in the value component indicates that pi is a core point for Ci (Lines 2-3 in the map function in Figure 13.2). For example, referring to Figure 13.1(b), the mapper Map 1 generates the output record (K = C5, V = “0-p1”) for data point p1.
Mappers also create zero or more supporting-related records for an input data point pi, each of the form (K = Cj, V = “1-pi”), where the key Cj is the ID of a grid cell for which pi is a support point, i.e., pi ∈ Cj.suppArea. The prefixed flag “1” in the value component indicates that pi is a support point for Cj (Lines 4-6 in the map function in Figure 13.2). For example, in Figure 13.1(b), the mapper Map N generates three additional output records for point p4, since it is a support point for C3, C6, and C7.
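The map function of Figure 13.2 can be sketched in Python for a uniform grid. The assumptions here are ours: the grid is anchored at the origin with one cell width w (hypothetical parameter width) in every dimension, cells are half-open, cell IDs are integer index tuples, and the value is encoded as the string “<tag>-x1,x2,...”. Per dimension, cell j's open r-extended interval (j·w − r, (j+1)·w + r) contains coordinate x exactly when floor((x − r)/w) ≤ j ≤ ceil((x + r)/w) − 1, so the candidate supporting cells are the Cartesian product of these index ranges minus the core cell. The name map_point is hypothetical.

```python
import itertools
import math
from typing import List, Sequence, Tuple

Cell = Tuple[int, ...]


def map_point(p: Sequence[float], width: float, r: float) -> List[Tuple[Cell, str]]:
    """Emit one core record and zero or more supporting records for point p (sketch)."""
    value = ",".join(repr(float(x)) for x in p)
    core = tuple(math.floor(x / width) for x in p)
    records = [(core, "0-" + value)]  # Lines 2-3: core-related record
    # Cell index j contains x in its r-extended interval iff
    # floor((x - r) / width) <= j <= ceil((x + r) / width) - 1.
    ranges = [range(math.floor((x - r) / width), math.ceil((x + r) / width)) for x in p]
    for cell in itertools.product(*ranges):  # Lines 4-6: supporting-related records
        if cell != core:
            records.append((cell, "1-" + value))
    return records


for rec in map_point((9.5, 9.5), width=10.0, r=1.0):
    print(rec)
```

A point near a cell corner, such as (9.5, 9.5) with w = 10 and r = 1, yields one core record for cell (0, 0) and three supporting records for cells (0, 1), (1, 0), and (1, 1), mirroring the situation of p4 in the text; a point deep inside its cell, such as (5.0, 5.0), yields only its core record.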
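After the internal shuffling and sorting phase based on the cell ID, each group received by a reducer corresponds to a specific grid cell, say Ci, and consists of the union of the core and support points belonging to Ci (see Figure 13.1(b)). The reduce function categorizes the data points according to the flag encoded in their values (lines 2-3 in the reduce function in Figure 13.2). Lastly, it executes an outlier detection algorithm over all points in the group and reports only those detected outliers that are core points of Ci (lines 4-8 in the reduce function in Figure 13.2). Since every point is a core point of exactly one grid cell, each outlier is reported exactly once.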
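The shuffle and reduce side can be emulated in the same spirit. The pseudocode only names a generic OutlierDetection(v-list, r, k), so this sketch assumes the common distance-based definition (a point is an outlier if fewer than k other points lie within distance r) and uses a naive quadratic detector. The value encoding “<tag>-x1,x2,...” and the names group_by_cell, decode, outlier_detection, and reduce_cell are hypothetical stand-ins.

```python
import math
from collections import defaultdict
from typing import Dict, Iterable, List, Sequence, Tuple

Point = Tuple[float, ...]
Cell = Tuple[int, ...]


def group_by_cell(records: Iterable[Tuple[Cell, str]]) -> Dict[Cell, List[str]]:
    """Emulates the shuffle: gather all values emitted for the same cell ID."""
    groups: Dict[Cell, List[str]] = defaultdict(list)
    for cell, value in records:
        groups[cell].append(value)
    return groups


def decode(value: str) -> Tuple[str, Point]:
    """Split "<tag>-<x1>,<x2>,..." at the first '-' (coordinates may be negative)."""
    tag, coords = value.split("-", 1)
    return tag, tuple(float(c) for c in coords.split(","))


def outlier_detection(points: Sequence[Point], r: float, k: int) -> List[Point]:
    """Naive O(m^2) detector (assumed definition): fewer than k other points within r."""
    outliers = []
    for i, p in enumerate(points):
        neighbors = sum(1 for j, q in enumerate(points) if j != i and math.dist(p, q) <= r)
        if neighbors < k:
            outliers.append(p)
    return outliers


def reduce_cell(cell: Cell, values: Sequence[str], r: float, k: int) -> List[Point]:
    tagged = [decode(v) for v in values]
    core = {p for tag, p in tagged if tag == "0"}  # line 2: core-list
    points = [p for _, p in tagged]                # core and support points together
    # Lines 4-8: detect over the whole group, report only this cell's core points.
    return [o for o in outlier_detection(points, r, k) if o in core]


values = ["0-1.0,1.0", "0-1.5,1.0", "0-8.0,8.0", "0-9.5,5.0", "1-10.2,5.0", "1-10.8,0.0"]
print(reduce_cell((0, 0), values, r=1.0, k=1))  # [(8.0, 8.0)]
```

In this example the core point (9.5, 5.0) escapes being flagged only because of the support point (10.2, 5.0) from the neighboring cell, while the isolated support point (10.8, 0.0) is left for the reducer of its own core cell to report.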
14
DOD with Load Balancing
As presented in Section 13.2, the DOD framework adopting the uniform space partitioning method uniSpace leads to the optimal duplication rate when handling uniformly distributed datasets. However, the datasets in most real-world applications are not uniformly distributed over the domain space. Therefore, the uniSpace partitioning method may, at times, cause severe load imbalance. For example, Reducer 1 in Figure 13.1(b) may process grid cell C1, which contains an order of magnitude more points than C7, processed by Reducer M. Load imbalance has been shown to be one of the most challenging problems for distributed data processing, e.g., [61]. It may not only result in a significant slowdown but also cause job failure in some cases. In this section, we investigate more sophisticated partitioning methods to overcome this challenge.
14.1
Data-Driven Partitioning (DDriven)
In this section, we propose a data-driven partitioning method called DDriven that takes the data's distribution into account. Hence, DDriven generates grid cells that, despite having different sizes, contain a similar number of data points (equi-cardinality cells). DDriven relies on a lightweight pre-processing strategy to determine a plan for partitioning the domain space into grid cells, henceforth called the partitioning plan. This new pre-processing phase is composed of two steps, namely distribution estimation and partitioning-plan generation. Both steps can be performed using one MapReduce job.
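Equi-cardinality cells rest on equi-depth (quantile) boundaries. As a one-dimensional illustration, assuming boundaries are read directly off a sorted sample at evenly spaced ranks, the cut points below split a skewed sample into buckets of very different widths but equal counts. The helper names equi_depth_boundaries and bucket_of are hypothetical.

```python
import bisect
from collections import Counter
from typing import List, Sequence


def equi_depth_boundaries(values: Sequence[float], num_buckets: int) -> List[float]:
    """Return num_buckets - 1 cut points taken at evenly spaced ranks of the sorted values."""
    s = sorted(values)
    n = len(s)
    return [s[(i * n) // num_buckets] for i in range(1, num_buckets)]


def bucket_of(x: float, cuts: Sequence[float]) -> int:
    """Bucket b covers [cuts[b-1], cuts[b]); a value equal to a cut goes to the upper bucket."""
    return bisect.bisect_right(cuts, x)


sample = [1, 2, 3, 4, 5, 6, 7, 8, 100, 200, 300, 400]
cuts = equi_depth_boundaries(sample, 4)
print(cuts)  # [4, 7, 200]
print(Counter(bucket_of(x, cuts) for x in sample))
```

The bucket [7, 200) is far wider than [4, 7), yet each of the four buckets receives three of the twelve values, which is the property DDriven seeks for its cells.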
In the first step, DDriven estimates the distribution of the data by drawing a sample from the input dataset. We opt for random sampling since it preserves the distribution of the underlying dataset [62]. Since we only need a rough estimate of the distribution, the sampling rate Υ, an input parameter, is set by default to a small value, e.g., 0.5%. Given the size of big datasets, the sample is generated in a distributed fashion, for example by drawing samples within the map phase of a MapReduce job. The mappers' output is then passed to a centralized node, i.e., a single reducer, for the plan-generation step. Since drawing a random sample in the map phase is straightforward [62], we omit the details here.
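The text leaves the sampling details open; one simple choice is Bernoulli sampling, where each mapper keeps every record independently with probability Υ. The sketch below emulates that map phase and the single reducer that collects the union of the samples. The per-mapper seeding scheme and the names sample_block and collect_sample are our assumptions, not part of DDriven.

```python
import random
from typing import Iterable, List, Sequence, Tuple

Point = Tuple[float, ...]


def sample_block(block: Iterable[Point], rate: float, seed: int) -> List[Point]:
    """Map-side Bernoulli sampling: keep each record independently with probability `rate`."""
    rng = random.Random(seed)  # one generator per mapper so mappers sample independently
    return [p for p in block if rng.random() < rate]


def collect_sample(blocks: Sequence[Iterable[Point]], rate: float,
                   base_seed: int = 0) -> List[Point]:
    """Emulates the single reducer receiving the union of all mappers' samples."""
    sample: List[Point] = []
    for i, block in enumerate(blocks):
        sample.extend(sample_block(block, rate, base_seed + i))
    return sample


# Two hypothetical HDFS blocks of 100,000 two-dimensional points each.
blocks = [[(float(i), float(i % 7)) for i in range(100_000)] for _ in range(2)]
sample = collect_sample(blocks, rate=0.005)  # Υ = 0.5%
print(len(sample))  # close to 1,000 on average
```

Bernoulli sampling needs no coordination between mappers and preserves the distribution in expectation; its sample size is random, which is harmless here because only a rough estimate of the distribution is required.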
(a) The data-driven partitioning for generating approximate equi-depth partitions: mappers (Map 1, Map 2, …, Map N) read the HDFS data blocks and generate a sample given a sampling rate Υ; a single reducer generates an equi-depth partitioning plan.
Step 1:
- Consider the dimension having the closest-to-uniform distribution (di)
- Generate di.numBuckets equi-depth partitions on di
Step 2:
- Consider the next dimension having the closest-to-uniform distribution (dj)
- For each of the existing partitions, divide it into dj.numBuckets equi-depth partitions
Reduce Function
Inputs:
- Sample data produced from map tasks
- di.numBuckets: number of desired buckets in each dimension di
Output:
- Partitioning plan
1. Begin
2. For each dimension di Loop
3. di.score ← chi-square test on the sample projected on di
4. End Loop
5. dimList ← Order the dimensions ascendingly based on di.score
6. Create equi-depth partitions for the 1st dimension in dimList
7. For each subsequent dimension dx in dimList Loop
8. For each of the existing partitions Loop
9. Divide it into dx.numBuckets equi-depth partitions
10. End Loop
11. End Loop
12. End
(b) Pseudocode of the centralized Reduce() Function
Figure 14.1: DOD: Equipped with the Data-Driven Partitioning Strategy (DDriven).
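A centralized, in-memory sketch of the Reduce() function in Figure 14.1(b) follows. All of the following are our assumptions: the chi-square score is the statistic of a fixed 10-bin histogram of each projected dimension against a uniform expectation (lower means closer to uniform); each partition is a dict mapping a dimension to a half-open interval [lo, hi); and line 6 of the pseudocode is folded into the loop by treating the whole space as the single initial partition. The names chi_square_score, split_equi_depth, and generate_plan are hypothetical.

```python
import math
from typing import Dict, List, Sequence, Tuple

Point = Tuple[float, ...]
Bounds = Dict[int, Tuple[float, float]]  # dimension -> half-open interval [lo, hi)


def chi_square_score(values: Sequence[float], bins: int = 10) -> float:
    """Chi-square statistic of a histogram against a uniform expectation (lower = more uniform)."""
    lo, hi = min(values), max(values)
    if hi == lo:
        return 0.0  # a constant projection gives no evidence against uniformity
    counts = [0] * bins
    for v in values:
        counts[min(int((v - lo) / (hi - lo) * bins), bins - 1)] += 1
    expected = len(values) / bins
    return sum((c - expected) ** 2 / expected for c in counts)


def split_equi_depth(points: List[Point], dim: int, num_buckets: int,
                     bounds: Bounds) -> List[Tuple[Bounds, List[Point]]]:
    """Divide one partition into num_buckets pieces holding about equally many sample points."""
    if not points:
        return [(bounds, points)]  # no sample points to place boundaries with
    pts = sorted(points, key=lambda p: p[dim])
    n = len(pts)
    cuts = [pts[(i * n) // num_buckets][dim] for i in range(1, num_buckets)]
    outer_lo, outer_hi = bounds.get(dim, (-math.inf, math.inf))
    edges = [outer_lo] + cuts + [outer_hi]
    pieces = []
    for b in range(num_buckets):
        lo, hi = edges[b], edges[b + 1]
        members = [p for p in pts if lo <= p[dim] < hi]
        pieces.append(({**bounds, dim: (lo, hi)}, members))
    return pieces


def generate_plan(sample: List[Point], num_buckets: Sequence[int]) -> List[Bounds]:
    d = len(sample[0])
    # Lines 2-4: score every dimension of the sample.
    scores = [chi_square_score([p[i] for p in sample]) for i in range(d)]
    # Line 5: closest-to-uniform dimensions first.
    dim_list = sorted(range(d), key=lambda i: scores[i])
    partitions: List[Tuple[Bounds, List[Point]]] = [({}, sample)]
    # Lines 6-11: split every existing partition along each dimension in turn.
    for dim in dim_list:
        partitions = [piece for bounds, pts in partitions
                      for piece in split_equi_depth(pts, dim, num_buckets[dim], bounds)]
    return [bounds for bounds, _ in partitions]


sample = [(i + 0.5, (i % 10) ** 3) for i in range(100)]  # dim 0 uniform, dim 1 skewed
for cell in generate_plan(sample, num_buckets=[2, 2]):
    print(cell)
```

On this sample, dimension 0 scores as uniform and is split first, at 50.5; each half is then split on dimension 1 at 125, so each of the four resulting partitions holds 25 of the 100 sample points despite their very different extents. Heavily duplicated values can produce equal cut points and therefore empty partitions, a case a production plan generator would need to handle.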
In the next step, the partitioning plan is generated by the single reducer (Figure 14.1(b)). In addition to the sample data, the reducer receives a list that specifies the number of desired partitions in each dimension. For instance, di.numBuckets is the number of desired partitions for dimension di. This list is calculated based on the number