From the Cloud to the Atmosphere: Running
MapReduce across Datacenters
†Chamikara Jayalath
Julian Stephen
Patrick Eugster
Department of Computer Science
Purdue University
West Lafayette IN-47907, USA
Abstract—Efficiently analyzing big data is a major issue in our current era. Examples of analysis tasks include identification or detection of global weather patterns, economic changes, social phenomena, or epidemics.
The cloud computing paradigm along with software tools such as implementations of the popular MapReduce framework offer a response to the problem, by distributing computations among large sets of nodes. In many scenarios input data is geographically distributed (geo-distributed) across datacenters, and straightforwardly moving all data to a single datacenter before processing it can be not only unnecessary but also prohibitively expensive, but the above-mentioned tools are designed to work within a single cluster or datacenter and perform poorly or not at all when deployed across datacenters.
This paper deals with executing sequences of MapReduce jobs on geo-distributed datasets. We analyze possible ways of executing such jobs, and propose data transformation graphs that can be used to determine schedules for job sequences which are optimized either with respect to execution time or monetary cost. We introduce G-MR, a system for executing such job sequences, which implements our optimization framework. We present empirical evidence in Amazon EC2 and VICCI of the benefits of G-MR over common, na¨ıve deployments for processing geo-distributed datasets. Our evaluations show that using G-MR significantly improves processing time and cost for geo-distributed datasets.
Index Terms—geo-distributed, MapReduce, big-data, datacenter.
F
1
I
NTRODUCTIONBig data analysis is one of the major challenges of our era. The limits to what can be done are often times due tohow muchdata can be processed in a given time-frame. Big datasets inherently arise by applications generating and retaining more information to improve operation, monitor-ing, or audit; applications such as social networking support individual users in generating increasing amounts of data.
Implementations of the popular MapReduce frame-work [17], such as Apache Hadoop [4], have become part of the standard toolkit for processing large datasets using
cloudresources, and are provided by most cloud vendors. In short, MapReduce works by dividing input files into chunks and processing these in a series of parallelizable steps. As suggested by the name, mapping and reducing constitute the essential phases for a MapReduce job. In the former phase, a set of mapper processes read respective input file chunks and produce hkey, vali pairs called “intermediate data”. Each reducer process atomically applies the reduc-tion funcreduc-tion to all values of eachkey assigned to it.
1.1 Geo-Distribution
As the initial hype of cloud computing is wearing off, users are starting to see beyond the illusion ofomni-present computing resourcesand realize that these are implemented by concrete datacenters, whose location matters. More †Work financially supported by DARPA grant # N11AP20014 and Purdue
Research Foundation grant # 204533.
and more applications relying on cloud platforms are geo-distributed, for any (combination) of the following reasons: (a.) data is stored near its respective sources or frequently accessing components (e.g., clients) which can be distributed, but the data is analyzed globally;
(b.) data is gathered and stored by different (sub-)organi-zations, yet shared towards a common goal;
(c.) data is replicated across datacenters for availability, incompletely to limit the overhead of costly updates. Examples of (a.) is data generated by multi-national companies or organizations with sub-datasets (parts of datasets) being managed regionally [10] (e.g., user profiles or customer data) to provide localized fast access for com-mon operations (e.g., purchases), yet that share a comcom-mon format and are analyzed as a whole. An example of (b.) is the US census [2] data collected and stored state-wise (hundreds of GBs respectively) but aiming at global analy-sis. Similar situations are increasingly arising throughout the US government due to the strategic decision [3] of consolidating independently established infrastructures as well as data in a “cloud-of-clouds”. (c.) corresponds to most global enterprise infrastructures which replicate data to overcome failures but do not replicate every data item across all datacenters to not waste resources or limit the cost of synchronization [23], [26].
But how to analyze such geo-distributed datasets most efficiently with MapReduce? Assume, for instance, that there are substantially sized Web caches on multiple con-tinents and that a Web administrator needs to execute a
query across this dataset. Gathering all sub-datasets into a single datacenter before handling them with MapReduce is always possible. We refer to this as COPY execution path. However, this can become inefficient if the total output data generated by the MapReduce job is much smaller than the original input dataset or any intermediate datasets as the
inter-datacenter bandwidth is usually much smaller than the
intra-datacenter bandwidth; it can also become financially expensive with cloud providers like Amazon charging users only for the former kind of communication.
Another option is executing individual instances of the MapReduce job separately on each sub-dataset in respective datacenters and then aggregating the individual results. This way of execution, which we refer to as MULTI execution path, will only yield the expected outcome if the MapReduce job is “associative”, and it may not be possible to perform the aggregation automatically. Associativity in the MapReduce sense does not mean that the order of the application of MapReduce jobs does not have an impact on the final result. What we allude to is the ability to apply a given job on parts of the input without having an effect on the final output. Examples ofnon-associative operations include many statistical operations, e.g., determining the median size of the pages in a Web cache.
A third option is to perform the MapReduce jobs as a single geo-distributed operation where mappers and re-ducers may be deployed in different datacenters (GEO execution path). One extreme version of this is to allow each mapper or reducer to be randomly allocated in any of the considered datacenters with much of the their com-munication taking place remotely, which is close to how current MapReduce implementations perform such jobs – if they support multiple datacenters at all [5], [6]. A more controlled version would pay tribute to location or sizes of individual sub-datasets or the semantics of the job in order to decide on distribution. This can lead to performing the map operation in one subset of the considered datacenters and then copying intermediate data to another subset of datacenters (which may or may not overlap with the prior subset) and performing the reduce step in these.
Of course things can become more complicated in re-ality, and so a geo-distributed MapReduce may also be a combination of some of the three options above. Thus there are many ways of executing a MapReduce job on a geo-distributed dataset, and corresponding performances can strongly differ. While the popular Hadoop supports none of these options, all of them can be implemented manu-ally. Besides requiring repetitive coding and administration efforts, such a manual approach however does not provide any guidance in selecting the most suitable option for a given MapReduce job and geo-distributed dataset.
1.2 Job Sequences
To make matters worse, a MapReduce job does not always come alone. Frequently,sequencesof MapReduce jobs are executed on a given input by applying the first job on the given input, applying the second job on the output
of the first job, and so on. An example is the handling of large Web caches by executing an algorithm such as PageRank [7]. The PageRank algorithm constructs a graph that describes the intra-page relationships, which are refined using further MapReduce jobs. Another example is hash-tree generation, performed to verify the integrity of data. Each level of the hash-tree can be generated using a MapReduce job starting from leaves, which represent the original input. Many times distinct MapReduce jobs are applied in sequence rather than iteratively performing the same job. For instance the query performed on the geo-distributed Web cache mentioned above may have a filtering step and a content search step, which need to be executed as two consecutive jobs. Sequences also arise, indirectly, when using PigLatin/Pig [24], [14] to describe complex data analysis tasks via data-flow graphs from which MapReduce jobs are generated automatically.
When performing a sequence of MapReduce jobs the number of possible schedules increases dramatically: the optimal solution for an individual job in COPY execution path may consist in copying selectively the input data toa particular subsetof involved datacenters before executing the job, and further aggregating the data later in the sequence. The number of possibilities for moving some subset of data from one datacenter to another is huge, and further increases with the amount of data.
1.3 G-MR
G-MR is a Hadoop-based system that can efficiently per-form a sequence of MapReduce jobs on a geo-distributed dataset. G-MR employs a novel algorithm named DTG
(data transformation graph) algorithm that determines an optimized schedule for performing a sequence of duce jobs based on characteristics of the dataset, MapRe-duce jobs, and the datacenter infrastructure. DTG algorithm can be used to optimize for either execution time or (monetary) cost. So our solution can be used to determine an optimal schedule executing the MapReduce jobs as fast as possible or to determine an optimal schedule for exe-cuting the MapReduce jobs in the most cost effective way. This schedule is then enforced by G-MR through a geo-distributed chain of operations consisting of zero or more geo-distributed copy operations and MapReduce executions performed within one or more datacenters using given clusters available in each of the participating datacenters.
1.4 Contributions and Roadmap
The contributions of this paper are thus as follows. 1) We analyze the problem of executinggeo-distributed
MapReduce job sequences as arising in “cloud-of-clouds” scenarios;
2) We propose a novel algorithm named DTG algorithm that determines the most effective schedule to execute a given sequence of geo-distributed MapReduce jobs, optimizing for either the execution time or the cost; 3) We present G-MR, a Hadoop-based geo-distributed
4) We validate our solution through experiments con-ducted on Amazon EC2 clusters and in VICCI [9] with microbenchmarks and well-known MapReduce appli-cations, illustrating the effectiveness of our solution. In short, for sequences of 3 jobs run in EC2, G-MR (i.e., the schedule proposed by our DTG algorithm) improves both cost and time by up to3×compared to currently used deployments of Hadoop; even simple sequences of 2 MapReduce jobs run in VICCI G-MR were up to3.5×as fast as current deployments. The remainder of this paper is organized as follows. Section 2 presents background information on MapReduce. Section 4 details our system model and describes our DTG algorithm. Section 5 details the data transformation graph construction process. Section 6 presents G-MR, and Section 7 evaluates it. Section 8 overviews related work. Section 9 draws conclusions.
2
B
ACKGROUNDThis section overviews the MapReduce [17] model and its popular Hadoop [4] open-source implementation.
2.1 MapReduce
A MapReduce [17] program consists in map and reduce
functions. Execution of one of these functions is called a
phase. A MapReducejobadds input data to those functions. The types of the functions are outlined in Figure 1. The former function takes an input record and outputs a set of
hkey, valipairs. The latter function accepts a set of vals
for a givenkey, and emits sets of values (often themselves
hkey, valipairs).
An application can use any number of mappers and reducers. A set of input files are divided into chunks and distributed over the mappers. The chunks assigned to a given mapper are calledsplits. Assignment ofhkey2, val2i
output pairs of mappers to reducers is based on the key
key2. More precisely, a partitioning function is used to
assign a mapper output tuple to a reducer. Typically, this function hashes keys (key2) to the space of reducers. The
MapReduce framework writes the map function’s output locally at each mapper and then aggregates the relevant records at each reducer by having them remotely read the records from the mappers. This process is called the
shufflestage. These entries are unsorted and first buffered at the reducer. Once a reducer has received all its values
val2, it sorts the buffered entries, effectively grouping them
together by keykey2. The reduce function is applied to each
key2assigned to the respective reducer, one key at a time,
with the respective set of values.
The MapReduce framework parallelizes the execution of all functions and ensures fault-tolerance. A master node surveys the execution and spawns new reducers or mappers when corresponding nodes are suspected to have failed.
2.2 Apache Hadoop
Hadoop [4] is a Java-based MapReduce implementation for large clusters. It is bundled with the Hadoop Distributed
map hkey1, val1i 7→list(hkey2, val2i)
reduce hkey2, list(val2)i 7→list(val3)
Fig. 1. Mapper and reducer types [17].
File System (HDFS) [13], which is optimized for batch workloads such as those of MapReduce. In many Hadoop applications, HDFS is used to store the input of the map phase as well as the output of the reduce phase. HDFS is, however, not used to store intermediate results such as the output of the map phase. They are stored on the individual local file systems of nodes.
The Hadoop follows a master-slave model where the master is implemented in Hadoop’sJobTracker. The master is responsible for accepting jobs, dividing those intotasks
which encompass mappers or reducers, and assigning those tasks to slave worker nodes. Each worker node runs a
TaskTrackerthat manages its assigned tasks.
A default split in Hadoop contains one HDFS block (64 MB), and the number of file blocks in the input data is used to determine the number of mappers. The Hadoop map phase for a given mapper consists in first reading the mapper’s split and parsing it into hkey1, val1i pairs.
Once the map function has been applied to each record, the TaskTracker is notified of the final output; in turn, the TaskTracker informs the JobTracker of completion.
The JobTracker informs the TaskTrackers of reducers about the locations of the TaskTrackers of correspond-ing mappers. Shufflcorrespond-ing takes place over HTTP. A re-ducer fetches data from a configurable number of mapper-TaskTrackers at a time, with 5 being the default number. The output of the reduce function is written to a temporary HDFS file. When a reducer terminates, the file is renamed.
3
M
ODELIn this section, we describe the model of geo-distributed systems, data, and operations as considered in this paper. In the subsequent two sections, we present our solution to efficiently process geo-distributed data in this model.
3.1 System and Data
We consider n datacenters, given by DC1, DC2, ..., DCn
with input sub-datasetsI1, I2, ..., In respectively. The total
amount of input data is thus |I| = Pn
i=1|Ii|. The
band-width between the datacenters DCi and DCj (i 6= j) is given byBi,j and the cost (monetary) of transferring a unit of data between the same two datacenters isCi,j.
We define an integer,minimum initial partition size,Ψ, to be the approximate unit of input data that should be transferred across datacenters. ∀ i ∈ {1,2, .., n}, each sub-datasetIi will have b|
Ii|
Ψ cequally-sizedpartitions. So
for datacenter DCi the size of a partition is |Ii|/b|IΨi|c. The total number of partitions p is Pn
i=1b
|Ii|
Ψ c. As will
be explained later, we define a partition size to keep our solution bounded. It is easy to see how there can be an exponentially growing number of schedules without introducing a granularity.
3.2 Operations
On this geo-distributed input dataset, asequenceof MapRe-duce jobs,J1, J2, ..., Jm, have to be executed. Since each
MapReduce job consists of two major phases (map and reduce), the sequence consists of2×mphases. We define the state of data before a phase as astageidentified by an integer 0..2m. So input data is in stage 0and final output data received after applying MapReduce jobs 1..m is in stage2m.
To move data from stage s to next stage s + 1 a MapReduce phase is applied to data partitions and the same number of (output) data partitions are created. The initial
kthpartition of data is denoted byP0
k, andP s
k represents the
output after executing MapReduce phases1..son partition
Pk0. Partition Pks0 is thus a derivation of partition Pks if
s0> s. The total amount of data after executing the prefix
of MapReduce phases1..son the dataset is Pp
k=1|P
s k|.
Each datacenterDCi hosts a MapReduce cluster of size Xi. Before performing a MapReduce phase, a partition
present in a datacenter may be moved to another datacenter. To make our solution tractable we only allow full partitions of data to be copied. The move may be for an initial partition or a derivative of it received after executing one ore more MapReduce phases. Initial partition sizes can be used as parameter to trade accuracy and computation costs. The problem considered in the following is to determine the optimum schedule for applying MapReduce phases, i.e., in which stages should derived partitions be moved and to where in order to minimize the total time (or cost) taken to perform all MapReduce jobs.
4
G
EO-D
ISTRIBUTEDM
APR
EDUCEIn this section we detail the issues with geo-distributed datasets, and introduce our solution for optimizing the execution of a MapReduce job sequence on a given dataset.
4.1 DTG Algorithm Overview
In this section, we outline our algorithm to determine an optimal execution path for performing a sequence of MapReduce operations on a geo-distributed input. The al-gorithm can minimize execution time or cost. The approach involves constructing a graph, named adata transformation graph (DTG), representing possible execution paths for performing MapReduce phases on input data An example data transformation graph is given in Figure 2.
A given node describes the number of MapReduce phases that have been applied on input data and the location of the derivative of each partition. Each row of nodes of the graph belongs to the same stage. So, a node in the graph is described as N∆s where s is the number of MapReduce operations in the sequence that have been applied so far and ∆ is a p-tuple of integers of the form
hd1, d2, ..., dpi describing the current distribution of data
∀ k ∈ {1,2, .., p}: dk denotes that Pks is located in
datacenter DCdk. N0D N0A 0 1 2 N0B N1 A N1B N2 A N2B N0C N1 C N2 C N1 D N2 D A= <1,2,3> B= <2,2,3> C= <3,2,3> D= <1,1,3> F= <1,2,1> G= <1,2,2> E= <1,3,3> N0E N0F N0G N1 E N1F N1G N2 E N2F N2G J= <2,2,2> I= <1,1,1> H= <3,3,3> N0H N0I N0J N1 H N1I N1J N2 H N2I N2J
Fig. 2. An example data transformation graph for a MapReduce sequence consisting of a single job exe-cuted across three datacenters. Each datacenter has exactly one partition.
As mentioned earlier, a user can specify if the DTG algorithm should determine an optimal solution for exe-cution time or the (monetary) cost, where cost involves both cost to maintain nodes and for transferring data. Edge weights are correspondingly determined according to either the time or cost for performing the corresponding operation as described shortly.
Edges are directed towards nodes in same or higher stages. An edge across two nodes in a same stage denotes a set of data copy operations. Edges across stages represent the application of one or more MapReduce phases.
A path from the starting node to a possible end node in a data transformation graph defines anexecution path. After constructing the data transformation graph, the problem of determining the optimal way to operate the MapReduce jobs boils down to determining the execution path with the minimum total weight from a starting node to a final node of the corresponding graph. The well-known Dijk-stra’s shortest path algorithm [19] is used to determine the optimal path for a data transformation graph.
4.2 Individual MapReduce Jobs
Next we describe how a data transformation graph can be constructed for a single MapReduce job. Subsequently, we will generalize this to constructing a data transformation graph for a sequence of MapReduce jobs.
4.2.1 Possible Execution Paths
When data is distributed across datacenters and thus copy-ing of data is involved, executcopy-ing map and reduce phases individually can be different from executing both phases together. The former will probably involve copying interme-diate data to a distributed file system and reading them back while the latter may only keep intermediate data in nodes where corresponding mappers and reducers are hosted.
There are three obvious execution paths for performing a given MapReduce job on a given geo-distributed input. Figures 3(a)-3(c) illustrate these.
COPY All input data is copied to a single datacenter and MapReduce is performed within this datacenter. GEO Map phases are performed in each of the datacenters
on respective sub-datasets, and intermediate data is then transferred to a single datacenter. After this the reduce phase is performed on the total intermediate data within this datacenter.
MULTI MapReduce is performed in each of the datacenters on respective sub-datasets. Results are then copied to a single datacenter and aggregated.
The difference between these execution paths is where the inter-datacenter copy operation of sub-datasets occurs – before the mapping, between mapping and reducing, or after reducing respectively. Obviously, these three are not the only possible execution paths. As mentioned previously, we consider partitions to be the smallest possible movable data units. So it is possible to move one or more partitions to different datacenters before executing a MapReduce operation; such moves give rise to different execution paths. We later describe how we can do simplifications to reduce the number of execution paths that have to be considered. To enable the execution of MapReduce jobs in MULTI execution path, we extend the MapReduce framework by an aggregate phase. The aggregate phase can be used to combine the results of one or more reducers, provided that the job is associative (as defined previously). The aggregate phase can be as trivial as appending the results of individual reducers or may consist of a more complex function. We denote the aggregate phase byA. The signature of the phase is outlined in Figure 4.
aggregate list(list(val3))7→list(val4)
Fig. 4. Aggregator type
4.2.2 Single Job Graph and Simplifications
Each MapReduce job will be represented by three stages in a data transformation graph numbered from s = 0..2.
s = 0 represents data prior to executing the map phase,
s = 1 represents data after executing the map phase but prior to executing the reduce phase and s= 2 represents data after executing both map and reduce phases.
Figure 2 shows an example data transformation graph for performing a single MapReduce job on a input distributed across three datacenters.
As mentioned previously we only allow full partitions of data to be moved from one datacenter to another. Even with this assumption, the total number of nodes and possible execution paths in a graph can become quite large. In this na¨ıve case, the total number of nodes in a single stage is
O(pn), which will become extremely large with the number
of partitions considered. To reduce the considered number of nodes and transformations we prune the data transforma-tion graph by performing the following simplificatransforma-tions give below. These simplifications have already been applied to the data transformation graph given in Figure 2.
• A data copy operation should always reduce the num-ber of datacenters.
Moving a data partition from one datacenter to another is costly since this involves copying data across inter-datacenter links from one distributed file system to an-other. Copying across distributed file systems usually introduces significant latencies and inter-datacenter data transfer comes with a cost. We thus allow such moves only if the move is from a nodeNhjd
1,d2,...,dpi to a node Nhjd0
1,d 0
2,...,d0pi where |{d1, d2, ..., dp}| >
|{d01, d02, ..., d0p}|That is, the set of involved
datacen-ters is reduced.
• A partition or a derivative of it should not be moved back to the datacenter where the partition is located. This basically means that if a partition Pks is moved fromDCitoDCj at stages, this partition or a deriva-tive of it will not be moved back to DCi at a later
stage. As mentioned in the previous point, copying data partitions across datacenters involve significant costs. So moving a derivative back to a datacenter where it was previously present will rarely occur in an optimum path.
• No substitutive data copying within the same stage. If a partitionPs
k is moved fromDCitoDCj in stage sno partition is moved fromDCj toDCi within the same stage. Since we defined partitions from different datacenters to be roughly equal in size and as it will be detailed later, we assume the cost of a MapReduce operation to be a function of the size of input data, result of movingxpartitions fromDCitoDCjandy
partitions fromDCj toDCiin the same stage, where
x≥y will roughly be equivalent to moving (x−y)
partitions fromDCitoDCj. So substitutive copy will
have no advantage within the same stage.
With the above simplifications, the time to construct and analyze graphs is significantly reduced. Table 1 gives the number of nodes of the data transformation graphs and the the time required to construct and analyze them before (na¨ıve) and after (simplified) pruning when each datacenter has a exactly one partition. The experiment was executed on a single computer with a2.66GHz Intel Core I7 processor and4GB or RAM.
Data transformation graph obtained yields of course an approximation of the optimum solution, as this may involve moving partition data partially. As we will show in our experiments in Section 7 though, our approximation
Datacenter Operation Input Copied input MR output Copied MR output Copy operation M output Copied M output
M Map R Reduce MR MapReduce
MR (a) COPY M M M R (b) GEO MR MR MR (c) MULTI Fig. 3. Three main execution paths for a given MapReduce job
n nodes nodes time (sec) time (sec) (na¨ıve) (simplified) (na¨ıve) (simplified)
2 4 3 0.02 0.02 3 27 10 1.6 0.14 4 256 41 4.2 0.44 5 3125 196 >1hour 2.16 6 46656 1057 >1hour 32.6 TABLE 1
Impact of simplifying data transformation graphs
provides already highly improved execution characteristics compared to current straightforward deployments.
4.3 Sequences of MapReduce Jobs
To determine the best execution path for a sequence of MapReduce jobs, the data transformation graph has to be extended. To this end, ∀ 0 ≤ l < m, each ending node
N2
∆ of the data transformation graph for MapReduce jobl
is simply overlapped with the corresponding starting node,
N∆0 of the MapReduce job l+ 1. The stage identifiers of the jobl+ 1 are incremented by(2l−2).
Figure 5 illustrates a data transformation graph for a sequence of two MapReduce jobs.
4.4 Replicated Datasets
To support scenarios where input sub-datasets are replicated across datacenters, we extend the DTG algorithm simply in the following way. We define a dataset replica to be a dataset formed by taking exactly one replica of each of the input sub-datasets. So, with ri replicas of sub-dataset i, there are a total ofQn
i=1ri dataset replicas.
A data transformation graph is constructed for each dataset replica by executing the previously defined algo-rithm on all such replicasin parallel. The optimal execution path is then determined by taking the best out of the paths determined by the individual runs. This path is chosen to be the final path and the corresponding dataset replica is selected to be the winning replica on which the MapReduce jobs will be executed.
N0A 0 1 N0B N1A N1B N2A N2B N0C N1C N2C
. . .
. . .
. . .
2 4 N3A N3B N3C N4A N4B N4C. . .
. . .
3 A= <1,2,3> B= <2,2,3> C= <3,2,3>Fig. 5. Data transformation graph for a sequence of two MapReduce jobs
5
G
RAPHC
ONSTRUCTIONIn this section we describe how data transformation graphs are annotated to determine the optimal execution path for a sequence of geo-distributed MapReduce jobs.
5.1 Functions
For constructing the data transformation graph, we rely on a number of functions. First, we use functions to approximate the time and cost of distributing data across datacenters. These are denoted by T(Ns
∆, N∆s0) and C(N∆s, N∆s0)
re-spectively. Here Ns
∆ and N∆s0 are starting and ending
nodes respectively. We allow inter-datacenter data transfer operations only within a given stage so starting and ending nodes must be in the same stage.
Next, we use functions to approximate the amount of output data generated by complete MapReduce jobs
(exe-cuting both map and reduce phases together) and thetime
consumed to execute them. These are denoted byM Rd l(I)
andM Rt
l(I)respectively. Here,Idenotes the size of input
data used and l identifies the MapReduce job.
Next, we use functions to approximate the amount of out-put data generated and time to execute only the map phase. These are denoted byMd
l(I)andM t
l(I)respectively. Next,
we use functions to approximate the amount of output data generated and time to execute only the reduce phase which are denoted byRd
l(I)andRtl(I)respectively.
Finally, we use functions, denoted Atl(I)andAdl(I), to approximate the time consumed and total output generated when aggregating output from multiple locations. Here
I denotes the total size of the outputs considered and l
identifies MapReduce job. Note that we rely on limited user input for constructing the data transformation graphs as associativity (and the necessary aggregation functions) can generally not be inferred automatically.
5.2 Determining Functions
We use inter-datacenter data transfer charges provided by the resource providers. Inter-datacenter bandwidth values are determined dynamically by sending a small portion of data across datacenters (1 GB by default). C() and T()
are determined by considering the transfer of individual sub-datasets and using the corresponding bandwidths and transfer costs.
We use extrapolation to approximate the rest of the functions. G-MR chooses a sample of the input data and executes M R,R, M, and A for each of the MapReduce jobs in parallel in each unique MapReduce cluster. The sizes of the outputs and the time for execution are recorded. Linear approximations of the above functions are obtained via two sample points for each of them.
To reduce the time taken for sampling, we limit the total amount of sampling data for a given function to be
10%of the total input in the cluster where the sampling is performed. Two sampling points with different input sizes are considered. We choose them, so that the amount of input data for the larger sampling point is twice the amount for the smaller sampling point. For example, for a total input of 100 GB, the sizes of the sampling points would be 3.3
GB and6.6 GB respectively, so that the total becomes10
GB. Once computed, sampling points and corresponding functions can be reused in future executions. Note that this default sampling mechanism can be overridden by either (a.) configuring G-MR to consider more than two sampling points (b.) configuring G-MR to extrapolate for a higher degree polynomial. In some scenarios, a user may exactly know the sizes of outputs generated by a MapReduce job for a given input. Two examples for this are sorting (O(I)) and searching for a single result (O(1)). In these cases the user can exactly specify the functions, so that G-MR does not have to perform sampling approximate them.
One occasional problem with MapReduce executions consists in outliers [12]. In MapReduce outliers increase the execution time. So in clusters with high failure rates,
we execute each sampling point twice and accept the one with a smaller execution time as the correct result.
5.3 Edge Weights and Optimal Execution Path
We use the above-mentioned approximations to determine edge weights. As mentioned previously the DTG algorithm can optimize for either execution time or cost. We create two separate graphs for these two cases.
If the user needs to optimize only for either execution time or cost, we use the corresponding graph to determine the optimal execution path. But if the user needs to optimize for both execution time ad cost, we determine the final execution path by adhering to the following heuristic.
Both paths considering execution time as edge weights and considering cost as edge weights are determined. If both data transformation graphs give the same optimal execution path, that path is obviously chosen to be the final execution path. If two paths are different, we use the same approximation functions to determine the cost for the optimal path of the graph that used execution time as edge weights and the execution time of the optimum path of the graph that used time as the edge weights. Assume that the time and cost for executing the optimum path of the data transformation graph instantiated with execution times as edge weights to beT V1 andCV1 respectively, and the
time and cost for executing the optimal path for the data transformation graph instantiated with costs to beT V2 and
CV2 respectively. If CVCV12 ≤ T VT V21 then we chose the former
execution path otherwise the latter.
5.4 Example Data Transformation Graph
Figure 6 illustrates expressions for calculating the edge weights for an example data transformation graph. The graph is for executing a sequence of a single MapReduce job running across two datacenters. Each datacenter has a single partition of sizeΨ. The figure shows the expressions that are used to determine the weights of the edges starting from node N0
h1,2i for both optimizing for execution time (W1) and cost (W2).
The figure shows how functions Bi,j andCi,j are used to determine weights of the edges going across nodes in the same stage and how functions M R,RandM for the MapReduce job (determined prior to this construction) are used to determine the weights of the edges going across nodes in different stages. Function A for the MapReduce job is used to determine the aggregation costs of stage2.
6
G-MR
This section describes G-MR, our framework for executing geo-distributed MapReduce jobs leveraging our DTG algo-rithm. G-MR is implemented in Java consists of4900lines of code. G-MR uses Hadoop to execute a MapReduce jobs in a given datacenter.
N0<1,2> stage 0 stage 1 stage 2 N1<1,2> N2<1,2> N0<2,2> N1<2,2> N2<2,2> N0<1,1> N1<1,1> N2<1,1> W1=Ψ/B2,1 , W2=Ψ*C2,1 W1=Ψ/B1,2, W2=Ψ*C1,2 W1=Mt(Ψ) W2=2*Mt(Ψ)*X*K W1=Rt(Ψ1) W2=2*Rt(Ψ1)*X*K W1=MRt(Ψ) W2=2*MRt(Ψ))*X*K Ψ1=Md(Ψ) Ψ2=MRd(Ψ) W1 W2 - Time - Cost W1= Ψ1/B2,1, W2= Ψ1*C2,1 W1= Ψ1/B1,2, W2= Ψ1*C1,2 W1=Mt(2*Ψ) W2=Mt(2*Ψ)*X*K W1=MRt(2*Ψ) W2=MRt(2*Ψ)*X*K W1=Mt(2*Ψ) W2=Mt(2*Ψ)*X*K W1=Ψ2/B1,2 + At(2*Ψ 2) W2=Ψ2*C1,2 + At(2*Ψ 2)*X*K W1=Ψ2/B2,1 + At(2*Ψ 2) W2=Ψ2*C2,1 + At(2*Ψ 2)*X*K To next job W1=Rt(2*Ψ1) W2=Rt(2*Ψ1)*X*K W1=Rt(2*Ψ1) W2=Rt(2*Ψ1)*X*K W1=MRt(2*Ψ) W2=MRt(2*Ψ)*X*K X1=X2=X
Fig. 6. An example data transformation graph for a MapReduce sequence consisting of a single job.
6.1 Components
Figure 7 overviews the components of G-MR. GroupMan-ager and JobManager are its two main components. A given G-MR instance will consist of a single GroupMan-ager that initiates the geo-distributed MapReduce jobs and a JobManager in each of the participating datacenters. Each JobManager component manages the parts of the MapReduce jobs that is performed within its datacenter.
At initiation, G-MR starts theGroupManagercomponent at one of the datacenters. Usually this is started at the node where the starting script of G-MR is run. The GroupMan-ager is a central component that processes datacenter and job configurations. It uses the DTG algorithm to decide on the optimal schedule for performing MapReduce jobs. The GroupManager informs JobManagers about the parts of the geo-distributed MapReduce jobs that should be executed within their respective datacenters.
G-MR initiates a Hadoop cluster in each datacenter, which is used by the corresponding JobManagers to execute their respective MapReduce jobs. The JobManager uses two additional components, aCopyManagerwhich handles copying of data across datacenters and an AggregationMan-ager that can aggregate results from datacenters.
Datacenters Copy Manager JobManagers
Hadoop AggregationManager
GroupManager + DTG algorithm configDC configJob
Fig. 7. Architecture of G-MR
6.2 Executing Individual Phases
As mentioned previously, our algorithm supports the sce-narios where mapping and reducing of a given MapReduce job are executed in two different geographical locations. To implement this, G-MR dynamically generates a straightfor-ward mapper and a reducer jobs for each MapReduce job.
A straightforward reducer simply outputs intermediate data it receives while a straightforward mapper can read this intermediate data from a distributed file system and output the same results the actual mapper would have produced.
For each job, to implement M we used the actual user provided mapper and the corresponding straightforward reducer while using the corresponding trivial mapper and the reducer provided by the user to implementR.
6.3 Datacenter Configuration
A GroupManager initiates the G-MR instance by process-ing an XML-baseddatacenter configuration file. This file describes the datacenters that may participate in the geo-distributed MapReduce jobs handled by G-MR. A sample file is given in Figure 8.
An identifier is provided for each datacenter to refer to it when running MapReduce jobs. Datacenters keep their data in independent distributed file systems. Currently G-MR supports Amazon S3 and Apache HDFS.
<dcconf> <datacenter> <id>DC1</id> <provider>EC2</provider> <fstype>HDFS</fstype> <jobtracker> a.useast.amazonaws.com:9001 </jobtracker> <filesystem> hdfs://b.useast.amazonaws.com </filesystem> </datacenter> </dcconf>
Fig. 8. Sample datacenter configuration file
6.4 Job Configuration
Geo-distributed MapReduce job sequences are submitted to the GroupManager using an XML-basedjob configuration
number of copy and MapReduce operations. This infor-mation is passed to JobManager components describing the portion of work that should be executed within their respective datacenters.
A sample job configuration file is given in Figure 9.
<jobconf> <input> <datacenter>DC1</datacenter> <location>/user/ec2-user1/webinput</location> </input> <input> <datacenter>DC2</datacenter> <location>/user/ec2-user2/webinput</location> </input> <mapper>gmr.WordCountMapper</mapper> <reducer>gmr.WordCountReducer</reducer> <associative>true</associative> <aggregator>gmr.WordCountAggregator</aggregator> <optimizeForTime>false</optimizeForTime> <mrFunction>gmr.WordCountMRFunction</mrFunction> <mFunction>gmr.WordCountMFunction</mFunction> <rFunction>gmr.WordCountRFunction</rFunction> <aFunction>gmr.WordCountAFunction</aFunction> </jobconf>
Fig. 9. Sample job configuration file
The job configuration starts with a number of input
tags each giving a location of a part of input data to be used in the MapReduce job. The mapper and reducer
tags give the mapper and reducer classes to be used in the MapReduce job. Users can specify whether the MapReduce operation is associative using theassociativetag and if G-MR should try to minimize the cost or the execution time using the optimizeForTime tag. For associative operations user can also specify an optional aggregator class which can be used to aggregate results after running the MapReduce job for different datasets.
As mentioned previously, a user has the option of man-ually specifying the four functions, M R, R,M,A so that G-MR does not have to perform sampling to determine approximations. These have to be provided as classes that implement the interface given in Figure 10.
SizeFunction has functionscalculateOutputSize
andcalculateExecutionTime, that if provided will be
used to approximate the output size and execution time of M R, M, R, and A described previously. If they are not provided G-MR will execute corresponding operations on a sample of the input to approximate the functions to polynomial equations. The degree of the approximated polynomials by default is 1 (meaning a linear equation approximation), but can be configured to be a higher value.
6.5 Aggregators
As mentioned previously G-MR uses aggregators to ag-gregate the results obtained after applying the same set of MapReduce jobs to different parts of the input. Once output data generated in one or more datacenters is copied to a single destination datacenter, the GroupManager instructs the JobManager running in the destination datacenter to
public interface SizeFunction {
long calculateOutputSize(long input);
long calculateExecutionTime(long input);
}
public interface Aggregator {
void aggregate(Iterator<List<K, V>> values,
List<K, V> output) }
Fig. 10. SizeFunctionandAggregatorinterfaces
initiate an aggregate operation. The JobManager may de-cide to perform the aggregate operation using an additional MapReduce run. Figure 10 shows our aggregator interface.
6.6 Failure Handling
Failure handling resembles that in Hadoop. Each JobMan-ager component regularly sends a heartbeat message back to the GroupManager. If the GroupManager does not receive a heartbeat from a given JobManager for a predefined timeout interval a new JobManager component is started in the same datacenter. If the GroupManager receives a delayed heartbeat from a previously suspected JobManager a message is sent back instructing the old JobManager to terminate itself.
Once the GroupManager starts a MapReduce job it stores information about this job in a predefined location of the distributed file system in its datacenter. A newly started JobManager will look up this location for information about MapReduce jobs started by previous JobManagers.
Failure of a GroupManager will result in the failure of the G-MR instance and existing geo-distributed MapReduce jobs will not be able to complete. This is similar to the termination of theJobTrackercomponent of Hadoop. .
7
E
VALUATIONThe goal of our empirical evaluation was mainly three-fold: (1) quantify the benefits of our approach over na¨ıve deployments, (2) assess the accuracy of our performance estimation in practice, and (3) demonstrate the optimality of the identified execution paths in spite of pruning. Below we report on (1) and (2). We evaluated (3) by comparing the execution paths identified by our DTG algorithm with and without pruning, for experiments with up to 1 TB of data, 10 MR jobs and 5 datacenters; the same optimal execution paths were always identified in both cases.
7.1 Setup, Datasets, and Applications
We used three Amazon EC2 datacenters for the exper-iments. 10 large instances (7.5 GB of memory with 4
EC2 compute units) were leased from each datacenter. The cost of each instance was $0.34 per hour. The cost for transferring data between two EC2 datacenters (Ci,j
∀i 6= j ∈ [1..3]) was $0.1 per GB. For evaluations on VICCI [9] we used two clusters, each with 10 nodes configured to run Hadoop.
Our experiments used the datasets outlined in Table 2, with the following MapReduce applications:
Dataset GBs Description
CENSUSDATA 121 Year 2000 US census [2] EDUDATA 5 University Website crawl WEATHERDATA 20 Weather measurements [1] PLANTDATA 10 Properties of Iris plant [8] HADOOPDATA 100 Logs of Yahoo! Hadoop cluster NGRAMDATA 300 Google Books Ngrams
TABLE 2
Datasets used for evaluation
CENSUSPROCESSOR: Filters and groups CENSUSDATA. Consists of two MapReduce jobs: 1. filter out a given set of fields from each record, and 2. sample (optional) and group records based on a field. When sampling (different from thatinsideG-MR described in Section 5) is enabled, only one in 25 records will be used in the grouping operation. This application is associative.
WORDCOUNT: Counts the number of occurrences of each word of a text-based dataset. The dataset used was EDUDATA. This application is associative.
MEDIANWEATHER: Computes the median of records in a dataset – WEATHERDATA. The application was used to determine the median temperature of each day from 1929 to 2009. This application is non-associative.
KNN: Determines the class of the records of a dataset by determining the K nearest neighbors. Each record of the dataset is compared with a known sample to determine these K nearest neighbors. The dataset used was PLANTDATA. The application was used to determine the type of each plant record out of three known plant types. This application is associative.
ETL: We execute a two job MapReduce sequence on Hadoop logs from Yahoo! (HADOOPDATA) in JSON for-mat to extract the inforfor-mation we require and transform it to the required format. The first MapReduce job extracts a configurable set of properties from the log and converts it to tabular format. The second job performs a cross product on this tabular data. (The first job reduces the data with respect to the input while the second job yields an increased amount.) We ran this extract-transform-load application in the VICCI [9] cluster.
NGRAM: Determines all combinations of last two words of 4grams (sets of consecutive four words from texts). A sequence of two MapReduce jobs, the first one performs sampling while the second generates combinations.
7.2 Benefits
We conducted experiments to show the benefits of consider-ing distribution characteristics when performconsider-ing sequences of MapReduce jobs on a large geo-distributed dataset. 7.2.1 Potential
For the first experiment we used CENSUSDATA. Data was distributed over two EC2 datacenters located in US east coast (DC1) and west coast (DC2) based on the US
state where the data was collected from. To illustrate the amplitude of performance differences between different
execution paths we first performed CENSUSPROCESSOR on the original dataset and a sub-dataset of size 61 GB according to three different execution paths. We conducted experiments both with sampling enabled and disabled. In the first path of execution (Copy) all data was copied to DC1 prior to the execution of the two jobs. In the
second path (FilterCopy) the filtering was performed in each of the datacenters. After the filtering step, all data was moved toDC1and grouped. In the third path both filtering
and grouping were performed in individual datacenters (FilterGroupCopy). Results were then moved to DC1 and
aggregated by executing the grouping operation on the collective set of results.
The cost of execution and the execution time are given in Figures 11(a) and 11(b) respectively. The graphs illustrate the significant differences in cost and time between dif-ferent execution paths. Performing MapReduce jobs using
Copy path can be up to 2× costlier and 4× more time consuming than the other two paths. When sampling is disabledFilterCopy is clearly the best way to execute the jobs both for optimizing time or cost. But when sampling was enabled the differences were minimal, because the sampling reduced the amount of data to be copied and aggregated inFilterGroupCopy.
7.2.2 Cost
We conducted experiments to determine the cost of exe-cuting three single geo-distributed MapReduce jobs and a sequence of three MapReduce jobs using G-MR. We com-pared G-MR based execution (denoted byOptimal) with a straightforward deployment of Hadoop where all input is copied to a single datacenter before MapReduce jobs are executed (denoted by CopyAndExecute). In one case we also evaluated a execution path where inversely separate MapReduce jobs are performed on respective inputs in each of the datacenters and then the output data is copied to a singleDC (denoted by ExecuteAndCopy).
Both costs for (i) maintaining node instances for the MapReduce jobs and (ii) for transferring data across data-centers were considered. For this experiment we used first MapReduce clusters located in DC1 and DC2. Only the
charges for the execution time of the Hadoop jobs were considered. G-MR was configured to optimize for cost.
Figures 12(a), 12(b) and 12(c) show the cost incurred for executing applications WORDCOUNT, MEDIANWEATHER, and KNN respectively on geo-distributed datasets. The X-axis gives the percentage of the dataset that was available inDC1 (the rest was inDC2.)
For the considered data points of WORDCOUNT(see Figure 12(a)), G-MR operated in the MULTIexecution path. The results clearly show the benefits of this choice. Since theCopyAndExecutedeployment always copied input data over to DC1, data transfer costs were higher when DC1
contained a small percentage of input data. For a 50/50
input distribution, the cost of operating with G-MR was only0.62×the cost of theCopyAndExecutedeployment.
For MEDIANWEATHER (see Figure 12(b)) G-MR se-lected the GEO execution path for the considered input
8 0 2 4 6 C o s t ($ ) Copy FilterGroupCopy 121 GB 61 GB (sampled)121 GB (sampled)61 GB FilterCopy
(a) Large dataset - Cost
0 2 4 6 8 Tim e (h ou rs )
Copy FilterGroupCopy FilterCopy
121 GB 61 GB 121 GB 61 GB (sampled) (sampled) (b) Large dataset - Time
1,4,1 2,2,2 1,2.5,2.5 3 0 0.5 1 1.5 2 2.5 Input Distribution (GB) C o st ($ ) Optimal CopyAndExecute
(c) MapReduce sequence - Cost
1,4,1 2,2,2 1,2.5,2.5 2,000 0 400 800 1200 1600 Input Distribution (GB) Ex e cu ti o n T im e ( se c) Optimal CopyAndExecute
(d) MapReduce sequence - Time
Fig. 11. Processing a sequence of MapReduce jobs on a large geo-distributed dataset in different ways
20 40 60 80 0.6 0.2 0.3 0.4 0.5 % of input in DC1 C o s t ($ ) CopyAndExecute Optimal
(a) WORDCOUNT
0 2 0 0.4 0.8 1.2 1.6 % of Input in DC1 C o s t ($ ) CopyAndExecute Optimal 10 30 50 70 90 (b) MEDIANWEATHER 20 40 60 80 1.3 0.8 0.9 1 1.1 1.2 % of input DC1 C o s t ($ ) CopyAndExecute ExecuteAndCopy Optimal (c) KNN 2 3 4 2 0 0.4 0.8 1.2 1.6 # of DCs C o s t ($ ) Optimal CopyAndExecute (d) Datacenters - Cost
Fig. 12. Executing a sequence of two MapReduce jobs on a geo-distributed dataset for optimal execution cost
0 20 40 60 80 360 120 160 200 240 280 320 % of input in DC1 Ex e c u ti o n T im e ( s e c ) CopyAndExecute Optimal
(a) WORDCOUNT
1000 400 500 600 700 800 900 % of Input in DC1 Ex e c u ti o n T im e ( s e c ) CopyAndExecute Optimal 10 30 50 70 90 (b) MEDIANWEATHER 90 0 20 40 60 80 1,500 600 900 1200 % of Input in DC1 Ex e c u ti o n T im e ( s e c ) CopyAndExecute ExecuteAndCopy Optimal (c) KNN 2 3 4 1600 0 400 800 1200 # of DCs Ex e c u ti o n T im e ( s e c ) Optimal CopyAndExecute (d) Datacenters - Time
Fig. 13. Executing a sequence of two MapReduce jobs on a geo-distributed dataset for optimal execution time
distributions. For a 50/50 distribution of input, cost of operating with G-MR was only0.5×the cost of operating with theCopyAndExecutedeployment.
For KNN (see Figure 12(c)), G-MR again operated in the MULTI execution path. The size of the output generated by this application is slightly larger than the size of its input. G-MR operated efficiently at both ends of the input distribu-tion graph since it determines the best datacenter for storing the final output (unless the job specifies this). For a25/75
input distribution the cost of operating with G-MR was only0.77×the cost of operating with theCopyAndExecute
deployment. The cost of G-MR based execution was only
0.71×compared to a na¨ıve ExecuteAndCopy deployment that executes separate MapReduce jobs on respective inputs in each of the datacenters and then always copies output data toDC1.
We evaluated the cost of executing a sequence of MapRe-duce with three jobs – a filtering job, MEDIANWEATHER, and a sorting job. Figure 11(c) shows the cost of executing
the sequence. The figure only reports the optimal execution path determined by G-MR and the execution with CopyAn-dExecutedeployment. To not overload the figure we refrain from reporting all other sub-optimal execution paths. This experiment used three EC2 datacenters, and was conducted for three input distributions. Operating with G-MR resulted in much lower costs for all three distributions. For example when each datacenter had an equal amount of input data, the cost of operating with G-MR was only0.55×the cost of operating with the CopyAndExecute deployment. One reason for this was that DTG algorithm chose to move data only after the first filtering job, resulting in much lower inter-datacenter data transfer costs.
Figure 14(a) shows the results for NGRAM where input data was in Amazon S3 instead of HDFS clusters.240GB of ngrams were in a S3 bucket located in DC1 while
60GB were in a S3 bucket inDC2.PartialCopygives the
execution path where some data partitions were moved from
balance the data. G-MR chose GEOto be the optimal path which as the Figure shows had the lowest cost.
0 A B C D 20 0 5 10 15 Execution Path C o s t ($ ) CopyAndExecute ExecuteAndCopy Optimal A B C DPartialCopy
(a) NGRAM- Cost
0 A B C D 8 0 2 4 6 Execution Path Ex e c u ti o n T im e ( h o u rs ) CopyAndExecute ExecuteAndCopy Optimal A B C DPartialCopy (b) NGRAM- Time Fig. 14. S3 Data 7.2.3 Time
We conducted experiments to determine the time to execute MapReduce applications on geo-distributed datasets. Here G-MR was thus configured to optimize for the execution time. Results are shown in Figures 13(a), 13(b), 13(c). Behavior and execution path selection of G-MR was similar to the previous set of experiments.
In the WORDCOUNT application G-MR operated in MULTIexecution path for the considered set of data points. For the MEDIANWEATHER application, G-MR operated in GEO execution path. For the KNN application, G-MR chose MULTI execution path. For WORDCOUNT, process-ing geo-distributed input usprocess-ing G-MR was 2×faster than theCopyAndExecutedeployment for a50/50distribution of input and for MEDIANWEATHER G-MR was 1.4× faster than the ExecuteAndCopy deployment for a similar input distribution. For the KNN, G-MR was1.2×and1.4×faster than ExecuteAndCopy and CopyAndExecute deployments respectively for a25/75distribution of the input data.
Figure 11(d) shows the time for executing the previously mentioned sequence of MapReduce jobs with G-MR and with CopyAndExecute deployment for three input distri-butions. G-MR was1.7×faster than the CopyAndExecute
deployment since it chose the most efficient execution path. Figure 14(b) shows the time for executing NGRAM. G-MR chose GEOto be the optimal path which as the Figure shows had the lowest execution time.
Last but not least, Figure 16 shows the results of our experiments in VICCI. This experiment ran ETL on the HADOOPDATA. The data was distributed as 60 GB in a first datacen-ter and 40 GB in a sec-ond datacenter. The end re-sult was in the first dat-acenter. Here, G-MR was
3.5× faster than Execute-Copyand2.3×faster than
0 A B C 14 0 2 4 6 8 10 12 Execution Path Ex e c u ti o n T im e ( h o u rs ) ExecuteAndCopy CopyAndExecute Optimal A B C
Fig. 16. Time for execut-ing ETL in Vicci
CopyExecute deployments. VICCI being free of charge allowed us more easily to conduct repeated and detailed ex-periments to drill down on variance; the variations between execution times of different runs were however so low that they could not be visible in the graphs. For instance, the CopyAndExecute path included ≈58 mins of actual MapReduce operation (the rest was pure data transfer which was not repeated). The largest difference between any 2 runs we ever observed was 96 sec on those≈58 mins. 7.2.4 Number of Datacenters
We also conducted experiments to observe the behavior of G-MR when the number of datacenters is increased from two to four. Figures 12(d) and 13(d) show the cost and execution time characteristics respectively of the MEDIAN -WEATHER application when the number of participating datacenters was increased from two to four. We used EC2 datacenters from US east coast, US west coast, Europe and Asia for these experiments.
For the considered data points G-MR operated in the MULTI execution path. The cost and execution time of G-MR was consistently smaller than that of the CopyAndEx-ecutedeployment. This shows that G-MR keeps providing benefits when the number of participating datacenters is increased. For both experiments the execution time and the operating cost increased with the number of datacenters due to the increased usage of resources and other overheads.
7.3 Accuracy
We compared the cost and execution time values predicted by the DTG algorithm to the actual results obtained. Figures 15(a) and 15(b) show the cost values predicted by the DTG algorithm for applications WORDCOUNT and MEDIANWEATHER respectively when configured to opti-mize for the cost with the actual values that were observed. Similarly, Figures 15(c) and 15(d) show the execution time values that were predicted by the DTG algorithm when configured to optimize for the execution time with the actual values that were observed for the same applications. Results clearly show that the values predicted for both cost and execution time were on par with the actual values obtained. All predicted and actual graphs showed similar patterns when the input distribution was changed. The differences between predictions and measurements were regular and were mainly due to the variations of the actual performance of the EC2 instances [18].
7.4 Choosing a Provider
We wanted to determine the characteristics of the DTG algorithm when used for cloud resource providers with different service costs and guarantees. To this end, we con-ducted simulation-based experiments to determine how the weight of the optimal path of a data transformation graph varies with number of jobs, number of datacenters and the characteristics of the available resources. Results are show in Figure 17. The total amount of data was set to be100GB, and was configured to be distributed equally among the
85 0 10 20 30 40 50 60 70 0.4 0.2 0.25 0.3 0.35 % of Input in DC1 C o s t ($ ) Copy Predicted Copy Actual Multi Predicted Multi Actual
(a) WORDCOUNT- Cost
100 0 20 40 60 80 1.5 0 0.3 0.6 0.9 1.2 % of input in DC1 C o s t ($ ) Copy Predicted Copy Actual Geo Predicted Geo Actual
(b) MEDIANWEATHER- Cost
90 0 10 20 30 40 50 60 70 80 320 120 160 200 240 280 % of input in DC1 Ex e c u ti o n T im e ( s e c ) Copy Predicted Copy Actual Multi Predicted Multi Actual
(c) WORDCOUNT- Time
100 0 20 40 60 80 800 0 100 200 300 400 500 600 700 % of input in DC1 Ex e c u ti o n T im e ( s e c ) Copy Predicted Copy Actual Geo Predicted Geo Actual
(d) MEDIANWEATHER- Time
Fig. 15. Prediction accuracy
2 3 4 5 10 5 6 7 8 9 # of DCs Tim e (h ou rs ) (BW=4, CC=0.1, NC=0.1) BW=8, CC=0.1, NC=0.1) (BW=4, CC=0.2, NC=0.1) (BW=4, CC=0.1, NC=0.2) (a) DCs - Time 2 3 4 5 2.6 1 1.6 2 # of DCs C o s t ($ ) (BW=4, CC=0.1, NC=0.1) (BW=4, CC=0.2, NC=0.1) (BW=4, CC=0.1, NC=0.2) (BW=8, CC=0.1, NC=0.1) (b) DCs - Cost 2 3 4 5 6 16 4 6 8 10 12 14 # of Jobs Tim e (h ou rs ) (BW=4, CC=0.1, NC=0.1) (BW=8, CC=0.1, NC=0.1) (BW=4, CC=0.2, NC=0.1) (BW=4, CC=0.1, NC=0.2) (c) Jobs - Time 2 3 4 5 6 4 0 1 2 3 # of Jobs C o s t ($ ) (BW=4, CC=0.1, NC=0.2) (BW=4, CC=0.2, NC=0.1) (BW=4, CC=0.1, NC=0.1) (BW=8, CC=0.1, NC=0.1) (d) Jobs - Cost
Fig. 17. Weight of the optimal path
datacenters. Each graph shows the inter-datacenter band-width (∀i6=j Bi,j =BW MB/s), cost of inter-datacenter data transfers (∀i 6= j Ci,j =$CC per GB) and the cost
of leasing a node from a datacenter ($N C per hour). All MapReduce jobs were assumed to have the same cost and time functions.
Both execution cost and execution time increased linearly with the number of MapReduce jobs considered but the increase with respect to the number of participating data-centers was non-linear. Adding a new job will always add an extra stage to the data transformation graph increasing the cost of the optimal path linearly in most scenarios. But the effect of increasing the number of datacenters keeps getting less dramatic and the time/cost tends to stabilize for large numbers of datacenters.
Increasing the inter-datacenter bandwidth had a dramatic affect on the execution time but had no effect on the exe-cution cost for the considered experiment. This is because increasing the bandwidth only changes the time taken to perform data transfer operations, not the amount of data that gets transferred across datacenters.
Increasing the data transfer cost and the cost of leasing nodes did not affect the execution time but considerably affected the execution cost. This is because increasing these costs have no effect on the time taken to perform MapReduce jobs but has an effect on the cost of performing either data-transfer or MapReduce operations.
8
R
ELATEDW
ORKVolley [10] is a system for automatically geo-distributing data based on the needs of an application, presupposing
knowledge on the placement of its data, client access patterns, and locations. Based on this information, data will be dynamically migrated between datacenters to maximize the efficiency. Services like Volley can be used to optimize the placement of data before handling them with G-MR, but this only solves a part of the problem.
Dryad [20] includes a programing model and a frame-work for developing massively scalable distributed appli-cations. Dryad provides the programmer with more fine grained control over the execution of the program and may contain non-uniform treatment (across sub-datasets). Dryad does not posses mechanisms to process geo-distributed data or to identify the most suitable geo-distributed execution path for a given job.
DEDUCE [22] is an approach to combine a stream processing solution with the MapReduce framework. The solution is not for using MapReduce to analyze a dynamic data stream, but to use MapReduce to periodically analyze large amounts of data that get stored in certain points of a stream processing system. DEDUCE cannot be used to process geo-distributed datasets.
There have been a number of efforts to improve the efficiency of single MapReduce jobs. These are largely orthogonal to our approach, and it would be interesting to investigate combinations of these with G-MR. Zaharia et al. [30] improve the performance of Hadoop by making it aware of the heterogeneity of the network.
MapReduce Online [16] is a modification to MapReduce that will allow MapReduce components to start executing before the data has been completely materialized. For example, this allows reducers to start before the complete output from mappers are available and will be able to
execute based on event streams.
Hoque et al. [21] describe a solution for efficiently preserving and replicating intermediate data of a MapRe-duce application. The solution works by asynchronously replicating selective datasets in multiple datacenter racks. For example, in a cascaded set of MapReduce executions, a mapper of a job can be immediately started if the results of the reducer of a previous job is replicated and retained. Yang et al. [28] introduce an extra MapReduce phase named merge, that works after map and reduce phases and extends the MapReduce model for heterogeneous data. Their merge phase and our aggregation phase have certain similarities but the semantics are different. The merge phase focuses on merging multiple reducer outputs from two different lineages while we focus on the aggregation of the two datasets of the same lineage.
Mantri [12] is an efficient outlier mitigation technique for MapReduce. The solution includes smart placement of reducers and proactive rescheduling of tasks. Outlier mitigation techniques like Mantri can be readily used by G-MR since each geo-distributed MapReduce job consists of multiple individual datacenter MapReduce jobs.
Chang et al. [15] introduce approximation algorithms that order MapReduce jobs such that it minimizes overall job completion time. Though orthogonal to our approach, it would be interesting to see how these algorithms can be adapted to work for jobs spread across multiple datacenters. Many storage systems that can support and process big data in cloud environments have been proposed recently. Walter [27] is a key-value storage system that can hold massive amounts of geo-distributed data. COPS [23] is another storage system for geo-distributed data. These do not address actual computations over stored data. RAM-Cloud [25] and RDDs [29] can efficiently handle large amounts of data but have to be deployed within a datacenter.
9
C
ONCLUSIONSThis paper presents G-MR, a MapReduce framework that can be used to efficiently execute a sequence of MapReduce jobs on geo-distributed datasets. G-MR relies on a novel algorithm termed DTG algorithm which looks for the most suitable way to perform the job sequence, minimizing either execution time or cost. We believe that our framework is also applicable to single datacenters with non-uniform transmission characteristics, such as datacenters divided intozones or other network architectures [11].
We have illustrated through real MapReduce application scenarios that G-MR can substantially improve time or cost of job execution compared to na¨ıve schedules of currently widely followed deployments. We are in the process of extending G-MR to provide further optimization in the context of more complex jobs generated by Pig [14]. In that context, we are also investigating how to exploit
commutativityof MapReduce jobs in sequence.
A
CKNOWLEDGMENTSWe are very grateful to Amazon and to Larry Peterson for making it possible to evaluate our research in EC2 and
VICCI respectively. We would also like to thank Donald McGillen of Yahoo! for making the HADOOPDATA data-set available to us.
R
EFERENCES[1] Daily Global Weather Measurements. http://aws.amazon.com/ datasets/Climate/2759.
[2] Data from Year 2000 US Census. http://aws.amazon.com/datasets/ Economics/2290.
[3] Department of Defense Information Enterprise Strategic Plan 2011-2012. http://dodcio.defense.gov/docs/DodIESP-r16.pdf.
[4] Hadoop. http://hadoop.apache.org.
[5] Hadoop Across Data-Centers. http://mail-archives.apache.org/mod mbox/hadoop-general/201001.mbox/%3C4B4B6AC7.6020801@ lifeless.net%3E.
[6] Hadoop: The Definitive Guide. http://oreilly.com/catalog/ 9780596521981.
[7] PageRank Algorithm. http://en.wikipedia.org/wiki/PageRank. [8] Properties of the Iris Plant. http://archive.ics.uci.edu/ml/datasets/Iris. [9] Vicci – A Programmable Cloud Computing Testbed. http://www.
vicci.org.
[10] S. Agarwal, J. Dunagan, N. Jain, S. Saroiu, A. Wolman, and H. Bhogan. Volley: Automated Data Placement for Geo-Distributed Cloud Services. InNSDI 2010.
[11] M. Al-Fares, A. Loukissas, and A. Vahdat. A Scalable, Commodity Data Center Network Architecture. InSIGCOMM 2008.
[12] G. Ananthanarayanan, S. Kandula, A. Greenberg, I. Stoica, Y. Lu, B. Saha, and E. Harris. Reining in the Outliers in Map-Reduce Clusters Using Mantri. InOSDI 2010.
[13] Apache Software Foundation. Apache HDFS. http://hadoop.apache. org/hdfs/.
[14] Apache Software Foundation. Apache Pig. http://pig.apache.org. [15] H. Chang, M. Kodialam, R.R. Kompella, T.V. Lakshman, M. Lee,
and S. Mukherjee. Scheduling in MapReduce-like Systems for Fast Completion Time. InINFOCOM 2011.
[16] T. Condie, N. Conway, P. Alvaro, J.M. Hellerstein, K. Elmeleegy, and R. Sears. MapReduce Online. InNSDI 2010.
[17] J. Dean and S. Ghemawat. MapReduce: Simplified Data Processing on Large Clusters. InOSDI 2004.
[18] J. Dejun, G. Pierre, and C. Chi. EC2 Performance Analysis for Resource Provisioning of Service-Oriented Applications. In
ICSOC/ServiceWave 2009.
[19] E. W. Dijkstra. A Note on Two Problems in Connection with Graphs.
Numerische Math, 11:1–269, 1959.
[20] M. Isard, M. Budiu, Y. Yu, A. Birrell, and D. Fetterly. Dryad: Distributed data-parallel programs from sequential building blocks. InEuroSys 2007.
[21] S. Y. Ko, I. Hoque, B. Cho, and I. Gupta. Making Cloud Intermediate Data Fault-tolerant. InSOCC 2010.
[22] V. Kumar, H. Andrade, B. Gedik, and K.-L. Wu. DEDUCE: at the Intersection of MapReduce and Stream Processing. InEDBT, 2010. [23] W. Lloyd, M. J. Freedman, M. Kaminsky, and D. G. Andersen. Don’t Settle for Eventual: Scalable Causal Consistency for Wide-area Storage with COPS. InSOSP 2011.
[24] C. Olston, B. Reed, U. Srivastava, R. Kumar, and A. Tomkins. PigLatin: A Not-so-foreign Language for Data Processing. In
SIGMOD 2008.
[25] D. Ongaro, S.M. Rumble, R. Stutsman, J. Ousterhout, and M. Rosen-blum. Fast Crash Recovery in RAMCloud. InSOSP 2011. [26] V. Ramasubramanian, T.L. Rodeheffer, D.B. Terry, M.
Walraed-Sullivan, T. Wobber, C.C. Marshall, and A. Vahdat. Cimbiosys: a Platform for Content-based Partial Replication. InNSDI 2009. [27] Y. Sovran, R. Power, M.K. Aguilera, and J. Li. Transactional Storage
for Geo-replicated Systems. InSOSP 2011.
[28] H. Yang, A. Dasdan, R. Hsiao, and D. Parker. Map-Reduce-Merge: Simplified Relational Data Processing on Large Clusters. In
SIGMOD 2007.
[29] M. Zaharia, M. Chowdhury, T. Das, A. Dave, J. Ma, M. McCauley, M.J. Franklin, S. Shenker, and I. Stoica. Resilient Distributed Datasets: a Fault-tolerant Abstraction for In-Memory Cluster Com-puting. InNSDI 2010.
[30] M. Zaharia, A. Konwinski, A. D. Joseph, R. H. Katz, and I. Sto-ica. Improving MapReduce Performance in Heterogeneous Environ-ments. InOSDI 2008.