Resource-Aware Scheduling for MapReduce
5.5 Scheduler Design
5.5.2 Algorithm Description
We formally introduce our scheduling algorithm in this section. Specifically, each job j in the system consists of two types of tasks: map tasks M and reduce tasks R. Let τ(t) ∈ {M, R} denote the type of a task t. Given a phase i belonging to a task t that can be scheduled on a machine n, we define the utility of assigning phase i to machine n as:
$$U(i, n) = U_{\text{fairness}}(i, n) + \alpha \cdot U_{\text{perf}}(i, n) \tag{5.3}$$
where $U_{\text{fairness}}$ and $U_{\text{perf}}$ represent the utilities for improving fairness and job performance, respectively, and α is an adjustable weight factor. If we set α to a value close to zero, then the algorithm will greedily schedule phases according to the improvement in fairness. Notice that considering job performance objectives will not severely hurt fairness: when a user is severely below her fair share, scheduling any phase with a non-zero resource requirement will only improve her fairness. The exact value of α can be tuned empirically.
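To make the role of α concrete, the following minimal Python sketch combines the two terms of equation (5.3) for two hypothetical candidate phases. The numeric utilities and the names `combined_utility`, `candidates`, and `best_phase` are illustrative assumptions, not values or identifiers taken from PRISM.

```python
def combined_utility(u_fairness: float, u_perf: float, alpha: float) -> float:
    """Equation (5.3): U = U_fairness + alpha * U_perf."""
    return u_fairness + alpha * u_perf


# Hypothetical (U_fairness, U_perf) pairs for two candidate phases.
candidates = {
    "phase_a": (0.30, 0.10),  # larger fairness gain
    "phase_b": (0.10, 0.50),  # larger performance gain
}


def best_phase(alpha: float) -> str:
    """Return the name of the candidate with the highest combined utility."""
    return max(
        candidates,
        key=lambda name: combined_utility(*candidates[name], alpha),
    )


print(best_phase(alpha=0.01))  # fairness term dominates: phase_a
print(best_phase(alpha=1.0))   # both terms weigh equally: phase_b
```

With α close to zero the ordering follows the fairness term alone, whereas α = 1 lets a large performance gain outweigh a smaller fairness gain.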
Now we describe each of the terms in equation (5.3) in detail. We define
$$U_{\text{fairness}}(i, n) = U_{\text{fairness}}^{\text{before}}(i, n) - U_{\text{fairness}}^{\text{after}}(i, n) \tag{5.4}$$
where $U_{\text{fairness}}^{\text{before}}(i, n)$ denotes the fairness measure of the user before scheduling i on n and $U_{\text{fairness}}^{\text{after}}(i, n)$ is the new fairness measure of the user after scheduling i on n. For example,
we can define
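where J denotes the number of jobs that are currently running, $FS_j^{\text{before}}$ and $FS_j^{\text{after}}$ denote the fair share of job j before and after i is scheduled, and $FS_{*}^{\text{before}}$ and $FS_{*}^{\text{after}}$ are the corresponding average fair shares, i.e. $FS_{*}^{\text{before}} = \frac{1}{J} \sum_{j=1}^{J} FS_j^{\text{before}}$ and $FS_{*}^{\text{after}} = \frac{1}{J} \sum_{j=1}^{J} FS_j^{\text{after}}$.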
On the other hand, $U_{\text{perf}}(i, n)$ is more difficult to compute. As mentioned previously, if i is the first phase of a map (or reduce) task t, then $U_{\text{perf}}(i, n)$ measures the gain in parallelism in terms of the number of running map tasks (or reduce tasks). Otherwise, if i is a subsequent phase of task t, then $U_{\text{perf}}(i, n)$ measures the gain in shortening the running time of task t. Formally, we define
$$U_{\text{perf}}(i, n) = \begin{cases} U_{\text{task}}(i, n) & \text{if } i \text{ is the first phase of a task} \\ U_{\text{phase}}(i, n) & \text{otherwise} \end{cases} \tag{5.7}$$
Even though PRISM does not specify the function for computing the utility of a phase, in our current implementation we have chosen $U_{\text{task}}(i, n)$ to be
$$U_{\text{task}}(i, n) = \frac{N_{\text{remaining}}}{\max\{N_{\text{current}}, \epsilon\}} - \frac{N_{\text{remaining}}}{N_{\text{current}} + 1} \tag{5.8}$$
where $N_{\text{remaining}}$ denotes the number of remaining tasks of type τ(t) (i.e. the number of remaining tasks of the same type as t), and $N_{\text{current}}$ denotes the number of tasks of type τ(t) that are running. The small constant $\epsilon$ is used to prevent division by zero. Intuitively, $U_{\text{task}}(i, n)$ measures the gain in parallelism if the number of running tasks is increased from $N_{\text{current}}$ to $N_{\text{current}} + 1$.
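As a worked illustration of equation (5.8), the short Python sketch below evaluates the task utility for a few task counts. The function name `u_task` and the default `eps = 1e-6` are assumptions made for this example; the text only requires a small constant that avoids division by zero.

```python
def u_task(n_remaining: int, n_current: int, eps: float = 1e-6) -> float:
    """Equation (5.8): gain in parallelism from running one more task.

    eps is a hypothetical small constant guarding against division by zero
    when no task of this type is currently running.
    """
    before = n_remaining / max(n_current, eps)  # remaining work per running task
    after = n_remaining / (n_current + 1)       # same, with one more task running
    return before - after


# 10 remaining tasks, 4 running: 10/4 - 10/5 = 0.5
print(u_task(n_remaining=10, n_current=4))

# The gain shrinks as more tasks of the same type are already running.
print(u_task(10, 1) > u_task(10, 4))
```

The utility is largest when no task of the given type is running, so a job with no running map (or reduce) tasks is strongly favored for its first phase.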
On the other hand, let $T_t^{\text{wait}}$ denote the number of seconds that task t has been paused due to phase-based scheduling. The utility for scheduling a non-leading phase i of task t can be expressed as a function p(·) of $T_t^{\text{wait}}$:
$$U_{\text{phase}}(i, n) = p(T_t^{\text{wait}}) \tag{5.9}$$
There are many possible choices for p(·). For example, we can define p(·) as a linear function (i.e. $p(T_t^{\text{wait}}) = a \cdot T_t^{\text{wait}} + b$ for constants a and b), which would increase the
Algorithm 5 Phase-Level Scheduling Algorithm
 1: Upon receiving a status message from a task tracker on machine n
 2: Compute the resource utilization of machine n
 3: PhaseSelected ← ∅
 4: CandidatePhases ← ∅
 5: for each job j in the system do
 6:     for each schedulable phase i ∈ j do
 7:         CandidatePhases ← CandidatePhases ∪ {i}
 8:     end for
 9: end for
10: while CandidatePhases ≠ ∅ do
11:     for i ∈ CandidatePhases do
12:         if i is not schedulable on n given current utilization then
13:             CandidatePhases ← CandidatePhases \ {i}
14:             continue
15:         end if
16:         Compute the utility U(i, n) as in equation (5.3)
17:         if U(i, n) ≤ 0 then
18:             CandidatePhases ← CandidatePhases \ {i}
19:         end if
20:     end for
21:     if CandidatePhases ≠ ∅ then
22:         i ← phase with highest U(i, n) in CandidatePhases
23:         PhaseSelected ← PhaseSelected ∪ {i}
24:         CandidatePhases ← CandidatePhases \ {i}
25:         Update the resource utilization of machine n
26:     end if
27: end while
28: return PhaseSelected
utility of scheduling i linearly with the number of seconds that the task has been paused. However, in our implementation, we have chosen p(·) to be a quadratic function $p(T_t^{\text{wait}}) = a \cdot (T_t^{\text{wait}})^2 + b$. The intuition behind using a quadratic function is to increase the urgency of scheduling i more rapidly when its task has been paused for a long time. More generally, PRISM can adopt any utility function p(·) as long as it is monotonically increasing.
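The difference between the linear and quadratic choices of p(·) can be seen in a small Python sketch. The function names and the default constants a = 1 and b = 0 are assumptions chosen for illustration; the text does not state the constants used in the implementation.

```python
from typing import Callable


def p_linear(t_wait: float, a: float = 1.0, b: float = 0.0) -> float:
    """Linear choice: p(T) = a * T + b."""
    return a * t_wait + b


def p_quadratic(t_wait: float, a: float = 1.0, b: float = 0.0) -> float:
    """Quadratic choice: p(T) = a * T^2 + b."""
    return a * t_wait ** 2 + b


def u_phase(t_wait: float, p: Callable[[float], float] = p_quadratic) -> float:
    """Equation (5.9): utility of a non-leading phase whose task has waited t_wait seconds."""
    return p(t_wait)


for t_wait in (1.0, 5.0, 10.0):
    print(t_wait, u_phase(t_wait, p_linear), u_phase(t_wait, p_quadratic))
```

For non-negative waiting times and a > 0, both functions are monotonically increasing, but the quadratic form grows much faster for long pauses (100 versus 10 after ten seconds with these constants), which is the urgency effect described above.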
Finally, the scheduling algorithm used by our phase-based scheduler is shown in Algorithm 5. Specifically, upon receiving a status message from the node manager running on machine n, we first compute the utilization u of the machine using the jobs' phase-level profiles (Line 2). We then compute the set of schedulable candidate phases (Lines 4-9) and select phases iteratively. In each iteration, for each schedulable phase i ∈ P(j) of each job j, we compute the utility U(i, n) according to equation (5.3) (Line 16). We then select the phase with the highest utility for scheduling (Lines 22-23) and update the resource utilization of the machine (Line 25). The algorithm then repeats, recomputing the utility of all phases remaining in the candidate set and selecting the next best phase to schedule. The algorithm ends when the candidate set is empty, which means that no suitable phase remains to be scheduled. As for the running time, assuming there are N tasks in the system¹ and each machine can schedule at most k tasks, each of the at most k selection rounds evaluates at most N candidate phases, so the running time of the algorithm is O(Nk).
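The selection loop described above can be sketched in Python as follows. This is a minimal illustration under stated assumptions, not the PRISM implementation: resources are modeled as additive fractions of machine capacity, the utility is passed in as a callable standing in for equation (5.3), and the names `Phase`, `fits`, and `schedule` are hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List

Resources = Dict[str, float]  # assumed model: fraction of machine capacity per resource


@dataclass
class Phase:
    name: str
    demand: Resources  # hypothetical per-phase demand taken from a job profile


def fits(phase: Phase, used: Resources, capacity: Resources) -> bool:
    """A phase is schedulable if every demanded resource stays within capacity."""
    return all(
        used.get(r, 0.0) + d <= capacity.get(r, 0.0)
        for r, d in phase.demand.items()
    )


def schedule(
    candidates: List[Phase],
    used: Resources,
    capacity: Resources,
    utility: Callable[[Phase, Resources], float],
) -> List[Phase]:
    """Greedy per-machine selection following the structure of the algorithm."""
    used = dict(used)  # work on a copy of the machine utilization
    remaining = list(candidates)
    selected: List[Phase] = []
    while remaining:
        # Keep only phases that still fit and have positive utility.
        scored = [(utility(p, used), p) for p in remaining if fits(p, used, capacity)]
        scored = [(u, p) for u, p in scored if u > 0]
        if not scored:
            break
        _, best = max(scored, key=lambda pair: pair[0])
        selected.append(best)
        remaining = [p for _, p in scored if p is not best]
        for r, d in best.demand.items():  # update the machine utilization
            used[r] = used.get(r, 0.0) + d
    return selected


# Hypothetical example: fixed utilities stand in for equation (5.3).
fixed_utility = {"map": 3.0, "shuffle": 2.0, "reduce": 1.0}
phases = [
    Phase("map", {"cpu": 0.5, "mem": 0.25}),
    Phase("shuffle", {"cpu": 0.25, "mem": 0.25}),
    Phase("reduce", {"cpu": 0.5, "mem": 0.5}),
]
chosen = schedule(
    phases,
    used={"cpu": 0.25, "mem": 0.25},
    capacity={"cpu": 1.0, "mem": 1.0},
    utility=lambda p, used: fixed_utility[p.name],
)
print([p.name for p in chosen])  # ['map', 'shuffle']
```

In this example the reduce phase is dropped in the second round because it no longer fits once the map phase has been placed. Each round rescans the remaining candidates and selects at most one phase, which matches the O(Nk) bound given in the text.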
5.6 Experiments
We have implemented PRISM in Hadoop 0.20.2. Implementing this architecture requires minimal change to the existing Hadoop architecture (around 700 lines of code). Even though several fairness criteria can be implemented in PRISM, we currently implement the running-time fairness proposed by Isard et al. [63], where the fair share of each job is computed according to equation (5.1).
We deployed PRISM in a compute cluster consisting of 16 compute nodes. Each compute node has 4-core 2.13 GHz Intel Xeon E5606 processors, 12 GB of RAM, and a 1 TB local high-speed hard drive, and runs the 64-bit Ubuntu 11.10 OS. The network interface card (NIC) installed on each node can handle up to 1 Gb/s of network traffic. Each node is connected to a top-of-rack switch and can communicate with the others via a 1 Gb/s link.
We have chosen two benchmarks to evaluate the performance of PRISM: Gridmix 2 and PUMA. Gridmix 2 [3] is a standard benchmark included in the Hadoop distribution.
For Gridmix 2 we have chosen 3 jobs for performance evaluation: MonsterQuery (MQ), WebDataScan (WDS) and Combiner (CM). Similarly, PUMA [4] is a MapReduce benchmark developed at Purdue University. We have selected 4 jobs for performance evaluation: sort (SRT), self-join (SJ), inverted-index (II) and classification (CL).
We chose these jobs because they exhibit a variety of resource usage characteristics. For example, sort and MonsterQuery are I/O intensive jobs, whereas Combiner and self-join are more CPU intensive. A mixture of jobs with different resource requirements allows us to better evaluate the performance of PRISM.
¹ As each task can provide at most one candidate phase, N tasks in the system imply there can be at most N candidate phases.

Figure 5.5: Num of Slots vs. Map running time (x-axis: Number of Map Slots; y-axis: Map Running Time (sec))

Figure 5.6: Num of Slots vs. Job running time (x-axis: Number of Reduce Slots; y-axis: Reduce Running Time (sec))

Job: sorter
Input size: 5GB, Map Count: 40, Reduce Count: 56
Map stage completion: 63s
Reduce stage completion: 147s

Phase            Map     Merge   Shuffle  Sort    Reduce
t_i (s)          7.43    1.25    9.07     0.64    9.69
CPU (%)          17.42   14.35   21.58    7.5     8.21
Mem (%)          1.35    1.40    2.11     2.37    2.33
LFS (MB/s)       3.98    34.31   5.71     11.17   5.29
HDFS (MB/s)      7.17    0       0        0       5.30
Shuffle (MB/s)   0       0       7.16     0       0

Figure 5.7: A Job Profile for Sort Job

In order to evaluate the benefit of phase-level scheduling, it is necessary to compare PRISM with existing task-level resource-aware schedulers. In our experiments, we have chosen Hadoop Yarn 2.0.4 as a competing task-level resource-aware scheduler. Hadoop Yarn 2.0.4 is a recent version of Hadoop NextGen that allows users to specify both the CPU requirement (i.e. number of virtual cores) and the memory requirement (i.e. GB of RAM) of each task. Like the previous version of Hadoop MapReduce, the current Hadoop Yarn supports both the capacity scheduler and the fair scheduler. However, only the capacity scheduler supports resource-aware scheduling using both CPU and memory resource requirements.
In our experiments, we therefore compare PRISM with Hadoop Yarn using the capacity scheduler, which serves as a state-of-the-art resource-aware scheduler. Because the capacity scheduler is not a fair scheduler, we also use Hadoop 0.20.2 with the fair scheduler as a (slot-based) baseline for comparing scheduler performance. This also allows us to evaluate the fairness of PRISM, because both PRISM and the fair scheduler aim at achieving running-time fairness. In our experiments, we set α = 1 to give equal importance to both performance and fairness in PRISM.