Hadoop is a widely used implementation of MapReduce. Hadoop supports fault tolerance in both the MapReduce execution engine and HDFS. For each data block, HDFS maintains multiple replicas to improve data availability. Hadoop MapReduce uses speculative execution to minimize the impact of faulty, failed and overloaded nodes.
paradigm, the performance may deteriorate. In Section 2.3.2, we discussed why Hadoop does not perform well for iterative applications. Twister is one of the earliest frameworks designed specifically for iterative applications [57]. Twister adopts publish/subscribe messaging for communication and data transfer. Intermediate data across iterations are cached in memory and processed by long-running map tasks. Despite its performance advantage, Twister has some drawbacks. First, Twister lacks support for distributed file systems. Input data are usually stored in a shared file system (e.g. NFS), and the user needs to manually partition the file and distribute the data to the local storage of the compute nodes. Second, Twister has limited support for fault tolerance. In addition, Twister adopts static scheduling, and the number of map tasks (equal to the number of partitions) must be the same as the number of processor cores.
For complicated applications, expressing all processing in a single job is not feasible, so they are usually split logically into a set of interdependent jobs. An application is likely to comprise both regular and iterative MapReduce jobs. We propose Hybrid MapReduce (HyMR), a workflow management system that combines the best of both regular MapReduce and iterative MapReduce.
7.2.1 System Design
Fig. 7.8 shows the architecture of our proposed HyMR, which is built upon Hadoop and Twister. Workflows are expressed in Extensible Markup Language (XML) or Java properties format. The user can also provide runtime configuration files which specify how Hadoop and Twister should be configured. This benefits workflows that require a configuration different from the default to maximize performance. The progress of workflow execution is recorded in an XML file and is accessible to users. HyMR has three main components: workflow instance controller, job controller and runtime controller.
Workflow instance controller manages workflow instances, which are running copies of workflow definitions. Based on the user-provided workflow description file, a DAG is generated which encodes the jobs and their dependency relationships. The workflow instance controller invokes the runtime controller to start the runtimes, and controls the execution of jobs. If a job fails, the workflow instance controller re-runs it. If the maximal number of retries is reached, the workflow instance controller gives up and reports the failure. If a runtime fails (e.g. a Hadoop daemon crashes), the workflow instance controller restarts the runtime and re-runs all failed jobs.
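The dependency-driven execution with bounded retries described above can be sketched in a few lines of Python. This is an illustration only, not the HyMR implementation: the function run_workflow, its parameters, and the job names are hypothetical, jobs are modeled as callables returning True on success, and restarting a failed runtime is not modeled.

```python
from typing import Callable, Dict, List, Set


def run_workflow(jobs: Dict[str, Callable[[], bool]],
                 deps: Dict[str, Set[str]],
                 max_retries: int = 3) -> List[str]:
    """Run jobs in dependency order and return the completion order.

    jobs maps a (hypothetical) job name to a callable returning True on success.
    deps maps a job name to the set of jobs it depends on.
    Raises RuntimeError if a job still fails after max_retries re-runs.
    """
    done: List[str] = []
    pending = set(jobs)
    while pending:
        # A job is ready once all of its prerequisite jobs have completed.
        ready = sorted(j for j in pending if deps.get(j, set()) <= set(done))
        if not ready:
            raise RuntimeError("cyclic or unsatisfiable dependencies")
        for job in ready:
            attempts = 0
            while not jobs[job]():
                attempts += 1
                if attempts > max_retries:
                    # Give up and report the failure.
                    raise RuntimeError(
                        f"job {job!r} failed after {max_retries} retries")
            done.append(job)
            pending.remove(job)
    return done


# Usage: a job that fails once and succeeds on the first retry.
calls = {"psa": 0}


def flaky_psa() -> bool:
    calls["psa"] += 1
    return calls["psa"] >= 2


order = run_workflow(
    {"split": lambda: True, "psa": flaky_psa, "mds": lambda: True},
    {"psa": {"split"}, "mds": {"psa"}},
)
```

The sketch runs ready jobs one after another for simplicity; a real controller could start independent ready jobs concurrently.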
Job controller manages the execution of a single job. A job is automatically started when all its prerequisite jobs have completed. A job can be sequential or parallel. The job controller monitors job execution and can recover failed tasks. For Hadoop, fault tolerance is natively built in. For Twister, a fault detector keeps track of the state of the Twister daemons, and the current job is killed if any daemon failure is detected. Once the execution of a job completes or fails, the job controller notifies the workflow instance controller.
Runtime controller manages Hadoop and Twister. After physical nodes are allocated through TORQUE [18], the runtime controller deploys and starts Hadoop and Twister on them. The runtimes are shared by all jobs of a workflow and are termed persistent runtimes. After a workflow instance completes, the runtime controller stops Hadoop and Twister. Compared with starting and stopping the runtimes for each parallel job, persistent runtimes incur lower overhead.
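The difference between per-job and persistent runtimes can be illustrated with a small Python sketch that only counts how often a runtime is started and stopped. The class FakeRuntime and the helper functions are hypothetical stand-ins; no real Hadoop or Twister daemon is launched.

```python
from contextlib import contextmanager


class FakeRuntime:
    """Hypothetical stand-in for a Hadoop or Twister deployment."""

    def __init__(self, name):
        self.name = name
        self.starts = 0
        self.stops = 0

    def start(self):
        self.starts += 1

    def stop(self):
        self.stops += 1


@contextmanager
def running(runtime):
    """Start the runtime, and always stop it when the block exits."""
    runtime.start()
    try:
        yield runtime
    finally:
        runtime.stop()


def run_per_job(runtime, jobs):
    # The runtime is started and stopped around every single job.
    for job in jobs:
        with running(runtime):
            job()


def run_persistent(runtime, jobs):
    # The runtime is started once and shared by all jobs of the workflow.
    with running(runtime):
        for job in jobs:
            job()


jobs = [lambda: None] * 3
per_job = FakeRuntime("twister")
persistent = FakeRuntime("twister")
run_per_job(per_job, jobs)
run_persistent(persistent, jobs)
```

With three jobs, the per-job variant starts the runtime three times while the persistent variant starts it once, which is where the overhead saving comes from.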
Figure 7.8: HyMR architecture (elements shown: User, Submit workflow; HyMR with Workflow Instance Controller, Job Controller and Runtime Controller; Runtime with Hadoop and Twister; Cluster)
7.2.2 Workflows
We have implemented two HyMR workflows for our bioinformatics data visualization application, termed Twister-pipeline and Hybrid-pipeline, shown in Fig. 7.9. After the input data are received, they are split into two subsets: the sample set and the out-sample set. The sample set is first processed by a Pairwise Sequence Alignment (PSA) algorithm, whose output is the input of a Multidimensional Scaling (MDS) algorithm. MDS interpolation computes the mapping of the out-sample set by interpolating the mapping of the sample set produced by MDS. We have implemented some of the parallel algorithms for both Hadoop and Twister. For Twister-pipeline, all parallel jobs run on Twister, and data partitioning and staging need to be handled explicitly. For Hybrid-pipeline, iterative algorithms such as MDS are implemented in Twister while the other parallel jobs are implemented in Hadoop, and data sharing is handled implicitly by HDFS. Each application is discussed in detail below.
Figure 7.9: HyMR workflows
(a) Twister-pipeline: input, File Splitter, Twister-PSA, Twister-MDS, Twister-MI-MDS, Result Merger, output; the sample set and the out-sample set are passed on by data distribution.
(b) Hybrid-pipeline: input, File Splitter, Hadoop-PSA, Twister-MDS, Hadoop-MI-MDS, Result Merger, output; the sample set and the out-sample set are shared via HDFS.
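As an illustration, the job dependencies of Hybrid-pipeline can be written down as a DAG and ordered with Python's standard graphlib module. The job identifiers are hypothetical, and the edges are our reading of the description above; in particular, the inputs of the Result Merger are an assumption, since the text does not spell them out.

```python
from graphlib import TopologicalSorter

# Each job maps to the set of jobs it depends on (hypothetical identifiers).
hybrid_pipeline = {
    "file_splitter": set(),
    "hadoop_psa": {"file_splitter"},                    # sample set
    "twister_mds": {"hadoop_psa"},                      # PSA output feeds MDS
    "hadoop_mi_mds": {"file_splitter", "twister_mds"},  # out-sample set + sample mapping
    "result_merger": {"twister_mds", "hadoop_mi_mds"},  # assumed inputs
}

# A valid execution order in which every job follows its prerequisites.
order = list(TopologicalSorter(hybrid_pipeline).static_order())
```

Because the jobs form a chain, the order is fully determined: file_splitter, hadoop_psa, twister_mds, hadoop_mi_mds, result_merger.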
7.2.2.1 Pairwise sequence alignment
Sequence alignment identifies similar regions among the sequences of DNA, RNA and protein that may be a consequence of functional, structural or evolutionary relationships. Pairwise sequence alignment performs all-pair alignment over a set of sequences. Its result is naturally expressed as an all-pair dissimilarity matrix. Among the proposed PSA algorithms, the Smith-Waterman-Gotoh (SWG) algorithm is used in our workflows, and we have implemented it on both Hadoop and Twister.
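To make the all-pair computation concrete, the following Python sketch builds a dissimilarity matrix from local alignment scores. It is a simplification and not the SWG code used in our workflows: it implements plain Smith-Waterman with a linear gap penalty rather than Gotoh's affine gaps, the scoring parameters are arbitrary, and the normalization of scores into dissimilarities is a hypothetical choice. Sequences are assumed to be non-empty.

```python
def smith_waterman(a, b, match=2, mismatch=-1, gap=-2):
    """Best local alignment score of a and b with a linear gap penalty."""
    prev = [0] * (len(b) + 1)
    best = 0
    for ca in a:
        cur = [0]
        for j, cb in enumerate(b, start=1):
            diag = prev[j - 1] + (match if ca == cb else mismatch)
            # Local alignment: a cell never drops below zero.
            score = max(0, diag, prev[j] + gap, cur[j - 1] + gap)
            cur.append(score)
            best = max(best, score)
        prev = cur
    return best


def dissimilarity_matrix(seqs):
    """Symmetric all-pair dissimilarity matrix with a zero diagonal."""
    n = len(seqs)
    self_score = [smith_waterman(s, s) for s in seqs]
    d = [[0.0] * n for _ in range(n)]
    for i in range(n):
        for j in range(i + 1, n):
            s = smith_waterman(seqs[i], seqs[j])
            # Hypothetical normalization into [0, 1]; not taken from the source.
            d[i][j] = d[j][i] = 1.0 - s / min(self_score[i], self_score[j])
    return d


matrix = dissimilarity_matrix(["ACGT", "ACGT", "TTTT"])
```

Each entry (i, j) depends only on sequences i and j, so the pairs can be computed independently of one another, which is what makes the problem easy to distribute across map tasks.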
7.2.2.2 Multidimensional Scaling (MDS)
MDS is often used in information visualization for exploring similarities in data. Given a pairwise dissimilarity matrix, an MDS algorithm assigns a location to each item in N-dimensional space in such a way that the corresponding goal function is minimized. SMACOF is an optimization strategy used in MDS where m-dimensional data items are “mapped” to n-dimensional (n ≪ m) space with the stress function minimized via an iterative EM technique. Parallel SMACOF is used in our workflows. The output of PSA is the input of MDS. Because SMACOF is an iterative algorithm, we implemented it only on Twister.
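The iterative structure of SMACOF can be sketched sequentially in pure Python. The sketch assumes unit weights, raw stress as the goal function, and a random initial configuration; the function names and parameters are hypothetical, and the code is not the parallel SMACOF used in our workflows. Each iteration applies the Guttman transform, which does not increase the stress.

```python
import math
import random


def distances(X):
    """Pairwise Euclidean distances of the current configuration."""
    n = len(X)
    return [[math.dist(X[i], X[j]) for j in range(n)] for i in range(n)]


def stress(X, delta):
    """Raw stress: squared differences between mapped and target distances."""
    D = distances(X)
    n = len(X)
    return sum((D[i][j] - delta[i][j]) ** 2
               for i in range(n) for j in range(i + 1, n))


def guttman_step(X, delta):
    """One SMACOF update with unit weights: X <- (1/n) * B(X) * X."""
    n, dim = len(X), len(X[0])
    D = distances(X)
    new_X = []
    for i in range(n):
        row = [0.0] * dim
        for j in range(n):
            if i == j or D[i][j] == 0.0:
                continue  # coincident points contribute nothing
            ratio = delta[i][j] / D[i][j]
            for c in range(dim):
                row[c] += ratio * (X[i][c] - X[j][c])
        new_X.append([v / n for v in row])
    return new_X


def smacof(delta, dim=2, max_iter=300, eps=1e-9, seed=0):
    """Iterate until the stress improvement falls below eps."""
    rng = random.Random(seed)
    X = [[rng.uniform(-1, 1) for _ in range(dim)] for _ in delta]
    history = [stress(X, delta)]
    for _ in range(max_iter):
        X = guttman_step(X, delta)
        history.append(stress(X, delta))
        if history[-2] - history[-1] < eps:
            break
    return X, history


# Target dissimilarities: the four corners of a unit square.
r2 = math.sqrt(2)
delta = [[0, 1, r2, 1], [1, 0, 1, r2], [r2, 1, 0, 1], [1, r2, 1, 0]]
X, history = smacof(delta)
```

Both the dissimilarity matrix and the distance matrix of the current configuration are kept in memory, and the same data are revisited in every iteration, which is the access pattern that long-running, in-memory map tasks serve well.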
7.2.2.3 MDS interpolation
The memory usage of SMACOF is O(N^2), which tightly limits the practically achievable scale of parallel MDS. Majorizing Interpolation Multidimensional Scaling (MI-MDS) has been proposed, which significantly reduces the complexity [28]. The whole data set is split into two parts: the sample set and the out-sample set. The sample set is fed into PSA and MDS, and the sample mapping result is generated. Based on the sample mapping result, MI-MDS calculates the mapping result of the out-sample set. MI-MDS can be easily parallelized because the interpolation of each data point is independent of the others. MI-MDS has been implemented on both Hadoop and Twister, called Hadoop-MI-MDS and Twister-MI-MDS respectively. Hadoop-MI-MDS uses HDFS for data storage and sharing, while Twister-MI-MDS partitions the data and stages it to the compute nodes.
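The interpolation of a single out-sample point can be sketched as follows. The code places one new point relative to already mapped sample points by iterative majorization of the stress between the new point and those fixed points. It is a sketch in the spirit of MI-MDS and is not claimed to match [28] in every detail; the function name, the parameters, and the choice of the neighbor mean as the starting position are assumptions.

```python
import math


def interpolate_point(neighbors, deltas, max_iter=100, eps=1e-12):
    """Map one out-sample point given fixed, already mapped sample points.

    neighbors: mapped coordinates of the chosen sample points.
    deltas: target dissimilarities between the new point and each neighbor.
    """
    k, dim = len(neighbors), len(neighbors[0])
    mean = [sum(p[c] for p in neighbors) / k for c in range(dim)]
    x = mean[:]  # assumed starting position
    for _ in range(max_iter):
        step = [0.0] * dim
        for p, delta in zip(neighbors, deltas):
            d = math.dist(x, p)
            if d == 0.0:
                continue  # avoid division by zero for coincident points
            for c in range(dim):
                step[c] += delta / d * (x[c] - p[c])
        # Majorization update: neighbor mean plus the averaged correction.
        new_x = [mean[c] + step[c] / k for c in range(dim)]
        moved = math.dist(new_x, x)
        x = new_x
        if moved < eps:
            break
    return x


# Three mapped sample points; the new point should be sqrt(2) from each.
sample = [(0.0, 0.0), (2.0, 0.0), (0.0, 2.0)]
point = interpolate_point(sample, [math.sqrt(2)] * 3)
```

In this example the result converges to approximately (1, 1), the only position at distance sqrt(2) from all three sample points. Since each call uses only the fixed sample mapping, out-sample points can be interpolated independently, for example one batch per map task.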
7.2.3 Summary
HyMR automates the management of runtimes and jobs, and facilitates the execution of workflows that consist of both regular MapReduce applications and iterative MapReduce applications. Currently Twister and Hadoop are supported, combining the best of both worlds. Our visualization pipeline illustrates how HyMR can be used to run complicated workflows.
8 Related Work
There has been substantial research on various issues of distributed computing. We survey below the related work on parallel and distributed file systems, data staging, and task scheduling.