5.3 Single-Job Task Scheduling
5.3.2 Task Splitting with Prior Knowledge
Now we assume that prior knowledge about task execution time is available. By prior knowledge, we mean that the Estimated Remaining Execution Time (ERET) is known or predictable. ERET indicates approximately how much longer a task will run before it completes. In [135], a simple algorithm is proposed to estimate the finish time of MapReduce tasks. How to calculate ERET is beyond our scope. We propose Aggressive Split with Prior Knowledge (ASPK) to optimize job turnaround time.
5.3.2.1 When to trigger task splitting:
The same algorithm from the previous section can be reused here.
5.3.2.2 Which tasks should be split and how many new tasks to spawn:
There is more than one way to split tasks. The number of splits performed during the lifetime of a job should be as small as possible without degrading performance. Fig. 5.3 demonstrates different ways to split tasks that achieve the same turnaround time. Graph (a) shows a scenario with two running tasks, $T_1$ and $T_2$, one idle slot, and no waiting tasks. The ERETs of $T_1$ and $T_2$ are $2t$ and $t$ respectively. If overhead and data locality are negligible, we should definitely split tasks to fill the idle slot. We can split task $T_1$ to spawn a new task, and both will run for period $t$ before completion, as demonstrated in (b). At time $t$ all tasks complete. Another way, shown in (c), is to split task $T_2$ to spawn a new task, and both will run for period $t/2$. At time $t/2$, two slots become idle and task $T_1$ is split to spawn two new tasks, each of which runs for $(2t - t/2)/3 = t/2$. In both cases, the final job turnaround time is $t$. However, the number of spawned tasks differs: one task is spawned in (b), while three are spawned in (c). More splits increase the probability of degrading performance and destabilizing the system, so in this example (b) is preferred to (c).
Figure 5.3: Different ways to split tasks (Processing time is the same). Dashed boxes represent newly spawned tasks.
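The arithmetic behind schedules (b) and (c) can be checked with a short sketch. It assumes, as the example does, that splitting is free and that remaining work divides evenly. The names `split_evenly`, `long_eret`, and `short_eret` are illustrative only, and $t$ is normalized to 1; the two running tasks are identified by their ERETs ($2t$ and $t$) rather than by name.

```python
from fractions import Fraction

def split_evenly(eret, parts):
    """Remaining time of each piece when a task's remaining work is cut into equal parts."""
    return eret / parts

t = Fraction(1)                      # normalize the time unit t to 1
long_eret, short_eret = 2 * t, t     # two running tasks, one idle slot

# Schedule (b): split the long task once to fill the idle slot.
b_piece = split_evenly(long_eret, 2)            # each half runs for t
b_finish = max(b_piece, short_eret)             # every slot finishes at t
b_spawned = 1

# Schedule (c): split the short task first, then the long task
# when two slots become idle at time t/2.
c_piece = split_evenly(short_eret, 2)           # each half runs for t/2
c_long_left = long_eret - c_piece               # 3t/2 of work remains at time t/2
c_finish = c_piece + split_evenly(c_long_left, 3)
c_spawned = 1 + 2

print(b_finish, b_spawned, c_finish, c_spawned)
```

Both schedules finish at the same time, but (c) spawns three tasks where (b) spawns one.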
The tasks that complete last determine when a job finishes. For jobs whose tasks have highly varied execution times, we should avoid the scenario in which a few long tasks keep running long after the other, shorter tasks complete. When long-running tasks exist, splitting tasks with small ERET only generates smaller tasks and does not affect job turnaround time. So our heuristic is that tasks with large ERET should be split first so that they do not become “stragglers”.
First, tasks with small ERET are filtered out, because splitting a task that will complete very soon does not yield much benefit. In addition, task filtering is an optimization step that reduces the number of map tasks considered by the following steps, which speeds up processing. Second, the remaining tasks are sorted by ERET in descending order. After that, tasks are clustered into $\{C_1, C_2, \ldots, C_m\}$ according to ERET so that tasks with similar ERET belong to the same cluster. Each cluster $C$ carries several pieces of information: the task list ($C.TS$), the number of tasks ($C.Count$), the sum of ERET ($C.ERET$), and the average ERET ($C.AE$). We go through the task clusters one by one to evaluate whether task splitting is beneficial. Initially, we only consider the tasks in cluster $C_1$. The tasks in $C_1$ are split to fill all idle slots, and the estimated task execution time after splitting, $T_1$, is calculated (here $T_i$ denotes an estimated time, not a task). If $T_1$ is larger than $C_2.AE$, splitting the tasks in the following clusters brings no benefit, and the estimated execution time of newly spawned tasks is set to $C_2.AE$. If $T_1$ is significantly smaller than $C_2.AE$, the spawned tasks are too small compared with the tasks in $C_2$, so we consider the tasks from both $C_1$ and $C_2$ for splitting. Time $T_2$ is calculated and compared with $C_3.AE$. If $T_2$ is much smaller, we consider $C_1$, $C_2$, and $C_3$. This process is repeated until $T_i$ is larger than or comparable to $C_{i+1}.AE$ or all clusters have been included. The algorithm skeleton is shown in Fig. 5.4.
IMS ← number of idle map slots
UTS ← unfinished tasks
FTS ← filterTasks(UTS)
STS ← sortByERET(FTS)
{C1, C2, ..., Cm} ← clusterTasks(STS)
sumERET ← 0, count ← IMS
for cluster Ci, 1 ≤ i ≤ m:
    sumERET += Ci.ERET
    count += Ci.Count
    avgERET ← sumERET / count
    if i = m: break
    if avgERET << Ci+1.AE:
        continue
    else:
        break
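The loop in Fig. 5.4 can be sketched in runnable form as follows. This is a minimal illustration under stated assumptions, not the implementation used in ASPK. The `Cluster` class and the function name `clusters_to_split` are hypothetical, and the "much smaller than" comparison is modeled as `avg < ratio * next_average` with an assumed `ratio` of 0.5, since the text does not specify a threshold. The sketch follows the skeleton only and does not cap the estimate at the average ERET of the next cluster.

```python
from dataclasses import dataclass
from typing import List, Tuple

@dataclass
class Cluster:
    erets: List[float]  # ERET of each task in the cluster (hypothetical container)

    @property
    def count(self) -> int:
        return len(self.erets)

    @property
    def total(self) -> float:
        return sum(self.erets)

    @property
    def average(self) -> float:
        return self.total / self.count

def clusters_to_split(clusters: List[Cluster], idle_slots: int,
                      ratio: float = 0.5) -> Tuple[int, float]:
    """Return (number of leading clusters to split, estimated post-split task time).

    `ratio` is an assumed stand-in for the '<<' test in the skeleton.
    """
    sum_eret, count = 0.0, idle_slots
    selected, avg = 0, 0.0
    for i, cluster in enumerate(clusters):
        sum_eret += cluster.total
        count += cluster.count
        avg = sum_eret / count          # time per task if work is spread over tasks + idle slots
        selected = i + 1
        if i == len(clusters) - 1:
            break                       # all clusters have been included
        if avg < ratio * clusters[i + 1].average:
            continue                    # spawned tasks would be too small: include next cluster
        break
    return selected, avg
```

For example, with clusters of ERETs `[100]` and `[90, 80]` and nine idle slots, splitting only the first cluster would give tasks of length 10, far below the next cluster's average of 85, so both clusters are selected.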
Filtering. Ideally, how tasks are filtered should depend on the ERET of unfinished tasks. A preset threshold is not flexible enough to capture task characteristics. Instead, we calculate the optimal remaining job execution time (ORJET) by assuming that all unfinished tasks are split to use all available slots. The total ERET is obtained by adding up the ERET of all unfinished tasks. It is divided by the total number of map slots (including both occupied and idle slots) to get ORJET. ORJET measures how long a job will run before completion in the optimal case. The ERET of each task is then compared with ORJET. If the ERET of a task is significantly smaller than ORJET, the task is filtered out. Towards the end of job execution, ORJET becomes increasingly small because running tasks are close to completion and more slots are released. In this situation, task splitting is not beneficial because the overhead of task splitting outweighs the potential gain of higher concurrency. So we filter out tasks that are close to completion without affecting overall performance. Thus the filtering process adapts to workloads of different types.
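The ORJET-based filter can be sketched as below. The function name `filter_tasks` and the `ratio` parameter are assumptions made for illustration: the text says only that a task is dropped when its ERET is "significantly smaller" than ORJET, so the cutoff of half of ORJET used here is a placeholder rather than a value taken from the source.

```python
from typing import List, Tuple

def filter_tasks(erets: List[float], total_slots: int,
                 ratio: float = 0.5) -> Tuple[List[float], float]:
    """Return (ERETs of tasks kept as split candidates, ORJET).

    ORJET = total ERET of unfinished tasks / total number of map slots
    (occupied and idle). `ratio` is an assumed cutoff for "significantly smaller".
    """
    if not erets:
        return [], 0.0
    orjet = sum(erets) / total_slots
    kept = [e for e in erets if e >= ratio * orjet]
    return kept, orjet

kept, orjet = filter_tasks([120, 60, 10, 10], total_slots=4)
print(kept, orjet)
```

Because the cutoff is derived from ORJET rather than fixed in advance, it moves with the workload, which is the adaptivity the paragraph describes.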
Clustering. The task clustering algorithm is designed to group tasks with similar ERET and to separate tasks with significantly different ERET. Existing clustering algorithms from the machine learning community, such as K-means, Expectation-Maximization, and agglomerative hierarchical clustering, can be used without modification. Considering that the scheduling routine is called frequently and its performance is critical to the whole system, we favor simple linear algorithms. The tasks being clustered have already been ordered by ERET, which guarantees that tasks belonging to the same cluster are consecutive in the task list. Our current algorithm scans the task list once by moving a “cursor” from beginning to end. A running list is maintained that contains the tasks that are before the cursor and belong to the current cluster. If the ERET of the task pointed to by the cursor is much smaller than the average ERET of the current cluster, the current cluster is added to the global cluster set and a new cluster is created, which initially contains only the task pointed to by the cursor. This guarantees that the maximal ERET of tasks within a cluster is significantly smaller than the average ERET of tasks within the previous cluster.
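The single-pass cursor scan can be sketched as follows, assuming the input is already sorted by ERET in descending order. The function name `cluster_by_eret` is hypothetical, and "much smaller" is again modeled with an assumed `ratio` (here 0.5) because the text gives no numeric threshold.

```python
from typing import List

def cluster_by_eret(sorted_erets: List[float], ratio: float = 0.5) -> List[List[float]]:
    """Group ERETs (sorted in descending order) into clusters in one linear scan.

    A new cluster starts when the ERET under the cursor is below
    `ratio` times the average ERET of the current cluster (assumed rule).
    """
    clusters: List[List[float]] = []
    current: List[float] = []      # running list: tasks of the current cluster
    current_sum = 0.0
    for eret in sorted_erets:      # the loop variable plays the role of the cursor
        if current and eret < ratio * (current_sum / len(current)):
            clusters.append(current)           # close the current cluster
            current, current_sum = [], 0.0     # start a new one at the cursor
        current.append(eret)
        current_sum += eret
    if current:
        clusters.append(current)
    return clusters

print(cluster_by_eret([100, 95, 90, 40, 38, 10]))
```

Keeping a running sum makes each average an O(1) computation, so the whole scan stays linear in the number of tasks.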
5.3.2.3 How to split:
The way to split tasks can be optimized if we also have prior knowledge about mean task execution time, network throughput, disk I/O throughput, etc. For a task $T$, the disk I/O cost, network I/O cost, and computation cost are denoted by $DIO(T)$, $NIO(T)$, and $Com(T)$ respectively. The total time is therefore $DIO(T) + NIO(T) + Com(T)$ if these operations do not overlap. The task being split is denoted by $T_{cur}$, and the newly spawned tasks are $T_{cur}^{1}, T_{cur}^{2}, \ldots, T_{cur}^{N}$. Ideally, equation (5.6) should be satisfied so that all tasks complete simultaneously after splitting.
$$
\begin{aligned}
& DIO(T_{cur}^{1}) + NIO(T_{cur}^{1}) + Com(T_{cur}^{1}) \\
={} & \cdots \\
={} & DIO(T_{cur}^{N}) + NIO(T_{cur}^{N}) + Com(T_{cur}^{N}) \\
={} & DIO(T_{cur}) + NIO(T_{cur}) + Com(T_{cur})
\end{aligned}
\tag{5.6}
$$

Because we assume $DIO(T)$ and $NIO(T)$ are negligible compared to $Com(T)$, the above equation reduces to (5.7).

$$
Com(T_{cur}^{1}) = Com(T_{cur}^{2}) = \cdots = Com(T_{cur}^{N}) = Com(T_{cur}) \tag{5.7}
$$
So the unfinished work of task $T_{cur}$ is evenly distributed between itself and the newly spawned tasks after splitting.
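As a small illustration of the even distribution, the sketch below divides the unfinished work of a task among itself and $N$ newly spawned tasks. It assumes that computation cost is proportional to a count of unprocessed work units (for example, input records), which the text does not state; the function name `even_shares` is hypothetical.

```python
from typing import List

def even_shares(remaining_units: int, new_tasks: int) -> List[int]:
    """Split remaining work units among the original task and `new_tasks` spawned tasks.

    Returns new_tasks + 1 shares that differ by at most one unit.
    Assumes cost is proportional to the number of units (illustrative assumption).
    """
    parts = new_tasks + 1                      # the task being split keeps one share
    base, extra = divmod(remaining_units, parts)
    return [base + 1 if k < extra else base for k in range(parts)]

print(even_shares(10, 3))
```

When the remaining work is not divisible by $N + 1$, the shares differ by at most one unit, so the tasks still finish at nearly the same time.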
5.3.2.4 Complexity:
In ASPK, the complexity of sorting is $O(n \log n)$ and that of the other operations is no greater than $O(n)$. So the overall complexity is $O(n \log n)$. However, sorting can be further optimized, considering that in each iteration except the first, the tasks are already mostly ordered.
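One way to exploit a mostly ordered task list, offered here as an illustration rather than as the optimization used in ASPK, is insertion sort. Its running time is $O(n + d)$, where $d$ is the number of out-of-order pairs, so it approaches linear time when few tasks have moved since the previous iteration. The function name `insertion_sort_desc` is hypothetical.

```python
from typing import List

def insertion_sort_desc(erets: List[float]) -> List[float]:
    """Sort ERETs in place in descending order.

    Each element is shifted left only past smaller elements, so the
    total work is O(n + d), with d the number of out-of-order pairs.
    """
    for i in range(1, len(erets)):
        value = erets[i]
        j = i - 1
        while j >= 0 and erets[j] < value:
            erets[j + 1] = erets[j]   # shift the smaller element one slot right
            j -= 1
        erets[j + 1] = value
    return erets

print(insertion_sort_desc([90, 80, 85, 40, 10]))
```

Adaptive library sorts give a similar benefit; for instance, Python's built-in `list.sort` takes advantage of existing ordered runs in its input.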