Assessing MapReduce for Internet Computing: A Comparison of Hadoop and BitDew-MapReduce

Lu Lu, Hai Jin, Xuanhua Shi

Cluster and Grid Computing Lab, Services Computing Technology and System Lab, Huazhong University of Science and Technology

Wuhan, 430074, China {llu, hjin, xhshi}@hust.edu.cn

Gilles Fedak

INRIA/University of Lyon

Lyon, 69364, France gilles.fedak@inria.fr

Abstract: MapReduce is emerging as an important programming model for data-intensive applications. Adapting this model to desktop grids would make it possible to exploit their vast computing power and distributed storage to run a new range of applications able to process enormous amounts of data. In 2010, we presented the first implementation of MapReduce dedicated to Internet Desktop Grids, based on the BitDew middleware. In this paper, we present new optimizations to BitDew-MapReduce (BitDew-MR): aggressive task backup, intermediate result backup, task re-execution mitigation and network failure hiding. We propose a new experimental framework which emulates key fundamental aspects of Internet Desktop Grids. Using the framework, we compare BitDew-MR and the open-source Hadoop middleware on Grid5000. Our experimental results show that 1) BitDew-MR successfully passes all the stress tests of the framework, while Hadoop is unable to work in a typical wide-area network topology that includes PCs hidden behind firewalls and NAT; 2) BitDew-MR outperforms Hadoop in several respects: scalability, fairness, resilience to node failures, and network disconnections.

Keywords: desktop grid computing, MapReduce, data-intensive application, cloud computing

I. INTRODUCTION

Researchers in various fields want to use large numbers of computing resources to attack problems of enormous scale. Desktop grids have shown their capability to address this need by using the computing, network and storage resources of idle PCs distributed over multiple LANs or the Internet, especially for CPU-intensive applications. We believe that applications could benefit not only from the vast CPU processing power but also from the huge data storage potential offered by desktop grids [3]. Distributed data processing has been widely used and studied, especially since Google showed the feasibility and simplicity of MapReduce for handling massive amounts of web search data on its internal commodity clusters [11]. Recently, Hadoop [1] has emerged as the industry standard for parallel data processing in enterprise data centers. Many projects are exploring ways to support MapReduce on different types of environments (e.g. Mars [16] for GPUs, Phoenix [26] for large SMPs) and for a wider range of applications [9].

Implementing the MapReduce programming model for desktop grids raises many challenges due to the low reliability of these infrastructures. In 2010, we proposed [5] the first implementation of MapReduce for desktop grids, based on the BitDew middleware [14]. Typical systems such as BOINC [2] or XtremWeb [7] are oriented towards Bag-of-Tasks applications and follow a simple master/slave architecture in which workers pull tasks from a central server when they are idle. The architecture we propose differs radically from traditional desktop grid systems in many respects. Following a data-centric approach, files and tasks are now scheduled independently by two different schedulers. Communication patterns are more complex than the regular worker-to-master pattern because collective communications happen at several steps of the computation: the initial distribution of file chunks, the shuffle and the final reduction. Because of the tremendous amount of data to process, some components, such as the result checker, are now decentralized. As a result, a strict methodology is needed to assess the viability of this complex architecture when running under realistic Internet conditions.

We summarize the main contributions of this paper as follows:

• We implement a novel, sophisticated MapReduce scheduling strategy for dynamic and volatile environments, which tolerates data transfer and communication faults, avoids unnecessary task re-execution and aggressively backs up slow tasks.

• We propose a new experimental framework which emulates key fundamental aspects of Internet Desktop Grids (faults, host churn, firewalls, heterogeneity, network disconnection, etc.) based on the analysis of real desktop grid traces. In the paper, we present a large variety of execution scenarios which emulate up to 100,000 nodes.

• Using the emulation framework on Grid5000, we experimentally evaluate BitDew-MapReduce against Hadoop, the reference implementation, which also has fault-tolerance capabilities. Results show that BitDew-MR successfully passes all the tests of the framework, while Hadoop would be unable to run on an Internet Desktop Grid. Moreover, thanks to the new scheduling optimizations, BitDew-MR outperforms Hadoop in several respects: scalability, fairness, resilience to node failures, and network disconnection.

Figure 1. Execution overview of MapReduce.

The rest of the paper is organized as follows. Section II gives the background of our research. Section III details the runtime system design and implementation. Sections IV and V report the performance evaluation. We discuss related work in Section VI and conclude in Section VII.

II. BACKGROUND

A. MapReduce

The MapReduce programming model [11] borrows the concepts of two list-processing combinators, map and reduce, known from Lisp and many other functional languages. This abstraction isolates the computation expressed by users' applications from the details of massively parallel data processing on distributed systems, which are handled by the MapReduce runtime system. The execution process consists of three phases:

• The runtime system reads the input (typically from a distributed file system) and parses it into key/value-pair records. The map function iterates over the records and maps each of them into a set of intermediate key/value pairs.

• The intermediate pairs are partitioned by the partition function, then grouped and sorted according to their keys. An optional combine function can be invoked to reduce the size of the intermediate data.

• The reduce function reduces the results of the previous phase, once for each unique key in the sorted list, to arrive at a final result.
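The three phases above can be illustrated with a minimal single-process sketch of word count in Python. It is not the Hadoop or BitDew-MR API: the names map_fn, partition, reduce_fn and run_job are hypothetical, the optional combine step is omitted, and everything runs in memory rather than on a distributed file system.

```python
from collections import defaultdict


def map_fn(_key, line):
    """Map phase: emit one (word, 1) pair per word of an input record."""
    for word in line.split():
        yield word, 1


def partition(key, num_reducers):
    """Stable partition function (the built-in hash() of str varies per process)."""
    return sum(key.encode("utf-8")) % num_reducers


def reduce_fn(key, values):
    """Reduce phase: called once per unique key with all of its values."""
    return key, sum(values)


def run_job(lines, num_reducers=2):
    # Phase 1: parse the input into (offset, line) records and map each record.
    intermediate = [kv for i, line in enumerate(lines) for kv in map_fn(i, line)]

    # Phase 2: partition the intermediate pairs, then group them by key.
    partitions = [defaultdict(list) for _ in range(num_reducers)]
    for key, value in intermediate:
        partitions[partition(key, num_reducers)][key].append(value)

    # Phase 3: within each partition, reduce the keys in sorted order.
    result = {}
    for part in partitions:
        for key in sorted(part):
            out_key, total = reduce_fn(key, part[key])
            result[out_key] = total
    return result


counts = run_job(["a b a", "b c"])
```

In a real runtime each partition would be processed by a separate reduce task, possibly on a different machine; the sketch keeps them in one loop only to stay self-contained.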

B. Hadoop

Hadoop is the reference MapReduce [11] implementation targeting commodity clusters and enterprise data centers [1]. It consists of two fundamental subprojects: the Hadoop Distributed File System (HDFS) and the Hadoop MapReduce framework.

HDFS is a distributed file system with a master/slave architecture, inspired by GFS, which provides high-throughput access to application data. A NameNode daemon, running on the master server, manages the system metadata, logically splits files into equal-sized blocks and controls the distribution of blocks written by user clients across the cluster, taking into account the replication factor of each file for fault tolerance. Several DataNode daemons, running on the slave nodes that actually store the data blocks, execute management tasks assigned by the NameNode and serve read/write requests from user clients.

The Hadoop MapReduce framework runs on top of HDFS and is also based on the traditional master/slave architecture. The master node runs a single JobTracker daemon to manage job status and task assignment. On each slave node, a TaskTracker daemon is responsible for launching new JVM processes to execute tasks, while periodically reporting task progress and idle task slots to the JobTracker through heartbeat signals (the number of slots is the maximum number of map/reduce tasks that can run concurrently on the slave). The JobTracker then updates the status of the TaskTracker and assigns new tasks to it, considering slot availability and data locality.
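The heartbeat-driven assignment described above can be sketched as follows. This is an illustrative simplification, not Hadoop code: assign_tasks is a hypothetical helper that, given the number of idle slots reported in a heartbeat and the blocks stored on the reporting node, prefers pending tasks whose input block is local.

```python
def assign_tasks(free_slots, local_blocks, pending):
    """Pick at most `free_slots` pending tasks, data-local ones first.

    pending maps a task id to the id of the block it reads (hypothetical layout).
    """
    local = [task for task, block in pending.items() if block in local_blocks]
    remote = [task for task, block in pending.items() if block not in local_blocks]
    return (local + remote)[:free_slots]


pending = {"m1": "b1", "m2": "b2", "m3": "b3"}
assigned = assign_tasks(free_slots=2, local_blocks={"b3"}, pending=pending)
```

With two idle slots and block b3 stored locally, the data-local task m3 is chosen first and the remaining slot goes to a non-local task.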

C. BitDew-MapReduce

BitDew [14] is an open-source data management middleware which can easily be integrated as a subsystem into full-fledged desktop grid systems such as XtremWeb [7] and BOINC [2]. It provides simple APIs with a high-level abstraction of data, named Attributes, that let users control the life cycle, distribution, placement, replication and fault tolerance of their data in highly dynamic and volatile environments. The BitDew runtime environment adopts a flexible distributed service architecture: 1) it uses an open-source object persistence module and does not rely on a specific relational database for the data catalog; 2) it integrates various asynchronous and synchronous data transfer protocols, including FTP, HTTP and BitTorrent, giving users the freedom to select the most suitable one for their applications.

Our BitDew-MapReduce (BitDew-MR) prototype [5] contains three main components: the API of the MapReduce programming model, the MapReduce library that includes the master and worker daemon programs, and a benchmark MapReduce word count application. Figure 2 illustrates the architecture of the BitDew-MR runtime system. It separates the nodes into two groups: stable nodes run the various independent services which compose the runtime environment, and volatile nodes provide the storage and computing resources to run the map and reduce tasks. Normally, programmers do not use the various services directly; instead they call the API, which encapsulates the complexity of the internal protocols. They can use the BitDew API (or the command-line tool) to upload input data to workers and the MapReduce API to build their applications. The master and worker daemons of the MapReduce library handle the interaction with the BitDew services for data management.

Figure 2. General overview of the system.

III. SYSTEM DESIGN AND IMPLEMENTATION

In this section we describe the runtime techniques of BitDew-MR. We focus in our discussion on the new BitDew attribute processing algorithms and the implementation of the main software components and their features.

A. Event-driven Task Scheduling

The key feature of BitDew is to leverage data attributes, which are used not only to index and search data files but also to dynamically control the repartitioning and distribution of data onto the storage nodes. Programmers can also manipulate data attributes so that data transfer events trigger task assignment actions, and thereby avoid building their own scheduling component from scratch. For more details about the six abstractions defined by BitDew, see [14].

Unfortunately, it is not trivial to implement MapReduce task scheduling just by manipulating BitDew data attributes. We identify four core functionalities of MapReduce scheduling: a) data-location-aware task selection; b) dynamic task assignment based on idle workers pulling tasks; c) fault-tolerant scheduling by re-executing failed tasks; d) speculative execution by backing up slow tasks. Hadoop implements all these functionalities and improves on them in two ways: a task slot abstraction that specifies the number of concurrent tasks, for efficient use of the resources of multi-core nodes, and conservative task backup, with stragglers detected by comparing each task's progress score with the average value.
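The straggler detection mentioned for Hadoop, comparing each task's progress score with the average, can be sketched as below. The function name find_stragglers and the margin of 0.2 are assumptions made for illustration; the text only states that progress scores are compared with the average value.

```python
def find_stragglers(progress, margin=0.2):
    """Return the ids of tasks whose progress score lags the average.

    progress maps a task id to a progress score in [0, 1].
    margin is an assumed tolerance below the average, not a value from the text.
    """
    if not progress:
        return []
    average = sum(progress.values()) / len(progress)
    return sorted(task for task, score in progress.items() if score < average - margin)


stragglers = find_stragglers({"m1": 0.9, "m2": 0.8, "m3": 0.2})
```

Only tasks flagged this way would be candidates for a backup copy, which is what makes the strategy conservative compared with backing up every remaining task.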

The traditional data processing approach on desktop grids distributes input files only at the beginning of job execution, which makes data-local scheduling meaningless. We define a new data attribute, MUTAFF (which stands for mutual affinity), to separate input data distribution from the execution process, thus allowing users to cache their data on the worker nodes before launching their jobs. As its name suggests, MUTAFF is the bidirectional version of the original AFFINITY attribute.

An intuitive approach is to use DISTRIB to simulate the task slot abstraction, FT to implement task re-execution and REPLICA to back up tasks. But suppose the user sets the REPLICA value to n for the job input data: at the beginning of the job execution, each worker gets n file chunks. Then, whenever workers finish processing these chunks, they have to un-register the chunk data to trigger the Data Scheduler (DS) to schedule new data chunks to them, which makes re-execution of the corresponding tasks impossible. REPLICA is mainly designed for result checking by majority voting, and running redundant backups for all tasks would be an unacceptable waste of resources (we do not take result checking into account in this paper because it is a challenging problem addressed in our other work [24]). We therefore use the combined effect of REPLICA and MUTAFF to implement task backup. The actual control logic we implement within the data scheduler is a little subtle because mutual affinity is not a symmetrical relationship. For convenience of discussion, suppose there are data a and b with a.mutaff = b; we refer to data a as strong MUTAFF data and to data b as weak MUTAFF data. The DS first schedules strong MUTAFF data according to the MUTAFF attribute and then schedules the remaining replicas according to the REPLICA attribute, and vice versa.

Algorithm 1 SCHEDULING ALGORITHM

Require: $\Theta = \{D_1, \dots, D_n\}$, the set of data managed by the scheduler
Require: $\Delta_k = \{D_1^k, \dots, D_n^k\}$, the data cache managed by the reservoir host $k$
Require: $\Sigma(D_i) = \{D_1, \dots, D_p\}$, the set of data belonging to the same data collection
Require: $\Omega(D_i) = \{k, \dots\}$, the set of reservoir hosts owning data $D_i$
Ensure: $\Psi_k = \{D_1^k, \dots, D_o^k\}$, the new data set managed by the reservoir host $k$

$\Psi_k \leftarrow \emptyset$
sched_count $\leftarrow 0$
{Resolve mutual affinity}
for all $D_i^k \in \Delta_k$ do
    $\Psi_k \leftarrow \Psi_k \cup \{D_i^k\}$
    if ($k \in \Omega(D_i^k)$) then
        $\Psi_k \leftarrow \Psi_k \cup \{D_i^k.\mathit{mutaff}\}$
    end if
end for
main:
for all $D_j \in (\Theta \setminus \Delta_k)$ do
    {Resolve mutual affinity dependence}
    for all $D_i^k \in$ … do
        if (($D_j.\mathit{mutaff} = D_i^k$) $\wedge$ ($D_j \notin \Delta_k$)) then
            $\Psi_k \leftarrow \Psi_k \cup \{D_i^k\}$
            continue main
        end if
    end for
    {Schedule replica}
    if (($D_j.\mathit{mutaff}.\mathit{replica} < |\Omega(D_j)|$) $\vee$ ($D_j.\mathit{replica} < |\Omega(D_j)|$)) then
        dist_count $\leftarrow 0$
        for all $D_k \in \Psi_k$ do
            if ($D_k \in \Sigma(D_j)$) then
                dist_count $\leftarrow$ dist_count $+ 1$
            end if
        end for
        if (dist_count $< D_j.\mathit{distrib}$) then
            $\Psi_k \leftarrow \Psi_k \cup \{D_i^k\}$
            sched_count $\leftarrow$ sched_count $+ 1$
        end if
    end if
    if (sched_count $\geq$ MaxDataSchedule) then
        break
    end if
end for

Algorithm 1 presents the pseudo-code of the modified data scheduling algorithm. Whenever a worker program reports the set of data it holds locally through a heartbeat message, the Data Scheduler iterates over the worker's local data list and the global data list to make scheduling decisions according to the data attributes. It uses a MaxDataSchedule threshold to limit the number of new data items assigned per heartbeat, thereby balancing the data distribution among all the workers. We omit the details that are less relevant to our event-driven task scheduling. The scheduler first adds to the newly assigned data set $\Psi_k$ all the data in the worker's local data list that should be kept according to their life-cycle attributes, while checking whether these kept data (strong MUTAFF data) have mutually affine data in the global data list. If so, the scheduler simply adds the affine data (weak MUTAFF data) to $\Psi_k$. The scheduler then finds all the weak MUTAFF data in the worker's local data list and adds their affine strong MUTAFF data to $\Psi_k$. The remaining strong MUTAFF data in the global data list are assigned according to their DISTRIB and REPLICA attributes, regardless of MUTAFF. As in the original algorithm, the DISTRIB attribute always takes precedence over MUTAFF and REPLICA. The old DISTRIB could only limit the number of data items with the same DISTRIB attribute (the same attr id) held simultaneously by a worker; we extend it to also restrict the number of data items that belong to the same DataCollection [5].
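To make the control flow concrete, the following Python sketch gives one possible reading of the prose description above. It is not the BitDew implementation. The Data class, the schedule function and the dictionary layouts are hypothetical, life-cycle checks are omitted, the replica test is interpreted as "fewer owners than requested replicas", and the DISTRIB limit is applied to every newly assigned item because the text states that DISTRIB always takes precedence.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Data:
    uid: str
    collection: str
    replica: int = 1
    distrib: int = -1              # assumption: -1 means "no limit"
    mutaff: Optional[str] = None   # uid of the weak partner, if any


def fits_distrib(d, chosen, catalog):
    """DISTRIB: cap on data of the same collection held by one worker."""
    if d.distrib < 0:
        return True
    held = sum(1 for uid in chosen if catalog[uid].collection == d.collection)
    return held < d.distrib


def under_replicated(d, owners):
    return len(owners.get(d.uid, ())) < d.replica


def schedule(local, catalog, owners, max_data_schedule=2):
    """Return the new data set for a worker that reports `local` in a heartbeat."""
    chosen = set(local)  # keep the cached data (life-cycle checks omitted)

    # Strong MUTAFF data held locally pull in their weak partner.
    for uid in local:
        partner = catalog[uid].mutaff
        if partner is not None:
            chosen.add(partner)

    # Weak MUTAFF data held locally attract their strong partner.
    for d in catalog.values():
        if (d.uid not in chosen and d.mutaff in local
                and under_replicated(d, owners)
                and fits_distrib(d, chosen, catalog)):
            chosen.add(d.uid)

    # Remaining data: REPLICA and DISTRIB only, capped per heartbeat.
    scheduled = 0
    for d in catalog.values():
        if scheduled >= max_data_schedule:
            break
        if d.uid in chosen:
            continue
        if under_replicated(d, owners) and fits_distrib(d, chosen, catalog):
            chosen.add(d.uid)
            scheduled += 1
    return chosen


catalog = {
    "chunk0": Data("chunk0", "input"),
    "chunk1": Data("chunk1", "input"),
    "tok0": Data("tok0", "map", replica=2, distrib=1, mutaff="chunk0"),
    "tok1": Data("tok1", "map", replica=2, distrib=1, mutaff="chunk1"),
}
holder = schedule({"chunk0"}, catalog, {"chunk0": {"w1"}, "chunk1": {"w2"}})
idle = schedule(set(), catalog, {"chunk0": {"w1"}, "chunk1": {"w2"},
                                 "tok0": {"w1"}, "tok1": {"w2"}})
```

In this toy run the worker caching chunk0 receives the token tok0 through mutual affinity, while an idle worker with no cached chunk receives a second copy of tok0 through REPLICA, which is the task backup effect described in the text.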

B. The BitDew-MapReduce Runtime

Our previous work [8] mainly aimed at showing the feasibility of building a BitDew-based MapReduce runtime for large-scale and loosely connected Internet Desktop Grids. We rewrote the upper-layer MapReduce API to allow users to isolate their application-specific map and reduce functions from the data management code. Users can use the BitDew command-line tool to submit input data and launch jobs separately. Pre-uploaded input data can be distributed to and cached on worker machines before the corresponding data processing job is executed. This significantly improves the data locality of jobs submitted afterwards when the running map tasks already have their input data chunks on local machines. We also re-implemented the master and worker daemons using the MUTAFF attribute and adopted a new event-handling thread design to cope with worker-side network failures.

Master

When a user uploads an input file with the command-line tool, the tool automatically splits the file into equal-sized data chunks and returns the id of the corresponding DataCollection. Because we do not need to cache multiple replicas on worker machines to guarantee the accessibility of the input file, all data in this DataCollection are set to attr = {replicat = 1, ft = true}. When a job is launched, the master daemon initializes the job configuration, fetches all the input chunk data from the DC service using the input DataCollection id, and creates the task token data that are used to trigger worker daemons to launch the corresponding map/reduce tasks. Each map token has its MUTAFF set to its corresponding input chunk: map_token_i.attr = {replicat = 2, distrib = 1, affmut = input_data_i}. The combination of REPLICA, DISTRIB and MUTAFF makes the scheduler assign map tokens to workers dynamically and balance the load. When the job is close to the end of the map phase, the scheduler replicates the remaining map tasks on idle workers, as indicated by the REPLICA values of the map tokens, thereby shortening the job make-span. The mutual affinity triggers workers that receive a new map token to download the required task input files.

Figure 3. Worker handles.
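The job set-up performed by the master can be sketched with plain Python dictionaries that mirror the attribute literals quoted above. This is only an illustration: split_offsets, input_chunk_attr, map_token_attr and create_map_tokens are hypothetical helpers, not BitDew API calls, and the attribute keys are copied from the text as written.

```python
def split_offsets(file_size, chunk_size):
    """(offset, length) pairs for equal-sized chunks; the last one may be shorter."""
    return [(offset, min(chunk_size, file_size - offset))
            for offset in range(0, file_size, chunk_size)]


def input_chunk_attr():
    # One cached copy per chunk, with fault tolerance enabled.
    return {"replicat": 1, "ft": True}


def map_token_attr(chunk_id):
    # Two replicas for backup, one token per worker at a time,
    # and mutual affinity towards the input chunk.
    return {"replicat": 2, "distrib": 1, "affmut": chunk_id}


def create_map_tokens(chunk_ids):
    return {f"map_token_{i}": map_token_attr(chunk_id)
            for i, chunk_id in enumerate(chunk_ids)}


chunks = split_offsets(file_size=150, chunk_size=64)
tokens = create_map_tokens(["input_data_0", "input_data_1", "input_data_2"])
```

One token is created per input chunk, so the number of map tasks equals the number of chunks produced when the file was uploaded.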

At the end of the computation, the reduce tasks generate several result files which have to be retrieved by the master. The master creates an empty data item, the Collector, and every worker sets an AFFINITY attribute from its result data to the Collector data. In this way, results are automatically transferred to the master node.

Worker

The worker daemon periodically gets data from the DS service using the ActiveData API and then determines the actions to perform according to the type of the data received and their attributes. When a user submits an input file, each worker downloads the split chunks assigned to it after getting the corresponding data from the DS. We implement a multi-threaded transfer component which can process several concurrent file transfers, especially for the synchronous protocols. After a user launches a job, the map tokens are sent to the workers according to their MUTAFF attributes, while the reduce tokens are scheduled in a round-robin way. We borrow the task slot abstraction from Hadoop to use the computing capacity of modern multi-core hardware efficiently: each slot is assigned to a separate map/reduce execution thread in the worker daemon, and the maximum number of concurrent threads (the number of slots) for map and reduce tasks can be configured. Once a map task is finished, the worker daemon invokes the unpin method to dissociate the token data from its host, so that the scheduler can assign it new map tokens, within the limit set by the DISTRIB attribute. After the output file of this task has been sent to the stable storage, the worker daemon un-registers the task token with the DC and DS services to avoid any future re-execution of this task.

We use two different threads to invoke the ActiveData callbacks and process file transfers: one for transfer management and the other for data control. The main principle is to avoid putting time-consuming work and blocking I/O procedures in the bodies of ActiveData callback methods. Otherwise they may occasionally prevent the ActiveData main loop from sending heartbeats to the DS, which in turn could make the DS service mistakenly mark the worker as "dead". We do not put any actual processing logic into the ActiveData callbacks: they only generate events that are added to the proper thread-safe event queues. To make worker programs resilient to temporary network failures, whenever the threads catch a remote communication exception (or any other kind of exception), they simply skip the event being processed and add it to the tail of its queue. The TransferManager API has also been modified to support automatic file retransmission: the TransferManager main loop simply re-initializes a transfer if a failure occurs.
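The callback and retry pattern described above can be sketched with Python's thread-safe queue. This is not the BitDew-MR worker code: on_callback and drain are hypothetical names, the loop runs in the calling thread for simplicity, and the max_attempts bound is an assumption added so that the example always terminates.

```python
import queue


def on_callback(events, event):
    """Callback body: no processing logic, only enqueue the event."""
    events.put(event)


def drain(events, handler, max_attempts=5):
    """Process queued events; a failed event is re-appended at the tail."""
    attempts = {}
    done = []
    while not events.empty():
        event = events.get()
        try:
            handler(event)
            done.append(event)
        except Exception:
            # Temporary failure: skip the event for now and retry it later.
            attempts[event] = attempts.get(event, 0) + 1
            if attempts[event] < max_attempts:
                events.put(event)
    return done


failed_once = set()


def flaky_handler(event):
    """Simulates one temporary network failure while handling event 'b'."""
    if event == "b" and event not in failed_once:
        failed_once.add(event)
        raise ConnectionError("temporary network failure")


events = queue.Queue()
for name in ["a", "b", "c"]:
    on_callback(events, name)
processed = drain(events, flaky_handler)
```

Because the failed event goes to the tail of the queue, later events are not blocked behind it, and the callback itself returns immediately so heartbeats are never delayed by the handler.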

IV. EXPERIMENTAL METHODOLOGY

A. Platform and Application

We perform all our experiments on the GdX and NetGdX clusters, which are part of the Grid5000 infrastructure. The two clusters comprise 356 IBM eServer nodes, each with one 2-core 2.0GHz AMD Opteron CPU and 2GB of RAM. All the nodes run Debian with kernel 2.6.18 and are interconnected by a Gigabit Ethernet network. All results described in this paper are obtained using Hadoop version 0.21.0, with the data stored with 2 replicas per block in the Hadoop Distributed File System. We perform our experiments by repeatedly executing the word count benchmark on a 50GB dataset generated by the Hadoop RandomTextWriter application. The block size is set to 64MB, and the task slots for map and reduce tasks are set to 2 and 1 respectively. We fix the number of reducers per job to 10. The baseline job make-spans of Hadoop and BitDew-MR in the normal case are 399 seconds and 246 seconds respectively.
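As a rough worked example of what this configuration implies, the number of map tasks and map waves can be estimated as below. Two assumptions are made: GB and MB are read as binary units, and the job uses 50 workers, a figure taken from the captions of Figures 5 and 6 rather than from this paragraph. The real number of input splits may differ slightly depending on how the generated files are laid out.

```python
GIB = 1024 ** 3
MIB = 1024 ** 2

dataset_bytes = 50 * GIB      # word count input size
block_bytes = 64 * MIB        # HDFS block size / chunk size
map_slots_per_worker = 2
workers = 50                  # assumption: taken from the captions of Figures 5 and 6


def ceil_div(a, b):
    return -(-a // b)


map_tasks = ceil_div(dataset_bytes, block_bytes)        # one map task per block
concurrent_maps = workers * map_slots_per_worker
map_waves = ceil_div(map_tasks, concurrent_maps)
```

Under these assumptions the job runs 800 map tasks in 8 waves of 100 concurrent tasks.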

B. Emulation Scenarios

It is difficult to conduct experiments on large-scale distributed systems such as desktop grids and to reproduce the original results, because: 1) the implementation of the system runtime plays an important role in the overall performance; 2) the resources of the different machines of a desktop grid can be heterogeneous, hierarchical, or dynamic; 3) machine failures and users' usage behaviors make the system performance very hard to predict. We design seven experimental scenarios to emulate an Internet-scale desktop grid environment on the Grid5000 platform. This environment emulation is based on the analysis of both desktop grid system implementations and traces that represent node availability in real desktop grids.

- Scalability. Volunteer computing projects such as SETI@home may have millions of participants, but the number of server machines that offer the core system management services is relatively small. If a large number of participating clients connect to the central servers simultaneously, a disastrous overload could occur. The first scenario evaluates the scalability of the core master services of Hadoop and BitDew-MR. We run the central service daemons on one master node and, on 100 worker nodes, multi-threaded clients that periodically perform remote meta-data creation operations on the master. We tune the number of concurrent threads of each client and the operation interval.

- Fault Tolerance. Google MapReduce uses a straightforward task re-execution strategy to handle frequent failures of a small fraction of machines, based on observations of Google's commodity clusters [11]. However, the major contributor to resource volatility in desktop grids is not machine failure but users' personal behavior, such as shutting down their machines. Moreover, typical desktop grid systems, including BOINC [2] and Condor [27], simply suspend the running tasks when they detect other active jobs through keyboard or mouse events, which in turn aggravates the problem. CPU availability traces of participating nodes gathered from a real enterprise desktop grid [21] show that: a) the independent single-node unavailability rate is about 40% on average; b) up to 90% of the resources can be unavailable simultaneously, which may have a catastrophic effect on running jobs. We emulate this kind of machine unavailability by killing worker and task processes on 25 worker nodes at different progress points of the map phase of job execution.

The Hadoop MapReduce runtime system can handle two different kinds of failure: failure of child task processes and failure of TaskTracker daemons. We conduct three experimental scenarios for different failure modes: 1) kill all child map task processes; 2) kill the TaskTracker processes; 3) kill all Java processes, including the DataNode daemons, to emulate a whole-machine crash. Meanwhile, considering that the common case in a desktop grid environment is not the failure of a process but the crash or departure of a whole machine, we make the BitDew-MR worker daemon multi-threaded within a single process, which also simplifies data sharing between the different system modules. We simply kill the single worker process on each of the 25 chosen nodes to emulate the machine crash.

- Host Churn. The independent arrival and departure of thousands or even millions of peer machines leads to host churn. We periodically kill the MapReduce worker process on one node and launch it on a new node to emulate the host churn effect. To increase the probability that Hadoop jobs complete, we increase the HDFS chunk replication factor to 3 and set the DataNode heartbeat timeout to 20 seconds.

- Network Connectivity. We set firewalls and NAT rules on all the worker nodes to disable all server-initiated and inter-worker network connections.

- CPU Heterogeneity. We emulate CPU heterogeneity by reducing the CPU frequency of half of the worker nodes to 50% with Wrekavoc [10].

- Straggler. A straggler is a machine that takes an unusually long time to complete one of the last few tasks in the computation [11]. We emulate stragglers by reducing the CPU frequency of the target nodes to 10% with Wrekavoc.

- Network Failure Tolerance. The runtime system must be resilient to the temporary network isolation of a portion of the machines, which is very common in the Internet environment because of users' behavior and network hardware failures. We inject temporary 25-second off-line windows on 25 worker nodes at different job progress points. To make sure that the system master marks the offline workers as dead, we set the worker heartbeat timeout to 10 seconds.

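Figure 4. Scalability evaluation of core system service: (a) Hadoop creates empty files and (b) BitDew creates data.

TABLE I. PERFORMANCE EVALUATION OF FAULT TOLERANCE SCENARIO

| System | Failure mode | Metric (by job progress at the crash point) | 12.5% | 25% | 37.5% | 50% | 62.5% | 75% | 87.5% | 100% |
|---|---|---|---|---|---|---|---|---|---|---|
| Hadoop | Kill tasks | Re-executed map tasks | 50 | 50 | 50 | 50 | 50 | 50 | 50 | 50 |
| Hadoop | Kill tasks | Job make-span (sec.) | 425 | 425 | 423 | 427 | 426 | 429 | 431 | 453 |
| Hadoop | Kill TTs | Re-executed map tasks | 50 | 100 | 150 | 200 | 250 | 300 | 350 | 400 |
| Hadoop | Kill TTs | Job make-span (sec.) | 816 | 823 | 809 | 815 | 820 | 819 | 812 | 814 |
| Hadoop | Kill all | Both metrics | Failed | Failed | Failed | Failed | Failed | Failed | Failed | Failed |
| BitDew-MR | Kill all | Re-executed map tasks | 50 | 0 | 50 | 0 | 50 | 0 | 50 | 0 |
| BitDew-MR | Kill all | Job make-span (sec.) | 450 | 411 | 389 | 351 | 331 | 299 | 279 | 247 |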

V. EXPERIMENT RESULTS

A. Scalability

Figure 4 presents the operation throughput of Hadoop and BitDew-MR when varying the number of concurrent threads and the time interval between operations. As the figure shows, increasing the number of concurrent clients results in a dramatic decrease in the number of meta-data operations per second for both Hadoop and BitDew-MR. However, BitDew-MR shows better scalability than Hadoop, as it achieves acceptable throughput under a typical Desktop Grid configuration in which 1,000,000 PCs create meta-data every few minutes.

Initially, we thought that the significant decrease in throughput under 1,000,000 concurrent emulated clients was caused by a disk I/O bottleneck. However, the Hadoop NameNode persists the image of its in-memory structures to a write-ahead log file in order to group small random disk I/O operations into large sequential ones, and we use a pure in-memory database as the BitDew back-end data store during the experiments. The actual performance bottleneck of both systems is therefore the synchronization overhead of highly concurrent RPC invocations. The Hadoop team has also reported a similar scalability issue that occurs when the cluster size reaches tens or hundreds of thousands of nodes. A feasible solution is to replace the thread-based RPC model with an event-driven asynchronous I/O model.

B. Fault Tolerance

Table I shows the job make-span and the number of re-executed map tasks in the fault tolerance scenario. In the Hadoop case, for the first test, whenever we kill the running child tasks on 25 nodes, the JobTracker simply re-schedules the 50 killed map tasks, which prolongs the job make-span by about 6.5% compared with the normal case. For the second test, the JobTracker blindly re-executes all the successfully completed and in-progress map tasks of the failed TaskTrackers. This means that the 25 chosen worker nodes contribute nothing to the overall job progress, so the job make-span is almost doubled compared with the baseline. Finally, when all the Java processes are killed on half of the worker nodes, jobs simply fail due to the permanent loss of input chunks.

BitDew-MR, on the other hand, avoids a substantial amount of unnecessary fault-tolerance work. Because the intermediate outputs of completed map tasks are safely stored on the stable central storage server, the master does not re-execute the successfully completed map tasks of failed workers. The additional job make-span in BitDew-MR when worker nodes fail is mainly due to the loss of half of the total computing resources and the time needed to re-download the input chunks to the surviving worker nodes.
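The relative overheads quoted in this discussion can be recomputed from the make-span values of Table I and the baselines given in Section IV.A (399 seconds for Hadoop and 246 seconds for BitDew-MR). The helper overhead_pct below is a hypothetical name used only for this check.

```python
HADOOP_BASELINE = 399   # seconds, normal case
BITDEW_BASELINE = 246   # seconds, normal case

# Job make-span (sec.) from Table I, by crash point from 12.5% to 100%.
HADOOP_KILL_TASKS = [425, 425, 423, 427, 426, 429, 431, 453]
HADOOP_KILL_TTS = [816, 823, 809, 815, 820, 819, 812, 814]
BITDEW_KILL_ALL = [450, 411, 389, 351, 331, 299, 279, 247]


def overhead_pct(makespan, baseline):
    """Extra make-span relative to the baseline, in percent."""
    return round(100.0 * (makespan - baseline) / baseline, 1)


kill_tasks_first = overhead_pct(HADOOP_KILL_TASKS[0], HADOOP_BASELINE)
kill_tts_first = overhead_pct(HADOOP_KILL_TTS[0], HADOOP_BASELINE)
bitdew_first = overhead_pct(BITDEW_KILL_ALL[0], BITDEW_BASELINE)
bitdew_last = overhead_pct(BITDEW_KILL_ALL[-1], BITDEW_BASELINE)
```

The first value reproduces the figure of about 6.5% for killed tasks, and the second shows the make-span roughly doubling when TaskTrackers are killed. For BitDew-MR the overhead shrinks as the crash point moves later in the map phase, since less completed work remains to be lost.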

C. Host Churn

As Table II shows, for the tests with host churn intervals of 5, 10 and 25 seconds, Hadoop jobs could only progress up to 80% of the map phase before failing. The reason is that when the job enters its last stage, a large number of input file chunks are concentrated on the few remaining old worker nodes. When new nodes join, they can only take over a small fraction of the chunks. Eventually, HDFS cannot maintain the replication level, resulting in permanent loss of data. For the tests with intervals of 30 and 50 seconds, once an old worker leaves, the JobTracker re-assigns all of its completed map tasks to other nodes, which significantly lengthens the total job execution time.

TABLE II. PERFORMANCE EVALUATION OF HOST CHURN SCENARIO

| Churn interval (sec.) | 5 | 10 | 25 | 30 | 50 |
|---|---|---|---|---|---|
| Hadoop job make-span (sec.) | failed | failed | failed | 2357 | 1752 |
| BitDew-MR job make-span (sec.) | 457 | 398 | 366 | 361 | 357 |

As in the fault tolerance scenario, the BitDew-MR runtime does not waste the work completed by worker nodes that eventually fail; host churn therefore has very little effect on job execution performance.

D. Network Connectivity

In this test, Hadoop could not even launch a job, because HDFS needs direct communication between DataNodes. BitDew-MR, on the other hand, works properly, and its performance is almost the same as the baseline under normal network conditions.

E. CPU Heterogeneity and Straggler

As Figure 5 shows, Hadoop's dynamic task scheduling works very well when worker nodes have different classes of CPU: nodes from the fast group process 20 tasks on average, and nodes from the slow group about 11 tasks. Although BitDew-MR uses the same scheduling heuristic, it does not perform as well, according to Figure 6: the nodes from the two groups get almost the same number of tasks. The reason is that we maintain only one copy of each chunk on the worker nodes, because we assume that there is no inter-worker communication or data transfer. Thus, although fast nodes take half as long as slow nodes to process their local chunks, they still need considerable time to download new chunks before launching additional map tasks.

TABLE III. PERFORMANCE EVALUATION OF STRAGGLERS SCENARIO

Straggler number                 1     2     5     10
Hadoop job make-span (sec.)      477   481   487   490
BitDew-MR job make-span (sec.)   211   245   267   298

Figure 5. Hadoop map task distribution over 50 workers.

Figure 6. BitDew-MapReduce map task distribution over 50 workers.

F. Network Failure Tolerance

In case of network failures, as shown in Table IV, the Hadoop JobTracker simply marks all temporarily disconnected nodes as "dead", although they are still running tasks, and blindly removes all the tasks completed by these nodes from the successful task list. Re-executing these tasks significantly prolongs the job make-span. Meanwhile, BitDew-MR allows workers to go temporarily offline without any performance penalty.
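The re-execution counts reported for Hadoop in Table IV are consistent with a simple model in which every map task completed before the disconnection is discarded. The sketch below checks this arithmetic; the total of 400 map tasks is an assumption inferred from the 100% column of the table, and the function names are hypothetical.

```python
# Assumption: total number of map tasks, inferred from the 100% column of Table IV.
TOTAL_MAP_TASKS = 400


def hadoop_reexecuted_maps(progress):
    """Map tasks redone if all completed maps are discarded at `progress` (0..1)."""
    return round(progress * TOTAL_MAP_TASKS)


def bitdew_reexecuted_maps(progress):
    """Map outputs are already on central storage, so nothing is redone."""
    return 0
```

Under this model, a disconnection at 12.5% of the job progress costs Hadoop 50 re-executed map tasks and one at 100% costs all 400, which matches the table, while the BitDew-MR count stays at zero.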

The key idea behind map task re-execution avoidance, which makes BitDew-MR outperform Hadoop under machine and network failure scenarios, is to allow reduce tasks to re-download map outputs that have already been uploaded to the central stable storage rather than regenerate them. Another benefit of this method is that it introduces no extra overhead, since all intermediate data is always transferred through the central storage whether or not the fault-tolerance strategy is used. However, the overhead of transferring data to the central stable storage makes desktop grids more suitable for applications that generate little intermediate data, such as Word Count and Distributed Grep. To mitigate the data transfer overhead, a storage cluster with large aggregate bandwidth can be used. Emerging public cloud storage services provide an alternative solution, which we leave for future work.
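The re-download-before-regenerate rule described above can be sketched as follows. This is a minimal illustration, not the BitDew-MR implementation or API: the class `CentralStorage`, the function `fetch_map_output`, and the `rerun_map` callback are hypothetical names, and an in-memory dictionary stands in for the central stable storage.

```python
class CentralStorage:
    """Hypothetical in-memory stand-in for the central stable storage."""

    def __init__(self):
        self._outputs = {}

    def upload(self, map_id, data):
        self._outputs[map_id] = data

    def download(self, map_id):
        # Returns None when the map output was never uploaded.
        return self._outputs.get(map_id)


def fetch_map_output(map_id, storage, rerun_map):
    """Return (data, reexecuted) for one map output needed by a reduce task.

    The output is re-downloaded when it is already on central storage;
    the map task is re-executed only when no uploaded copy exists.
    """
    data = storage.download(map_id)
    if data is not None:
        return data, False
    data = rerun_map(map_id)
    storage.upload(map_id, data)
    return data, True
```

In this sketch, the failure of the worker that produced a map output does not matter to the reduce task as long as the upload completed before the failure.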

VI. RELATED WORK

There have been many studies on improving MapReduce performance [17, 18, 19, 28] and on supporting MapReduce on different architectures [16, 22, 23, 26]. A closely related work is MOON [23], which stands for MapReduce On Opportunistic eNvironments. Unlike our work, MOON limits the system scale to a campus area and assumes that the underlying resources are hybrid, provisioning a fixed fraction of dedicated stable computers to supplement the volatile personal computers, which is much more difficult to implement in Internet-scale desktop grids. The main idea of MOON is to prioritize new tasks and important data blocks and assign them to the dedicated stable machines, so that jobs progress smoothly even when many volatile PCs join and leave the system dynamically. MOON also makes some tricky modifications to Hadoop to solve the problem that the heartbeat reporting and data serving of native Hadoop worker daemons can be blocked by PC users' actions; this is not an issue for systems originally designed for volunteer computing. Ko et al. [22] replicate inter- and intra-job intermediate data among workers through low-priority TCP transfers to utilize idle network bandwidth. We focus on intra-job intermediate data availability in this paper and replicate the data only on the central storage, because inter-worker communication is prohibited in desktop grids.

TABLE IV. PERFORMANCE EVALUATION OF NETWORK FAULT TOLERANCE SCENARIO

Job progress at the crash point      12.5%  25%   37.5%  50%   62.5%  75%   87.5%  100%
Hadoop     Re-executed map tasks     50     100   150    200   250    300   350    400
Hadoop     Job make-span (sec.)      425    468   479    512   536    572   589    601
BitDew-MR  Re-executed map tasks     0      0     0      0     0      0     0      0
BitDew-MR  Job make-span (sec.)      246    249   243    239   254    257   274    256

There are existing works on the simulation and emulation of distributed systems. Well-known general-purpose grid simulators include GridSim [4] and SimGrid [25]. OptorSim [6] focuses on studying and validating dynamic replication techniques. Simulation is usually adequate for designing and validating algorithms, but not for evaluating real large-scale distributed systems. EmBOINC [13] uses a hybrid approach that simulates the population of volunteered BOINC clients. We use the same methodology to evaluate the scalability of BitDew services, using one hundred nodes to simulate a huge number of concurrent clients. Wrekavoc [8] is a heterogeneity emulator that controls the environment by degrading the nodes' performance; this is similar to the approach we use in the heterogeneity scenarios.

Kondo et al. [21] measure a real enterprise desktop grid to analyze how the temporal characteristics of PCs affect the utility of desktop grids. Javadi et al. [20] use clustering methods to identify hosts whose availability is independent and identically distributed, based on availability traces from real systems. Nurmi et al. [12] develop an automatic method for modeling the availability of Internet resources. The traces and models extracted from real environments can be used as workload input for a stricter and more accurate evaluation of availability-aware task-scheduling algorithms, which is also one of our future directions for BitDew-MR.

VII. CONCLUSIONS

Desktop grid computing offers a vast amount of computing resources, which can be used efficiently for running scientific applications. However, as the data generated by scientific instruments continuously increases, many efforts are being made to use desktop grids for data-intensive applications. Accordingly, in this paper, we extend our BitDew-MR framework with new features, including aggressive task backup, intermediate results replication, task re-execution avoidance, and network latency hiding optimization, with the aim of facilitating the use of large-scale desktop grids. We then design a new experimental framework that emulates key fundamental aspects of Internet desktop grids to validate and evaluate BitDew-MR against Hadoop.

Our evaluation results demonstrate that 1) BitDew-MR successfully passes all the stress tests of the framework, while Hadoop is unable to work in a typical wide-area network topology that includes PCs hidden behind firewalls and NAT; 2) BitDew-MR outperforms Hadoop in several respects: scalability, fairness, resilience to node failures, and resilience to network disconnections.

ACKNOWLEDGMENT

Experiments presented in this paper were carried out using the Grid5000 experimental test bed, being developed under the INRIA ALADDIN development action with support from CNRS, RENATER and several universities as well as other funding bodies (see https://www.grid5000.fr).

This work is supported by the NSFC under grant Nos. 61133008 and 60973037, the National Science and Technology Pillar Program under grant 2012BAH14F02, the Wuhan Chenguang Program under grant No. 201050231075, the MOE-Intel Special Research Fund of Information Technology under grant MOE-INTEL-12-01, and the Agence Nationale de la Recherche under contract ANR-10-SEGI-001.

REFERENCES

[1] Apache Hadoop. Available: http://hadoop.apache.org/

[2] D. P. Anderson, "BOINC: a system for public-resource computing and storage," in Proceedings of 5th IEEE/ACM International Workshop on Grid Computing (GRID'04), 2004.

[3] D. P. Anderson and G. Fedak, "The Computational and Storage Potential of Volunteer Computing," in Proceedings of 6th IEEE International Symposium on Cluster Computing and the Grid (CCGrid'06), 2006.

[4] R. Buyya and M. M. Murshed, "GridSim: A Toolkit for the Modeling and Simulation of Distributed Resource Management and Scheduling for Grid Computing," in CoRR, 2002.

[5] T. Bing, M. Moca, S. Chevalier, H. Haiwu, and G. Fedak, "Towards MapReduce for Desktop Grid Computing," in Proceedings of Fifth International Conference on P2P, Parallel, Grid, Cloud and Internet Computing (3PGCIC'10), 2010.

[6] W. H. Bell, D. G. Cameron, L. Capozza, A. P. Millar, K. Stockinger, and F. Zini, "OptorSim - A Grid Simulator for Studying Dynamic Data Replication Strategies," International Journal of High Performance Computing Applications, 17(4), 2003.

[7] F. Cappello, S. Djilali, G. Fedak, T. Herault, F. Magniette, V. Néri, and O. Lodygensky, "Computing on large-scale distributed systems: Xtremweb architecture, programming models, security, tests and convergence with grid," Future Generation Computer Systems, vol. 21, pp. 417-437, 2005.

[8] L. C. Canon and E. Jeannot, "Wrekavoc: a tool for emulating heterogeneity," in Proceedings of 20th International Parallel and Distributed Processing Symposium (IPDPS'06), 2006.

[9] S. Chen and S. W. Schlosser, "Map-Reduce Meets Wider Varieties of Applications," IRP-TR-08-05, Technical Report, Intel Research Pittsburgh, May 2008.

[10] T. Condie, N. Conway, P. Alvaro, J. M. Hellerstein, K. Elmeleegy, and R. Sears, "MapReduce Online," in Proceedings of 7th USENIX Symposium on Networked Systems Design and Implementation (NSDI'10), 2010.

[11] J. Dean and S. Ghemawat, "Mapreduce: Simplified data processing on large clusters," Communications of the ACM, vol. 51, pp. 107-113, Jan 2008.

[12] D. Nurmi, J. Brevik, and R. Wolski, "Modeling Machine Availability in Enterprise and Wide-area Distributed Computing Environments," in Proceedings of 11th International Euro-Par Conference (EuroPar'05), 2005.

[13] T. Estrada, M. Taufer, K. Reed, and D. P. Anderson, "EmBOINC: An emulator for performance analysis of BOINC projects," in Proceedings of 23rd IEEE International Symposium on Parallel and Distributed Processing (IPDPS'09), 2009.

[14] G. Fedak, H. Haiwu, and F. Cappello, "BitDew: A programmable environment for large-scale data management and distribution," in Proceedings of the International Conference on High Performance Computing, Networking, Storage and Analysis (SC'08), 2008.

[15] S. Ghemawat, H. Gobioff, and S.-T. Leung, "The Google file system," in Proceedings of 19th ACM Symposium on Operating Systems Principles (SOSP'03), 2003.

[16] B. He, W. Fang, Q. Luo, N. K. Govindaraju, and T. Wang, "Mars: a MapReduce framework on graphics processors," in Proceedings of The Seventeenth International Conference on Parallel Architectures and Compilation Techniques (PACT'08), 2008.

[17] X. Huaxia, H. Dail, H. Casanova, and A. A. Chien, "The performance of MapReduce: an in-depth study," in Proceedings of 36th International Conference on Very Large Data Bases (VLDB'10), 2010.

[18] S. Ibrahim, H. Jin, L. Lu, B. He, G. Antoniu, and S. Wu, "Maestro: Replica-Aware Map Scheduling for MapReduce," in Proceedings of The 12th IEEE/ACM International Symposium on Cluster, Cloud and Grid Computing (CCGrid'12), 2012.

[19] S. Ibrahim, H. Jin, L. Lu, B. He, and S. Wu, "Adaptive disk i/o scheduling for mapreduce in virtualized environment," in Proceedings of 2011 International Conference on Parallel Processing (ICPP'11), 2011.

[20] B. Javadi, D. Kondo, J. Vincent, and D. P. Anderson, "Mining for Statistical Models of Availability in Large-Scale Distributed Systems: An Empirical Study of SETI@home," in Proceedings of 17th IEEE/ACM International Symposium on Modelling, Analysis and Simulation of Computer and Telecommunication Systems (MASCOTS'09), 2009.

[21] D. Kondo, M. Taufer, C. L. Brooks, H. Casanova, and A. A. Chien, "Characterizing and evaluating desktop grids: an empirical study," in Proceedings of 18th IEEE International Symposium on Parallel and Distributed Processing (IPDPS'04), 2004.

[22] S. Y. Ko, I. Hoque, B. Cho, and I. Gupta, "Making Cloud Intermediate Data Fault-Tolerant," in Proceedings of ACM Symposium on Cloud Computing (SOCC'10), 2010.

[23] H. Lin, X. Ma, J. Archuleta, W. C. Feng, M. Gardner, and Z. Zhang, "MOON: MapReduce On Opportunistic eNvironments," in Proceedings of 19th International Symposium on High Performance Distributed Computing (HPDC'10), 2010.

[24] M. Moca, G. C. Silaghi, and G. Fedak, "Distributed Results Checking for MapReduce in Volunteer Computing," in Proceedings of 2011 IEEE International Symposium on Parallel and Distributed Processing Workshops and Phd Forum (IPDPSW'11), 2011.

[25] M. Quinson, "SimGrid: a generic framework for large-scale distributed experiments," in Proceedings of Ninth International Conference on Peer-to-Peer Computing (P2P'09), 2009.

[26] C. Ranger, R. Raghuraman, A. Penmetsa, G. Bradski, and C. Kozyrakis, "Evaluating MapReduce for Multi-core and Multiprocessor Systems," in Proceedings of 13th International Conference on High-Performance Computer Architecture (HPCA'07), 2007.

[27] D. Thain, T. Tannenbaum, and M. Livny, "Distributed Computing in Practice: The Condor Experience," Concurrency and Computation: Practice and Experience, Vol. 17, No. 2-4, pp. 323-356, February-April, 2005.

[28] M. Zaharia, A. Konwinski, A. D. Joseph, R. Katz, and I. Stoica, "Improving MapReduce Performance in Heterogeneous Environments," in Proceedings of 8th USENIX Symposium on Operating Systems Design and Implementation (OSDI'08), 2008.
