
Adaptive Task Scheduling for MultiJob MapReduce Environments

Jordà Polo, David de Nadal, David Carrera, Yolanda Becerra, Vicenç Beltran, Jordi Torres and Eduard Ayguadé

Barcelona Supercomputing Center (BSC)

Technical University of Catalonia (UPC) Barcelona, Spain

{jordap, ddenadal, dcarrera, yolandab, vbeltran, torres, eduard}@ac.upc.edu

Abstract—MapReduce is a data-driven programming model originally proposed by Google in 2004, especially well suited for running highly distributed applications in very large data centers. Current trends in data center management include the extensive use of virtualization and workload consolidation strategies to save energy and cut costs. In such an environment, different applications belonging to different users share the same physical resources. Therefore, predicting and managing the performance of each workload is a crucial task from the point of view of data center management. In this paper we address the problem of predicting and managing the performance of MapReduce applications according to a set of performance goals defined for each application. In our prototype system we introduce a new task scheduler for a MapReduce framework that allows the performance-driven management of MapReduce tasks. The result is a task scheduler that dynamically predicts the performance of several concurrent MapReduce jobs and adjusts at runtime the resources allocated to each job, in order to meet its performance goal without wasting physical resources.

Index Terms—MapReduce, Performance management, Task scheduling

I. INTRODUCTION

MapReduce is a framework originally designed by Google to exploit large clusters to compute parallel problems. It is based on an implicit parallel programming model that provides an easy and convenient way to express certain kinds of distributed/parallel problems. This framework is composed of an execution runtime and a distributed file system that take care of all the details involved in the distribution of tasks and data across nodes. The runtime and the distributed file system also handle all the issues related to fault tolerance and reliability, which are crucial in such a dynamic environment.

Each MapReduce application is run as a job that is submitted to the MapReduce runtime. Each job is split into a large number of Map and Reduce tasks before being started. The runtime is in charge of running tasks for every job until they are completed. The tasks are actually executed in any of the slave nodes that the MapReduce cluster comprises. In particular, the task scheduler is responsible for deciding what tasks are run at each moment in time, as well as what slave node will host the task execution. Therefore, in a multi-job environment, the task scheduler is the main component responsible for the performance delivered by each MapReduce application.

However, while at first MapReduce was primarily used for batch data processing, it is now also being used in shared, multi-user environments in which submitted jobs may have completely different priorities: from small, almost interactive executions, to very long programs that can take hours to complete. In these new scenarios, task scheduling is becoming even more relevant as it is responsible for deciding what tasks are run, and thus the performance delivered by each application.

In this paper we present an application-centric task scheduler for Hadoop (Apache’s open source implementation of a MapReduce framework). The proposed scheduler is able to manage the performance of in-execution MapReduce applications by dynamically adapting the resources allocated to each application. In particular, the scheduler is driven by high-level performance goals (such as job deadlines), and leverages estimates of each job’s completion time to decide at runtime the best allocation of task execution slots across jobs. In our experiments, we evaluate different job completion time estimation techniques, and we pick the best performing one to run illustrative execution examples in which the effectiveness of our performance-driven dynamic scheduling for MapReduce applications is tested.

The rest of the paper is structured as follows. In Section II we introduce some technical background necessary to understand MapReduce applications as well as Hadoop, the open source MapReduce implementation we have used in our experiments. Section III describes the different techniques for estimating job completion time that we have evaluated, as well as the core of the performance management scheduler we propose. Section IV presents the basic architecture of the scheduler and its integration into Hadoop. In Section V we present the experiments that support the contribution of our work, and Section VI presents two additional experiments on data locality and memory usage. Finally, Section VII reviews the related work and Section VIII presents our conclusions.

II. TECHNICAL BACKGROUND

A. MapReduce

MapReduce [6] is a programming model proposed by Google to facilitate the implementation of massively parallel applications that process large data sets. This programming model hides from the programmer all the complexity involved in managing parallelization. The programmer only has to implement the computational function that will execute on each node (the map() function) and the function that combines the intermediate results to generate the final result of the application (the reduce() function). Each map input is a key-value pair that identifies a piece of work. The types of both the key and the value can be defined by the programmer. The output of each map is an intermediate result, also expressed as a key-value pair, whose types may differ from those of the input pair and are also defined by the programmer. The input of the reduce function is composed of all the intermediate values identified by the same key, so that the reduce function can combine all of them to form the final result.

In the MapReduce model all nodes in the cluster execute the same function but on a different chunk of input data. The MapReduce runtime is responsible for distributing and for balancing all the work across the nodes, dividing the input data into chunks, assigning a new chunk every time a node becomes idle and collecting the partial results.
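The model described above can be illustrated with a toy, single-process word count. This is a minimal sketch rather than Hadoop code: the names map_fn, reduce_fn and run_job are illustrative assumptions, and the "runtime" here simply splits the records into chunks and groups intermediate values by key.

```python
from collections import defaultdict


def map_fn(key, value):
    """Map: emit one (word, 1) pair for every word in a line of text."""
    for word in value.split():
        yield word.lower(), 1


def reduce_fn(key, values):
    """Reduce: combine all intermediate values that share the same key."""
    return key, sum(values)


def run_job(records, n_chunks=2):
    """Toy stand-in for the runtime: chunk the input, map, group, reduce."""
    # Divide the input into chunks, as the runtime would do across nodes.
    chunks = [records[i::n_chunks] for i in range(n_chunks)]
    intermediate = defaultdict(list)
    for chunk in chunks:
        for key, value in chunk:
            for out_key, out_value in map_fn(key, value):
                intermediate[out_key].append(out_value)
    # Each reduce call receives every intermediate value for one key.
    return dict(reduce_fn(k, v) for k, v in intermediate.items())


records = [(0, "the quick brown fox"), (1, "the lazy dog"), (2, "The end")]
counts = run_job(records)
```

The programmer only supplies map_fn and reduce_fn; chunking, grouping by key and collecting the results belong to the runtime, which is what the real framework distributes across nodes.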

There are many runtime implementations to support this model, depending on the type of environment they aim to support. For example, both the Google implementation [6] and Hadoop [1] target a distributed execution environment that uses a distributed file system to support data sharing (GFS [9] in the case of Google, and HDFS [3] in the case of Hadoop). In this paper we present a prototype that is implemented on top of Hadoop, a MapReduce framework presented in the following section.

B. Hadoop

Hadoop is an open source MapReduce runtime implementation provided by the Apache Software Foundation. The mechanism it uses to implement shared memory is based on a distributed file system also provided by the Hadoop project: the Hadoop Distributed File System (HDFS) [3]. Thus, data sharing is implemented through HDFS files. The architecture of this file system is a master/slave architecture. The master process (NameNode) manages the global name space and controls the operations on files, and each slave process (DataNode) implements the operations on the blocks stored in its local disk, following the NameNode’s instructions.

The runtime has two different processes involved. The process which distributes work among nodes is named JobTracker. This process uses the method configured by the programmer to partition the input data into splits and thus to populate a local job queue. If a node in the system becomes idle, the JobTracker picks a new entry from its queue to feed it. Thus, the granularity of the splits has a high influence on the balancing capability of the scheduler. Another consideration in the scheduling of map tasks is the location of the blocks, as the JobTracker tries to minimize the number of remote block accesses.

The process that controls the execution of the map tasks inside a node is named TaskTracker. This process receives a split description, divides the split data into records (through the RecordReader component) and launches the processes that will execute the map tasks (Mappers). The programmer can also decide how many simultaneous map() functions to execute on a node, that is, how many Mappers the TaskTracker has to create. When a TaskTracker finishes processing one split and is ready to receive a new one, it sends a notification to the JobTracker.
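The interaction between an idle node and the queue of pending splits can be sketched as follows. This is a simplified illustration under stated assumptions, not Hadoop’s actual code: pick_split, the queue layout (a list of split identifiers paired with the set of hosts storing the block) and the node names are all hypothetical.

```python
def pick_split(queue, idle_node):
    """Prefer a split whose block is stored on the idle node.

    Falls back to the oldest pending split when no local one exists,
    and returns None when the queue is empty.
    """
    for index, (split_id, hosts) in enumerate(queue):
        if idle_node in hosts:
            return queue.pop(index)
    return queue.pop(0) if queue else None


# Pending splits, each with the set of nodes holding a copy of its block.
queue = [
    ("split-0", {"node2"}),
    ("split-1", {"node1"}),
    ("split-2", {"node1", "node2"}),
]
first = pick_split(queue, "node1")   # a local split is available
second = pick_split(queue, "node3")  # no local split, oldest one is taken
```

Smaller splits give this kind of loop more opportunities to balance work, at the cost of more scheduling decisions, which is the granularity trade-off mentioned above.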

III. PERFORMANCE MANAGEMENT

In this section we present the techniques we have studied to predict the behaviour of jobs and their completion time, and then describe how this information can be used by schedulers to provide performance-driven scheduling of multiple jobs.

A. Job performance estimation

In order to estimate the finish time of a given job, it is important to collect accurate data about tasks that have already finished, as well as about tasks that are currently running.

Hadoop already provides a way for tasks to report their progress but, as discussed in Section IV, it does not always provide good enough data. Since our goal is to schedule all kinds of jobs, we have looked into alternative mechanisms that could be used to estimate the job’s progress from the JobTracker. The estimators we have studied so far are described below.

Completed tasks (Ec)

This is the first and most elementary estimator: it only takes into account the number of completed tasks since the job started (N), and the task completion rate is then used to estimate the time it will take to finish the remaining tasks.

$$E_c = \frac{\text{Elapsed job time}}{N} \times \text{Remaining tasks}$$

Input size (Es)

This estimator is based on the input size of all the maps that have been completed, which is then compared to the total input size to get the input progress.

$$E_s = \frac{\text{Elapsed job time}}{\left(\sum_{i=1}^{N} \text{Input}_i\right) / \text{Total input}}$$

Partial time (Ep)

Unlike the previous estimators, this one collects measures not only from finished tasks, but also from currently running tasks (thus partial). It calculates the average task time based on the time it has taken to finish already completed tasks, and also on how long current tasks (N + 1 to M) have been running.

$$\text{Average} = \frac{\sum_{i=1}^{N} \left(\text{Finish time}_i - \text{Start time}_i\right)}{N}$$

$$\text{Partial} = \sum_{i=N+1}^{M} \left(\text{Current time} - \text{Start time}_i\right)$$

$$E_p = \frac{\text{Elapsed job time} \times \text{Desired tasks}}{N + \text{Partial} / \text{Average}}$$
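The three estimators can be sketched in Python as follows. This is a minimal sketch of one reading of the formulas in this section, not the prototype’s code: the function and parameter names are illustrative assumptions, and total_tasks stands in for the "Desired tasks" term.

```python
def estimate_completed_tasks(elapsed, completed, remaining):
    """E_c: elapsed time per completed task, times the remaining tasks."""
    return elapsed / completed * remaining


def estimate_input_size(elapsed, completed_inputs, total_input):
    """E_s: elapsed time divided by the fraction of input already processed."""
    progress = sum(completed_inputs) / total_input
    return elapsed / progress


def estimate_partial_time(elapsed, finished, running_starts, now, total_tasks):
    """E_p: also credits running tasks with the time they have run so far.

    finished is a list of (start, finish) pairs for completed tasks and
    running_starts holds the start times of tasks still running.
    """
    average = sum(finish - start for start, finish in finished) / len(finished)
    partial = sum(now - start for start in running_starts)
    # Running tasks count as partial / average "equivalent" completed tasks.
    return elapsed * total_tasks / (len(finished) + partial / average)


# Hypothetical job: 8 tasks, 4 finished (25 s each), 2 running, 100 s elapsed.
finished = [(0, 25), (0, 25), (50, 75), (50, 75)]
e_c = estimate_completed_tasks(elapsed=100, completed=4, remaining=4)
e_s = estimate_input_size(elapsed=100, completed_inputs=[64] * 4, total_input=512)
e_p = estimate_partial_time(100, finished, running_starts=[85, 90], now=100,
                            total_tasks=8)
```

All three need at least one completed task, since N appears in a denominator; this matches the later remark that nothing can be estimated right after a job is submitted.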

B. Performance-driven co-scheduling

To prove that scheduling decisions could benefit from progress estimations, we first tried to design a very simple scheduler. The idea behind this equalizing scheduler is to complete all running jobs at the same time, assigning free task slots to the job with the most distant estimated completion time. Since estimators become more accurate as more tasks are completed, we expect this scheduler to be fairly close to the desired results.
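The selection rule of the equalizing scheduler can be sketched in a few lines. The function name and the job identifiers below are hypothetical, and the estimates are assumed to come from one of the estimators of Section III-A.

```python
def pick_job_equalizing(estimated_completion):
    """Return the job whose estimated completion time is furthest away.

    estimated_completion maps a job identifier to its estimated
    completion time in seconds.
    """
    return max(estimated_completion, key=estimated_completion.get)


estimates = {"job1": 420.0, "job2": 610.0}
chosen = pick_job_equalizing(estimates)  # the free slot goes to job2
```

Repeating this choice every time a slot becomes free pushes the estimated completion times of all jobs toward each other.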

In addition to this basic scheduler, we also worked on a more advanced QoS-oriented scheduler, the goal of which is to execute running jobs using as few resources as possible while trying to meet the user-provided deadline for each job. The deadline scheduler works by assigning available task slots to the job with the largest positive need of CPU, which roughly translates as the job that needs more resources to be completed on time. The need is defined as the difference between the estimated number of slots required to meet the deadline and the number of running tasks:

$$\text{Slots} = \frac{\text{Remaining tasks}}{\text{Remaining time until deadline} / \text{Average task time}}$$

$$\text{Need} = \text{Slots} - \text{Running tasks}$$
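As a worked illustration of the need computation, the sketch below evaluates it for two hypothetical jobs and picks the one with the largest positive need. The function names and the job figures are assumptions made for the example; jobs without data or past their deadline are handled separately and are not covered here.

```python
def required_slots(remaining_tasks, average_task_time, time_until_deadline):
    """Slots needed to finish the remaining tasks before the deadline."""
    return remaining_tasks / (time_until_deadline / average_task_time)


def need(remaining_tasks, average_task_time, time_until_deadline, running_tasks):
    """Difference between the required slots and the tasks already running."""
    slots = required_slots(remaining_tasks, average_task_time, time_until_deadline)
    return slots - running_tasks


def pick_job_by_need(jobs):
    """Return the job with the largest positive need, or None if there is none."""
    needs = {name: need(**params) for name, params in jobs.items()}
    positive = {name: value for name, value in needs.items() if value > 0}
    return max(positive, key=positive.get) if positive else None


jobs = {
    # 12 tasks of about 20 s each, 80 s left: 3 slots required, 1 running.
    "A": dict(remaining_tasks=12, average_task_time=20,
              time_until_deadline=80, running_tasks=1),
    # 6 tasks of about 20 s each, 120 s left: 1 slot required, 2 running.
    "B": dict(remaining_tasks=6, average_task_time=20,
              time_until_deadline=120, running_tasks=2),
}
chosen = pick_job_by_need(jobs)
```

Job B has a negative need, meaning it already holds more slots than its deadline requires, so the next free slot goes to job A.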

Note, however, that right after submitting a job, there is no data available and it isn’t possible to estimate the required slots nor the completion time until some tasks are actually run. Since we want to have an idea of the job’s needs as soon as possible, jobs with no completed and no running tasks always take precedence over other jobs. If there is more than one such job, the oldest one comes first.

Finally, there is another scenario that needs to be taken care of: jobs that are past their deadline. If it is not possible to meet a job’s deadline, the scheduler tries to at least complete it as soon as possible, prioritizing it over any other kind of job, which in turn also helps to avoid job starvation.

IV. SYSTEM PROTOTYPE

Analyzing the job’s progress to estimate future behaviour is the basis for the schedulers described in this paper. Early in the first versions, however, we found scenarios that were not as promising as we had thought. As discussed in Section III-A, Hadoop provides a way for tasks to report their progress to the JobTracker. Although Map tasks can use the RecordReader’s getProgress() method, the scheduler cannot rely on this method since it is not always correctly implemented (or implemented at all, for that matter). Since our goal is to schedule any kind of job, we decided to implement alternative mechanisms that could be used to estimate the job’s progress externally, that is, from the JobTracker or scheduler, without relying on the progress as reported by each task.

It should also be noted that the current implementation of the schedulers proposed in the previous section has been restricted to map tasks only. Estimation of maps is arguably easier than that of reduces: there is usually a large, known number of maps, while the number of reduces is smaller and its execution is more variable. As part of our future work, however, we intend to extend the initial implementation to support reduce tasks.

The deadline scheduler keeps information about each job, updating it regularly, and using it to queue all jobs accordingly, prioritizing jobs with the highest need of CPU. The status of each job is one of the keys of the scheduler since it helps to select the right job. All running jobs are classified into one of the following states:

NODATA There is not enough data to estimate the job’s requirements.

ADJUST The scheduler is able to decide whether the job needs more or fewer slots.

UNDEAD The job is past its deadline.

When jobs are submitted, they are marked as NODATA, and can move to ADJUST as soon as the first task is completed, but obviously not the other way around. If a job demands more resources than currently available, it will end up becoming an UNDEAD job. Eventually, when all tasks are completed, the job is removed from the list of running jobs. In order to avoid starvation, UNDEAD jobs are always run first, followed by NODATA jobs with no running tasks, and finally ADJUST jobs.
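The job states and the resulting queue order can be sketched as follows. This is a minimal sketch under stated assumptions, not the prototype’s implementation: the Job fields and function names are hypothetical, ties among UNDEAD jobs are broken by submission time as an assumption, and NODATA jobs that already have running tasks are placed last as an assumption, since the text does not specify their position.

```python
from dataclasses import dataclass


@dataclass
class Job:
    name: str
    submit_time: float
    deadline: float
    completed_tasks: int
    running_tasks: int
    need: float = 0.0  # only meaningful for ADJUST jobs


def state(job, now):
    """Classify a running job as UNDEAD, NODATA or ADJUST."""
    if now > job.deadline:
        return "UNDEAD"
    if job.completed_tasks == 0:
        return "NODATA"
    return "ADJUST"


def queue_order(jobs, now):
    """UNDEAD first, then idle NODATA (oldest first), then ADJUST by need."""
    def key(job):
        current = state(job, now)
        if current == "UNDEAD":
            return (0, job.submit_time)   # tie-break is an assumption
        if current == "NODATA" and job.running_tasks == 0:
            return (1, job.submit_time)   # oldest job comes first
        if current == "ADJUST":
            return (2, -job.need)         # largest need comes first
        return (3, job.submit_time)       # NODATA with running tasks (assumption)
    return sorted(jobs, key=key)


jobs = [
    Job("a", 0, 500, completed_tasks=5, running_tasks=2, need=1.0),
    Job("b", 60, 900, completed_tasks=0, running_tasks=0),
    Job("c", 10, 90, completed_tasks=3, running_tasks=1),
    Job("d", 20, 400, completed_tasks=2, running_tasks=1, need=2.5),
]
order = [job.name for job in queue_order(jobs, now=100)]
```

In this example job c is past its deadline and runs first, job b has no data yet and follows, and the two ADJUST jobs are ordered by decreasing need.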

V. EXPERIMENTS

In this section we present four different experiments carried out to explore the effectiveness and quality of the different job completion estimation techniques, as well as to evaluate the task scheduling techniques we have implemented in the core of the Hadoop JobTracker.

To evaluate the chosen job estimation technique we use two different MapReduce applications. The first one presents a regular task length distribution. The second application shows an irregular task length distribution. In particular, we used the sample WordCount application distributed with Hadoop as a case of job with regular task completion time, and we used a distributed simulator as a case of job with irregular task completion time.

A. Experimental environment

In our experiments, we used five 2-way Intel Xeon nodes with 2 GB of RAM as our experimental Hadoop cluster. One of the nodes was configured to be the JobTracker, another one as the NameNode, and the three remaining nodes were used as DataNodes and TaskTrackers. All the nodes were connected using Gigabit Ethernet, and were running 32-bit Debian GNU/Linux 4.0 with kernel version 2.6. We used OpenJDK 1.6 and Hadoop 0.21-dev to run the MapReduce applications.

The simulator used in the experiments is the one presented in [5], which was extended to run a number of system configurations. Each simulator instance is run as a Map task, and we leverage MapReduce to coordinate the distributed execution of parallel simulations.

B. Goodness of job completion estimation techniques

In this first experiment we consider the three different techniques, presented in Section III-A, for estimating job completion time. Results are presented in Figure 1. As can be observed, the Completed tasks and Input size estimators offer almost the same behavior. Both of them show high variability, and it is only when the job gets close to its actual completion time that these methods show a low absolute error in the estimation. The Partial time method offers more stable results over time, with a relative error that is less than 5% for this experiment, and presents a clear trend to be optimistic in the estimation. We are currently working on improving the accuracy of this method. Notice that Partial time is the approach that exhibits the best behavior, and therefore it is the method used for the rest of the experiments presented in this section.

Fig. 1. Completion time for each estimator (absolute error in seconds against elapsed time in seconds and number of finished maps; series: Completed tasks, Input size, Partial time)

Fig. 2. Task length distribution: WordCount (time per map in seconds against elapsed time in seconds; series: TaskTracker 1, TaskTracker 2, TaskTracker 3)

Fig. 3. Partial time estimator: WordCount (absolute error in seconds against elapsed time in seconds and number of finished maps)

C. Estimating job completion time

In a second experiment we evaluate the accuracy of the job completion time prediction technique for a WordCount application (regular task length distribution) and for the Simulator (irregular task length distribution).

Figures 2 and 3 show the results for the WordCount application. In particular, Figure 2 shows the task completion time distribution over time, that is, the actual time each Mapper needs to complete, and which one of the three TaskTrackers ran each task. Notice that task completion time is very regular for this application, and the small visible variability can be explained by the heterogeneity of the nodes used in the Hadoop cluster. Figure 3 shows the absolute error of the predicted job completion time with respect to the actual completion time at each moment of the job execution. Negative values indicate that the technique is optimistic, that is, the predicted job completion time is earlier than the actual completion time observed when the job finishes. The estimation is continuously updated over time. As can be observed, for an application with a regular task completion time distribution, the estimation is particularly stable: it shows some variation in the initial phases of the experiment while the predictor is calibrated, and becomes stable as more tasks are completed. Notice that throughout the experiment the relative error of the prediction is below 5% of the actual job completion time, which is about 900s.

Figures 4 and 5 show the results for the Simulator application. In particular, Figure 4 shows the task length distribution over time, and which one of the three TaskTrackers ran each task. Notice that in this case task completion time is very irregular, and the task time depends on the hardness of the simulator parameters. Figure 5 shows the absolute error of the predictions for the Simulator application. As in the previous experiment, negative values indicate that the technique is optimistic. The high variability of the task length reduces the accuracy of the predictions in this case. In periods in which the average task length is lower, such as 0-4,000s and 7,000-10,000s, the prediction is updated to expect an earlier completion time, and in periods in which the average task length increases, the prediction is updated to expect a later completion time. Notice that the accuracy of the prediction increases over time, as the average task completion time becomes more accurate. The prediction assumes that each task takes this average time to complete, and thus the larger the share of the job’s tasks that has been observed, the more accurate the prediction becomes. This can be observed in the fact that, although the absolute prediction error follows a pattern similar to that of the task length distribution over time, the prediction and the actual completion time converge as time gets closer to the actual job completion time.

D. Equalizing completion time for multiple jobs

In a third experiment we explore the capacity of the proposed Hadoop scheduler to equalize the completion time of two jobs. This scenario is taken as a particular case of a scheduling policy that can be implemented in the Hadoop scheduler. The experiment considers two instances of the Simulator application that are submitted at different moments in time. Afterwards, the scheduler is responsible for making both instances complete simultaneously. The scheduler achieves this objective by allocating a different number of execution slots to each job based on the estimated completion time over time. In this case the simulator is configured to run fewer tests than in the previous sections, and thus the job has a shorter execution time and less variability in the task length.

Fig. 4. Task length distribution: Simulator (time per map in seconds against elapsed time in seconds; series: TaskTracker 1, TaskTracker 2, TaskTracker 3)

Fig. 5. Partial time estimator: Simulator (absolute error in seconds against elapsed time in seconds and number of finished maps)

Figure 6 shows the absolute error of the predictions for the Simulator application. As in the previous experiments, negative values indicate that the technique is optimistic. Figure 7 shows the percentage of the job tasks already completed over time. As can be observed, both instances of the same job are started at different moments in time. The first instance starts at time 0, while the second instance starts shortly after time 200s. Both job instances are identical, so they have to complete the same number of tasks. Looking at the progress of both jobs over time, it can be observed that when the second job starts, the first job stops making any progress, because the scheduler decides to allocate all the available execution slots to the second job. When the estimated completion time for both jobs is equalized, the scheduler starts balancing the allocation of execution slots to both jobs so they continue making similar progress until completion time. Notice that this experiment is driven by the progress made by each job over time, but is not driven by any explicit completion time goal.

Fig. 6. Evolution of the estimated completion time (absolute error in seconds against elapsed time in seconds; series: Job 1, Job 2; marker J2 for the submission of the second job)

Fig. 7. Evolution of the job’s percentage of completion (percent completed against elapsed time in seconds; series: Job 1, Job 2; marker J2 for the submission of the second job)

E. Managing jobs with different deadlines

In the last experiment, two instances of the WordCount application are submitted, and the scheduler is provided with an explicit completion time goal for each job. Therefore, the scheduler is responsible for meeting the goal for both jobs by continuously monitoring the progress and the estimated completion time of each job, and by dynamically adjusting the allocation of execution slots to each job. Notice that in this experiment, the deadlines are set to be achievable with the available resources.

Figure 8 shows the absolute error of the predictions for the WordCount application. As in the previous experiments, negative values indicate that the technique is optimistic. Figure 9 shows the percentage of the job tasks already completed over time. As can be observed, the experiment shows two instances of the WordCount job being submitted to the Hadoop cluster. The first instance (Job1) is submitted at time 0s, while the second instance (Job2) is submitted at time 40s (J2 in the chart). The deadline for the first job is set to be 240s after its submission (D1), that is, at time 240s. The deadline for the second instance is set to be 120s after its submission (D2), that is, at time 160s. Notice that the minimum completion time for one instance of this job, running alone in the Hadoop cluster with all the available resources allocated, is about 100s. Therefore, we chose a tight deadline factor for Job2, which is 1.2 times its minimum achievable completion time, while for Job1 it is 2.4, so it is more relaxed. Notice also that there is a delay of around 30 seconds between the job submission time and the point at which tasks start being executed. This time is explained by the cost of splitting the input data and the distribution of keys and values through HDFS.

Fig. 8. Evolution of the estimated completion time (absolute error in seconds against elapsed time in seconds; series: Job 1 with deadline 240 and Job 2 with deadline 120; markers J2, D2 and D1)

When Job1 is submitted, at time 0, it is automatically put to run. At time J2, the second job is submitted. Although the scheduler needs to start running this second job immediately to start collecting progress data that will allow the estimation of the expected completion time, it is not able to do so because it must wait for at least one task from Job1 to be freed. When Job2 starts running, the scheduler determines that it needs to allocate more resources to this job, and thus Job1 is slowed down in favour of Job2. This can be observed in Figure 9. The scheduler keeps adjusting the resource allocation of both jobs until their completion, with more precision as the accuracy of its estimations improves.

Finally, Figure 10 shows the allocation of task execution slots for each of the jobs. In this particular experiment we used 2 TaskTrackers, configured to host a maximum of 2 tasks each. Thus, four is the maximum number of concurrent tasks that can be executed. Notice how Job2 gets more resources than Job1 from the moment it is submitted, which is explained by its tight deadline. The scheduler is configured to minimize the resources allocated to each job, using the minimum required to meet, when possible, the deadline for each job. It can be observed that once Job2 is completed, Job1 still has some margin to meet its goal, so the scheduler decides to allocate only 2 of the available slots to this job (and occasionally only 1 slot). The other two slots remain available to run other tasks or even to introduce an opportunity to apply energy saving techniques.

Fig. 9. Evolution of the job’s percentage of completion (percent completed against elapsed time in seconds; series: Job 1, Job 2; markers J2, D2 and D1)

Fig. 10. Distribution of running tasks (number of running tasks against elapsed time in seconds, one panel per job: Job 1, Job 2)

VI. PERFORMANCE STUDY

In this section we present two additional experiments related to the performance of MapReduce applications. In our first experiment, we evaluate the impact of data locality, while in the second one we discuss some issues related to memory usage in Hadoop’s MapReduce.

Since our goal was to understand Hadoop’s implementation, and especially how the initial steps performed and what their overhead was, this time we used a custom application specifically designed to do the bare minimum: only read its input. This job included its own input format to divide the input into as many splits as available TaskTrackers, with only one concurrent map task per node. Note that in the following sections we use next() to refer to the last method that actually reads the files from HDFS, which was configured to always read 256 MB at a time. The map tasks stop right after reading, so there is no output, and consequently no reduce tasks either.

A. Experimental environment

In these experiments we used an environment similar to that of the previous ones. The JobTracker and NameNode were run on the same Intel Xeon nodes with 2 GB of RAM, while the 2 slaves used as DataNodes and TaskTrackers were Intel Xeon nodes with 16 GB of RAM running 64-bit Debian GNU/Linux.


B. Remote data overhead

In order to evaluate the impact of data locality on Hadoop’s performance during the map phase, we ran the application switching the two available slaves to act as DataNodes or TaskTrackers. First, the input was copied to only one DataNode and then read from the same node’s TaskTracker; after that, the execution was repeated using the second TaskTracker in order to read the data remotely.

The input consisted of 8 files of exactly 64 MB, the same as HDFS’s block size (dfs.block.size), so there was no overhead to join split files. Also, since there was only a single TaskTracker with one map slot reading 256 MB at a time, the task had to run 2 sequential next() operations to complete the total 512 MB. The results below show the averages after repeating each execution 5 times.

TABLE I
AVERAGE next() METHOD TIME (MS)

TaskTracker    DataNode Slave 1    DataNode Slave 2
Slave 1        1474.4              2876.8
Slave 2        2969.0              1875.7

TABLE II
TOTAL ELAPSED TIME (S)

TaskTracker    DataNode Slave 1    DataNode Slave 2
Slave 1        17.976              20.976
Slave 2        20.906              17.908

Table I shows the time needed to execute each next() method under the different scenarios. As expected, it is significantly faster to read locally: approximately 1.5-2 seconds instead of 3 seconds. But, while reading from remote HDFS nodes is slower than reading locally, remote reads are not far behind, and in this case the differences can be exclusively attributed to the data transfer over the network.

Table II shows the elapsed time to execute the entire job. Again, there is a difference between executions that use the same host as TaskTracker and DataNode, and those that use different hosts. However, taking into account that each job calls the next() method twice, it can be observed that the 3 second difference is in fact solely caused by the differences in next() times.

C. Memory constraints

During some of the experiments, we experienced a degradation in Hadoop’s performance when using large inputs, and found out later that it was caused by a hardware issue that reduced the amount of memory in one of the slaves to only 3 GB. It is interesting, however, to see how failures affect the whole process.

In this experiment we ran a job repeatedly, gradually increasing its input size: first 8 files of 64 MB (512 MB), then 16 (1 GB), and so on up to 88 files (5.5 GB). Again, the program basically did nothing other than reading input files. Also, note that the job was equally divided into 2 tasks independently of its input size, but within the task each step (or next() operation) always read the same amount of data: 256 MB.

Fig. 11. Elapsed time to execute each next() call, before detecting hardware “failure” (time in milliseconds against number of 64 MB input files; series: slave 1, slave 2)

Fig. 12. Elapsed time to execute each next() call, after fixing hardware “failure” (time in milliseconds against number of 64 MB input files; series: slave 1, slave 2)

Taking the initial input of 512 MB, for instance, it is split into two tasks, each of which will read 256 MB only once. On the other hand, an input of 5.5 GB is also split into two tasks, but this time each task will proceed to read 256 MB eleven times.
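The relation between input size and the number of next() calls per task can be checked with a short calculation. This is only an illustrative sketch: the constant and function names are assumptions, while the figures (64 MB files, 256 MB per read, 2 tasks) come from the experiment description.

```python
import math

FILE_MB = 64    # size of each input file
READ_MB = 256   # amount of data read by each next() call


def next_calls_per_task(num_files, tasks=2):
    """Number of next() calls each task issues to read its share of the input."""
    per_task_mb = num_files * FILE_MB / tasks
    return math.ceil(per_task_mb / READ_MB)


smallest = next_calls_per_task(8)    # 512 MB over 2 tasks
largest = next_calls_per_task(88)    # 5.5 GB over 2 tasks
single = next_calls_per_task(8, tasks=1)
```

With a single task, as in the remote data experiment, the same 512 MB input requires two sequential next() calls, which agrees with the description in Section VI-B.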

Figure 11 shows the time it took to execute each next() call before detecting the hardware failure. One of the slaves was slower all along, but from 56 files (3.5 GB) onward, the time it took to do the same amount of work increased in both slaves. Even though only one of the machines, slave 1, failed, it also affected the second slave. This is not surprising considering that both were also working as DataNodes: the failing machine was slower not only to run its own task, but also to serve data to the remote TaskTracker. After fixing the failure, the times were much more stable, as shown in Figure 12.


VII. RELATED WORK

Process scheduling is a deeply explored topic for parallel applications, considering different types of applications, different scheduling goals and different platform architectures ([8], [7], [10]). However, there is little related work on scheduling for MapReduce applications. The initial scheduler presented in the Hadoop distribution uses a very simple FIFO policy, considering five different application priorities. In addition, in order to isolate the performance of different jobs, the Hadoop project is working on a system for provisioning dedicated Hadoop clusters to applications [2]. However, this approach can result in resource underutilization. In [11] the authors propose a fair scheduling implementation to manage data-intensive and interactive MapReduce applications executed on very large clusters. The main concern of this scheduling policy is to give equal shares to each user. In addition, since exploiting data locality is a must for this kind of application, it tries to execute each task near the data it uses. However, this approach is not appropriate for long-running applications with different performance goals. In addition, it does not dynamically adapt the scheduling decisions to changes in the progress of the applications.

VIII. CONCLUSIONS

In this paper we have presented a prototype of a task scheduler for MapReduce applications. It has been implemented on top of Hadoop, Apache’s open-source implementation of a MapReduce framework. The proposed scheduler is able to calculate the estimated completion time for each MapReduce job in the system. The estimator takes advantage of the fact that each MapReduce job is composed of a large number of tasks (mappers and reducers) to be completed, that this number of tasks is calculated in advance during the job initialization phase (when the input data is split) and that the progress of the job can be observed at runtime. The proposed scheduler takes each submitted and not yet completed Hadoop job and monitors the average task length for already completed tasks. This information is later used to predict the job completion time. Based on these estimates, the scheduler is able to dynamically adapt the number of task slots that each job is allocated. Therefore it introduces runtime performance management in the Hadoop MapReduce framework. This work is a first step toward the creation of an efficient MapReduce scheduler for Hadoop that will be able to manage the performance of MapReduce applications based on high-level policies. We plan to consider several high-level objectives, such as energy saving, workload collocation and data placement, in the management of the resources of the MapReduce cluster. We are also working on the integration of this scheduler in the EmotiveCloud [4] middleware.
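As a rough illustration of the estimation idea summarized above, the following sketch predicts the remaining time of a job from the average length of its completed tasks and derives how many slots would be needed to meet a deadline. It is a minimal sketch under stated assumptions, not the prototype's code: the class and function names are hypothetical, and the formulas (pending tasks times average task length, divided by slots or by the time left) are one simple way to realize the description, assuming tasks of similar length that run one per slot.

```python
import math
from dataclasses import dataclass


@dataclass
class JobProgress:
    """Hypothetical snapshot of a job's progress as observed at runtime."""
    total_tasks: int               # known in advance, once the input has been split
    completed_tasks: int           # tasks that have already finished
    completed_task_seconds: float  # sum of the durations of the completed tasks

    @property
    def pending_tasks(self) -> int:
        return self.total_tasks - self.completed_tasks

    def average_task_seconds(self) -> float:
        """Average length of completed tasks (assumes at least one has finished)."""
        if self.completed_tasks == 0:
            raise ValueError("no completed tasks to average yet")
        return self.completed_task_seconds / self.completed_tasks


def estimated_remaining_seconds(job: JobProgress, slots: int) -> float:
    """Assumed estimate: pending work spread evenly over the allocated slots."""
    if slots <= 0:
        raise ValueError("slots must be positive")
    return job.pending_tasks * job.average_task_seconds() / slots


def slots_needed(job: JobProgress, seconds_until_goal: float, max_slots: int) -> int:
    """Assumed allocation rule: fewest slots that meet the goal, capped by the cluster."""
    if job.pending_tasks == 0:
        return 0
    if seconds_until_goal <= 0:
        return max_slots  # goal already missed: give the job everything available
    needed = math.ceil(job.pending_tasks * job.average_task_seconds() / seconds_until_goal)
    return max(1, min(needed, max_slots))


if __name__ == "__main__":
    job = JobProgress(total_tasks=100, completed_tasks=20, completed_task_seconds=600.0)
    print(estimated_remaining_seconds(job, slots=4))
    print(slots_needed(job, seconds_until_goal=300.0, max_slots=16))
```

Recomputing these values whenever a task completes lets the allocation follow the observed progress of each job, which is the kind of runtime adaptation the scheduler aims for.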

ACKNOWLEDGEMENTS

This work is partially supported by the Ministry of Science and Technology of Spain and the European Union (FEDER funds) under contract TIN2007-60625, and by IBM through the 2008 IBM Faculty Award program.

REFERENCES

[1] Hadoop Map/Reduce Tutorial. http://hadoop.apache.org/core/docs/current/mapred_tutorial.html.

[2] Hadoop On Demand. http://hadoop.apache.org/core/docs/r0.20.0/hod_user_guide.html.

[3] HDFS Architecture. http://hadoop.apache.org/core/docs/current/hdfs_design.html.

[4] Autonomic Systems and eBusiness Platforms research line. Barcelona Supercomputing Center (BSC). Emotive Cloud. http://www.emotivecloud.net.

[5] D. Carrera, M. Steinder, I. Whalley, J. Torres, and E. Ayguadé. Enabling resource sharing between transactional and batch workloads using dynamic application placement. In Middleware ’08: Proceedings of the 9th ACM/IFIP/USENIX International Conference on Middleware, pages 203–222, New York, NY, USA, 2008. Springer-Verlag New York, Inc.

[6] J. Dean and S. Ghemawat. MapReduce: Simplified data processing on large clusters. In OSDI ’04: Sixth Symposium on Operating System Design and Implementation, pages 137–150, San Francisco, CA, December 2004.

[7] D. G. Feitelson. Job scheduling in multiprogrammed parallel systems. Technical Report RC 19790 (87657), August 1997.

[8] D. G. Feitelson and L. Rudolph. Parallel job scheduling: Issues and approaches. In JSSPP, pages 1–18, 1995.

[9] S. Ghemawat, H. Gobioff, and S.-T. Leung. The Google file system. SIGOPS Oper. Syst. Rev., 37(5):29–43, 2003.

[10] C. E. Volker, V. Hamscher, and R. Yahyapour. Economic scheduling in grid computing. In Scheduling Strategies for Parallel Processing, pages 128–152. Springer, 2002.

[11] M. Zaharia, D. Borthakur, J. Sen Sarma, K. Elmeleegy, S. Shenker, and I. Stoica. Job scheduling for multi-user MapReduce clusters. Technical Report UCB/EECS-2009-55, EECS Department, University of California, Berkeley, Apr 2009.
