SALSA
Twister: A Runtime for Iterative MapReduce
Jaliya Ekanayake
Community Grids Laboratory,
Digital Science Center
Pervasive Technology Institute
Indiana University
Acknowledgements to:
• Co-authors: Hui Li, Binging Shang, Thilina Gunarathne, Seung-Hee Bae, Judy Qiu, Geoffrey Fox
  School of Informatics and Computing, Indiana University Bloomington
Motivation
• Data Deluge: experienced in many domains
• MapReduce: data centered, QoS
• Classic parallel runtimes (MPI): efficient and proven techniques
[Figure: Input → map → Output; Input → map → reduce; Input → map → reduce with iterations; Pij]
Expand the applicability of MapReduce to more classes of applications
Features of Existing Architectures (1)
• Programming model
  – MapReduce (optionally “map-only”)
  – Focus on single-step MapReduce computations (DryadLINQ supports more than one stage)
• Input and output handling
  – Distributed data access (HDFS in Hadoop, Sector in Sphere, and shared directories in Dryad)
  – Outputs normally go to the distributed file systems
• Intermediate data
  – Transferred via file systems (local disk -> HTTP -> local disk in Hadoop)
  – Easy to support fault tolerance
  – Considerably high latencies
Features of Existing Architectures (2)
• Scheduling
  – A master schedules tasks to slaves depending on their availability
  – Dynamic scheduling in Hadoop, static scheduling in Dryad/DryadLINQ
  – Natural load balancing
• Fault tolerance
  – Data flows through disks -> channels -> disks
  – A master keeps track of the data products
  – Re-execution of failed or slow tasks
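A small simulation can illustrate the difference between the two scheduling styles. This is a minimal sketch. It assumes task durations are known up front, and it ignores data locality and transfer costs. `SchedulingSim` and its methods are hypothetical names and are not part of Hadoop or Dryad.

```java
import java.util.Arrays;
import java.util.PriorityQueue;

// Hypothetical simulation (not Hadoop or Dryad code): compares dynamic, pull-based task
// assignment with static round-robin assignment, assuming known task durations and
// ignoring data locality and communication costs.
class SchedulingSim {

    // Dynamic: each task goes to whichever worker becomes free first (greedy list scheduling).
    static double dynamicMakespan(double[] taskTimes, int workers) {
        PriorityQueue<Double> freeAt = new PriorityQueue<>();
        for (int w = 0; w < workers; w++) freeAt.add(0.0);
        for (double t : taskTimes) {
            double start = freeAt.poll();   // earliest-available worker takes the next task
            freeAt.add(start + t);
        }
        double makespan = 0;
        for (double f : freeAt) makespan = Math.max(makespan, f);
        return makespan;
    }

    // Static: task i is bound to worker (i % workers) before execution starts.
    static double staticMakespan(double[] taskTimes, int workers) {
        double[] load = new double[workers];
        for (int i = 0; i < taskTimes.length; i++) load[i % workers] += taskTimes[i];
        return Arrays.stream(load).max().orElse(0);
    }

    public static void main(String[] args) {
        double[] tasks = {8, 1, 8, 1, 1, 1, 1, 1};
        System.out.println("dynamic makespan = " + dynamicMakespan(tasks, 2));
        System.out.println("static makespan  = " + staticMakespan(tasks, 2));
    }
}
```

For the task times in `main`, the pull-based schedule finishes at time 11. The round-robin split finishes at 18, because both 8-unit tasks are bound to worker 0. This gap is the natural load balancing that dynamic scheduling provides.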
A Programming Model for Iterative MapReduce
• Distributed data access
• In-memory MapReduce
• Distinction between static data and variable data (data flow vs. δ flow)
• Cacheable map/reduce tasks (long-running tasks)
• Combine operation
• Support for fast intermediate data transfers
[Figure: the User Program calls Configure() to load static data into the tasks, then iterates Map(Key, Value) → Reduce(Key, List<Value>) → Combine(Map<Key,Value>), with the δ flow returning to the User Program each iteration, and finally calls Close()]
Twister constraints for side-effect-free map/reduce tasks
Twister Programming Model
User program's process space:
    configureMaps(..)
    configureReduce(..)
    while(condition){
        runMapReduce(..)
        updateCondition()
    } //end while
    close()
• Two configuration options: 1. using local disks (only for maps); 2. using the pub-sub bus
• Map() and Reduce() run on the worker nodes and read static data from the local disk; the Combine() operation runs in the user program's process space
• Communications/data transfers go via the pub-sub broker network across iterations; tasks may send <Key,Value> pairs directly
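The driver loop above can be sketched as a single-process Java program that uses K-means clustering as the iterative computation. This is a hypothetical illustration, not Twister's implementation:
- `IterativeKMeans` and `MapTask` are invented names.
- Points are 1-D for brevity.
- The comments only indicate which step of the loop each part corresponds to.

The point partitions are the static data and are loaded once. The centroids are the variable (δ) data and are sent in every iteration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-process sketch of the driver loop using 1-D K-means.
// Names only mirror the roles of configureMaps / runMapReduceBCast / updateCondition / close.
class IterativeKMeans {

    // A cacheable map task: configured once with its static partition, reused every iteration.
    static final class MapTask {
        private final double[] points;   // static data, kept in memory across iterations

        MapTask(double[] partition) { this.points = partition; }

        // map: assign each cached point to the nearest centroid; emit clusterId -> {sum, count}.
        Map<Integer, double[]> map(double[] centroids) {
            Map<Integer, double[]> partial = new HashMap<>();
            for (double p : points) {
                int best = 0;
                for (int c = 1; c < centroids.length; c++) {
                    if (Math.abs(p - centroids[c]) < Math.abs(p - centroids[best])) best = c;
                }
                double[] acc = partial.computeIfAbsent(best, k -> new double[2]);
                acc[0] += p;
                acc[1] += 1;
            }
            return partial;
        }
    }

    // reduce: add up the partial {sum, count} pairs emitted for one cluster id.
    static double[] reduce(List<double[]> partials) {
        double[] total = new double[2];
        for (double[] s : partials) { total[0] += s[0]; total[1] += s[1]; }
        return total;
    }

    static double[] run(double[][] partitions, double[] initialCentroids, int maxIter, double tol) {
        // configureMaps(..): load the static data into long-running tasks exactly once.
        List<MapTask> tasks = new ArrayList<>();
        for (double[] part : partitions) tasks.add(new MapTask(part));

        double[] centroids = initialCentroids.clone();
        boolean converged = false;
        for (int iter = 0; iter < maxIter && !converged; iter++) {   // while(condition)
            // Broadcast the current centroids (the delta flow) to every cached map task.
            Map<Integer, List<double[]>> grouped = new TreeMap<>();
            for (MapTask t : tasks) {
                t.map(centroids).forEach((k, v) -> grouped.computeIfAbsent(k, x -> new ArrayList<>()).add(v));
            }
            // Reduce per cluster, then combine all reduce outputs into one centroid vector.
            double[] next = centroids.clone();   // an empty cluster keeps its previous centroid
            for (Map.Entry<Integer, List<double[]>> e : grouped.entrySet()) {
                double[] total = reduce(e.getValue());
                next[e.getKey()] = total[0] / total[1];
            }
            // updateCondition(): stop once no centroid moves more than tol.
            converged = true;
            for (int c = 0; c < next.length; c++) {
                if (Math.abs(next[c] - centroids[c]) > tol) converged = false;
            }
            centroids = next;
        }
        return centroids;   // close() would release the cached tasks here
    }

    public static void main(String[] args) {
        double[][] partitions = { {1.0, 1.2, 9.8}, {0.8, 10.0, 10.2} };
        System.out.println(Arrays.toString(run(partitions, new double[] {0.0, 5.0}, 100, 1e-9)));
    }
}
```

With the partitions in `main`, the first iteration moves the centroids from 0.0 and 5.0 to about 1.0 and 10.0. The second iteration leaves them unchanged and confirms convergence. The map tasks are built only once.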
Twister API
1. configureMaps(PartitionFile partitionFile)
2. configureMaps(Value[] values)
3. configureReduce(Value[] values)
4. runMapReduce()
5. runMapReduce(KeyValue[] keyValues)
6. runMapReduceBCast(Value value)
7. map(MapOutputCollector collector, Key key, Value val)
8. reduce(ReduceOutputCollector collector, Key key, List<Value> values)
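Items 7 and 8 fix the shape of user code: each task writes its output through a collector rather than returning it. Below is a minimal sketch of that style. It uses `String` keys and `int` values in place of Twister's `Key`/`Value` types. The interfaces and the `runOnce` helper are hypothetical stand-ins, not the Twister classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch of the collector-style map/reduce signatures (items 7 and 8).
// String keys and int values stand in for Twister's Key/Value types; these are not Twister classes.
class CollectorApiSketch {

    interface MapOutputCollector { void collect(String key, int value); }
    interface ReduceOutputCollector { void collect(String key, int value); }

    interface MapTask { void map(MapOutputCollector collector, String key, String val); }
    interface ReduceTask { void reduce(ReduceOutputCollector collector, String key, List<Integer> values); }

    // One in-process round: map every input, group emitted pairs by key, reduce each group.
    static SortedMap<String, Integer> runOnce(Map<String, String> inputs, MapTask mapper, ReduceTask reducer) {
        SortedMap<String, List<Integer>> groups = new TreeMap<>();
        for (Map.Entry<String, String> in : inputs.entrySet()) {
            mapper.map((k, v) -> groups.computeIfAbsent(k, x -> new ArrayList<>()).add(v),
                       in.getKey(), in.getValue());
        }
        SortedMap<String, Integer> out = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> g : groups.entrySet()) {
            reducer.reduce((k, v) -> out.put(k, v), g.getKey(), g.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        // Word count: map emits (word, 1) per token; reduce sums the counts for each word.
        MapTask mapper = (collector, key, val) -> {
            for (String w : val.split("\\s+")) collector.collect(w, 1);
        };
        ReduceTask reducer = (collector, key, values) -> {
            int sum = 0;
            for (int v : values) sum += v;
            collector.collect(key, sum);
        };
        System.out.println(runOnce(Map.of("doc1", "a b a", "doc2", "b c"), mapper, reducer));
    }
}
```

Running `main` prints `{a=2, b=2, c=1}`.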
Twister Architecture
[Figure: the Master Node runs the Main Program and the Twister Driver; each Worker Node runs a Twister Daemon with a Worker Pool of cacheable map and reduce tasks and a Local Disk; the driver and the daemons communicate through the Pub/sub Broker Network (B = broker)]
Scripts perform: data distribution, data collection, and partition file creation
Input/Output Handling
• Data Manipulation Tool:
  – Provides basic functionality to manipulate data across the local disks of the compute nodes
  – Data partitions are assumed to be files (in contrast to fixed-size blocks in Hadoop)
  – Supported commands:
    • mkdir, rmdir, put, putall, get, ls
    • Copy resources
    • Create Partition File
[Figure: the Data Manipulation Tool operates on a common directory in the local disks of individual nodes (Node 0, Node 1, …, Node n), e.g. /tmp/twister_data]
Partition File
• The partition file allows duplicates
• One data partition may reside in multiple nodes
• In the event of a failure, the duplicates are used to re-schedule the tasks
Partition file columns: File No | Node IP | Daemon No | File partition path
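The partition file can be pictured as a table with one row per (file, location) pair. Below is a minimal sketch of reading it and choosing a duplicate after a failure. It assumes one comma-separated line per row, in the column order above. The real file format, the example IP addresses, and the `part_N` file names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical partition-file reader; the comma-separated line format is an assumption.
class PartitionFileSketch {

    // One row of the partition file: the four columns listed on the slide.
    record Entry(int fileNo, String nodeIp, int daemonNo, String path) {}

    // Groups rows by file number, so each partition lists all of its duplicates (replicas).
    static Map<Integer, List<Entry>> parse(List<String> lines) {
        Map<Integer, List<Entry>> byFile = new TreeMap<>();
        for (String line : lines) {
            if (line.isBlank()) continue;
            String[] f = line.split(",");
            Entry e = new Entry(Integer.parseInt(f[0].trim()), f[1].trim(),
                                Integer.parseInt(f[2].trim()), f[3].trim());
            byFile.computeIfAbsent(e.fileNo(), k -> new ArrayList<>()).add(e);
        }
        return byFile;
    }

    // After a failure, pick the first duplicate whose daemon is still available, if any.
    static Optional<Entry> reschedule(List<Entry> replicas, Set<Integer> aliveDaemons) {
        return replicas.stream().filter(r -> aliveDaemons.contains(r.daemonNo())).findFirst();
    }

    public static void main(String[] args) {
        Map<Integer, List<Entry>> parts = parse(List.of(
            "0,10.0.0.1,0,/tmp/twister_data/part_0",
            "0,10.0.0.2,1,/tmp/twister_data/part_0",   // duplicate of file 0 on a second node
            "1,10.0.0.2,1,/tmp/twister_data/part_1"));
        // Daemon 0 has failed: file 0 can still be processed from its copy on daemon 1.
        System.out.println(reschedule(parts.get(0), Set.of(1)));
    }
}
```

A partition with no surviving duplicate yields an empty `Optional`. In that case, re-scheduling alone cannot recover the task.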
The use of pub/sub messaging
• Intermediate data is transferred via the broker network
• A network of brokers is used for load balancing
  – Different broker topologies
• Interspersed computation and data transfer minimizes large message loads at the brokers
• Currently supports
  – NaradaBrokering
  – ActiveMQ
[Figure: map task queues feed the map workers, whose outputs travel through the broker network to Reduce(). E.g. 100 map tasks, 10 workers in 10 nodes; ~10 tasks are …]
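Routing intermediate data through a broker means each map output is published on a topic that its reducer subscribes to. The sketch below is a hypothetical in-process stand-in for the broker network, not the NaradaBrokering or ActiveMQ APIs. It assumes keys are assigned to reducers by hashing, which guarantees that all values for a key reach the same reducer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.BiConsumer;

// Hypothetical in-process stand-in for a broker network; not the NaradaBrokering or ActiveMQ API.
class BrokerRoutingSketch {

    // Minimal topic-based broker: delivers each message to every subscriber of its topic.
    static final class Broker {
        private final Map<String, List<BiConsumer<String, Integer>>> subscribers = new HashMap<>();

        void subscribe(String topic, BiConsumer<String, Integer> handler) {
            subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
        }

        void publish(String topic, String key, int value) {
            for (BiConsumer<String, Integer> h : subscribers.getOrDefault(topic, List.of())) {
                h.accept(key, value);
            }
        }
    }

    // Assumed partitioning rule: a key always maps to the same reduce topic.
    static String reduceTopic(String key, int numReducers) {
        return "reduce-" + Math.floorMod(key.hashCode(), numReducers);
    }

    // Publishes every map output and returns, per reducer, the distinct keys it received.
    static Map<Integer, Set<String>> route(List<Map.Entry<String, Integer>> mapOutputs, int numReducers) {
        Broker broker = new Broker();
        Map<Integer, Set<String>> received = new TreeMap<>();
        for (int r = 0; r < numReducers; r++) {
            Set<String> keys = new TreeSet<>();
            received.put(r, keys);
            broker.subscribe("reduce-" + r, (k, v) -> keys.add(k));   // reducer r's subscription
        }
        for (Map.Entry<String, Integer> kv : mapOutputs) {
            broker.publish(reduceTopic(kv.getKey(), numReducers), kv.getKey(), kv.getValue());
        }
        return received;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> outputs = List.of(
            Map.entry("apple", 1), Map.entry("pear", 2), Map.entry("apple", 3), Map.entry("fig", 4));
        System.out.println(route(outputs, 2));
    }
}
```

A real deployment would spread the topics over several brokers in the network. The sketch collapses them into one object to show only the routing rule.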
Scheduling
• Twister supports long-running tasks
• Avoids unnecessary initializations in each iteration
• Tasks are scheduled statically
  – Supports task reuse
  – May lead to inefficient resource utilization
• Users are expected to randomize data distributions to minimize processing skew due to any skew in the data
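Static scheduling with a user-randomized data distribution can be sketched as a one-time shuffle followed by a fixed round-robin binding. This is a minimal sketch. It assumes partitions are identified by name and that the driver does the shuffle. `StaticAssignmentSketch` and the seed value are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

// Hypothetical sketch of static scheduling with a randomized data distribution.
class StaticAssignmentSketch {

    // Shuffle partitions once, then bind them round-robin to a fixed set of workers.
    static Map<Integer, List<String>> assign(List<String> partitions, int workers, long seed) {
        List<String> shuffled = new ArrayList<>(partitions);
        // Randomizing the order breaks up any ordering in how the data was laid out
        // (e.g. partitions sorted by size), which would otherwise skew per-worker load.
        Collections.shuffle(shuffled, new Random(seed));
        Map<Integer, List<String>> plan = new TreeMap<>();
        for (int w = 0; w < workers; w++) plan.put(w, new ArrayList<>());
        for (int i = 0; i < shuffled.size(); i++) plan.get(i % workers).add(shuffled.get(i));
        return plan;
    }

    public static void main(String[] args) {
        List<String> parts = new ArrayList<>();
        for (int i = 0; i < 10; i++) parts.add("part_" + i);
        // Computed once before the first iteration; the long-running tasks keep the same
        // partitions in every iteration, so nothing is re-scheduled between iterations.
        Map<Integer, List<String>> plan = assign(parts, 3, 42L);
        System.out.println(plan);
    }
}
```

Because the plan is computed once, the same seed always yields the same binding. With 10 partitions on 3 workers, each worker holds 3 or 4 partitions.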
Fault Tolerance
• Recovers at iteration boundaries
• Does not handle individual task failures
• Assumptions:
  – The broker network is reliable
  – The main program and the Twister Driver have no failures
• Any failure (hardware or daemon) results in the following fault-handling sequence:
  – Terminate the currently running tasks (remove them from memory)
  – Poll for the currently available worker nodes (and daemons)
  – Configure map/reduce using the static data (re-assign data …)
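The fault-handling sequence can be sketched as a driver loop that commits state only at iteration boundaries. This is a minimal sketch with a simulated failure. It assumes the δ value from the last completed iteration is the only state the driver must keep, and that a failed worker is simply dropped from the pool. The class, the update rule `delta = 1 / (1 + sum)`, and the failure injection are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of recovery at iteration boundaries; failures are injected, not real.
class IterationRecoverySketch {

    static class WorkerFailure extends RuntimeException {
        WorkerFailure(String msg) { super(msg); }
    }

    // Re-assign static data partitions round-robin over the workers currently alive.
    static Map<Integer, List<double[]>> configure(List<double[]> staticData, List<Integer> alive) {
        Map<Integer, List<double[]>> plan = new TreeMap<>();
        for (int w : alive) plan.put(w, new ArrayList<>());
        for (int i = 0; i < staticData.size(); i++) {
            plan.get(alive.get(i % alive.size())).add(staticData.get(i));
        }
        return plan;
    }

    // One iteration: every worker scales its cached data by delta; a failing worker aborts it.
    static double runIteration(Map<Integer, List<double[]>> plan, double delta, Set<Integer> failNow) {
        double total = 0;
        for (Map.Entry<Integer, List<double[]>> w : plan.entrySet()) {
            if (failNow.contains(w.getKey())) throw new WorkerFailure("worker " + w.getKey());
            for (double[] part : w.getValue()) for (double x : part) total += x * delta;
        }
        return total;
    }

    // Runs a fixed number of iterations of delta <- 1 / (1 + sum); failAt < 0 means no failure.
    static double run(List<double[]> staticData, int workers, int iterations, int failAt, int failingWorker) {
        List<Integer> alive = new ArrayList<>();
        for (int w = 0; w < workers; w++) alive.add(w);
        Map<Integer, List<double[]>> plan = configure(staticData, alive);
        double delta = 1.0;       // state committed at the last iteration boundary
        boolean failed = false;
        int iter = 0;
        while (iter < iterations) {
            try {
                Set<Integer> failNow = Set.of();
                if (!failed && iter == failAt) failNow = Set.of(failingWorker);
                double sum = runIteration(plan, delta, failNow);
                delta = 1.0 / (1.0 + sum);   // commit only after the whole iteration succeeded
                iter++;
            } catch (WorkerFailure e) {
                failed = true;
                // Discard partial results, drop the failed worker, re-configure from static data,
                // and re-run the same iteration from the last committed delta.
                alive.remove(Integer.valueOf(failingWorker));
                plan = configure(staticData, alive);
            }
        }
        return delta;
    }

    public static void main(String[] args) {
        List<double[]> data = List.of(new double[] {1, 2}, new double[] {3}, new double[] {4, 5}, new double[] {6});
        System.out.println(run(data, 3, 5, -1, 0) + " vs " + run(data, 3, 5, 2, 1));
    }
}
```

Runs with and without the injected failure give the same final value, up to floating-point summation order. This holds because the failed iteration is replayed from the saved δ on the re-configured workers.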
Performance Evaluation
• Hardware configurations:
  Cluster ID            Cluster-I    Cluster-II
  # nodes               32           230
  # CPUs in each node   6            2
  # cores in each CPU   8            4
  Total CPU cores       768          1840
  Supported OSs: Linux (Red Hat Enterprise Linux Server release 5.4, 64-bit); Windows (Windows Server 2008, 64-bit)
• We use the academic release of DryadLINQ, Apache Hadoop version 0.20.2, and Twister for our performance comparisons.
• Both Twister and Hadoop use JDK (64-bit) version 1.6.0_18, while DryadLINQ and MPI use Microsoft .NET version 3.5.
Pairwise Sequence Comparison using Smith-Waterman-Gotoh
• Typical MapReduce computation
• Comparable efficiencies
Pagerank – An Iterative MapReduce Algorithm
• Well-known pagerank algorithm [1]
• Used ClueWeb09 [2] (1 TB in size) from CMU
• Reuse of map tasks and faster communication pays off
[1] Pagerank Algorithm, http://en.wikipedia.org/wiki/PageRank
[2] ClueWeb09 Data Set, http://boston.lti.cs.cmu.edu/Data/clueweb09/
[Figure: each map task (M) holds a partial adjacency matrix and receives the current page ranks (compressed); the map tasks send partial updates to the reduce task (R)]
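One PageRank iteration in this model works as follows. Each map task caches a block of the adjacency lists (static data) and receives the current ranks (δ) in every iteration. The reduce step then sums the partial updates. The sketch below runs in a single process and assumes:
- a damping factor of 0.85;
- no dangling pages;
- no compression of the rank vector.

`PageRankSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-process PageRank sketch; damping 0.85 and no dangling pages are assumed.
class PageRankSketch {
    static final double D = 0.85;

    // map: for the cached block {page -> out-links}, emit partial updates target -> contribution.
    static Map<Integer, Double> map(Map<Integer, int[]> block, double[] ranks) {
        Map<Integer, Double> partial = new HashMap<>();
        for (Map.Entry<Integer, int[]> e : block.entrySet()) {
            int[] out = e.getValue();
            for (int target : out) partial.merge(target, ranks[e.getKey()] / out.length, Double::sum);
        }
        return partial;
    }

    // reduce + combine: sum the contributions per page and apply the damping term.
    static double[] reduce(List<Map<Integer, Double>> partials, int n) {
        double[] next = new double[n];
        Arrays.fill(next, (1 - D) / n);
        for (Map<Integer, Double> p : partials) p.forEach((page, c) -> next[page] += D * c);
        return next;
    }

    static double[] run(List<Map<Integer, int[]>> blocks, int n, int iterations) {
        double[] ranks = new double[n];
        Arrays.fill(ranks, 1.0 / n);
        for (int it = 0; it < iterations; it++) {
            List<Map<Integer, Double>> partials = new ArrayList<>();
            for (Map<Integer, int[]> block : blocks) partials.add(map(block, ranks));   // same blocks reused
            ranks = reduce(partials, n);
        }
        return ranks;
    }

    public static void main(String[] args) {
        // Graph 0->1, 0->2, 1->2, 2->0, with its adjacency lists split across two map tasks.
        List<Map<Integer, int[]>> blocks = List.of(
            Map.of(0, new int[] {1, 2}),
            Map.of(1, new int[] {2}, 2, new int[] {0}));
        System.out.println(Arrays.toString(run(blocks, 3, 100)));
    }
}
```

For the four-edge graph in `main`, the ranks approach roughly 0.388, 0.215, and 0.397 for pages 0, 1, and 2, and they continue to sum to 1.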
Multi-dimensional Scaling
• Maps high-dimensional data to lower dimensions (typically 2D or 3D)
• SMACOF (Scaling by Majorizing of COmplicated Function) [1]
[1] J. de Leeuw, "Applications of convex analysis to multidimensional scaling," Recent Developments in Statistics, pp. 133-145, 1977.
Sequential version:
    While(condition)
    {
        <X> = [A] [B] <C>
        C = CalcStress(<X>)
    }
MapReduce version:
    While(condition)
    {
        <T> = MapReduce1([B],<C>)
        <X> = MapReduce2([A],<T>)
        C = MapReduce3(<X>)
    }
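The sequential loop can be made concrete for the unweighted case, where the SMACOF update is the Guttman transform X ← (1/N)·B(X)·X. Reading [A] as the scalar 1/N and [B] as B(X) is an assumption about the slide's notation. The sketch below runs sequentially. In the MapReduce version, each matrix product would instead be split by row blocks across map tasks. `SmacofSketch` is a hypothetical name, and the example dissimilarities are those of a unit square.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sequential SMACOF sketch (unit weights, Guttman transform).
class SmacofSketch {

    static double dist(double[] a, double[] b) {
        double s = 0;
        for (int k = 0; k < a.length; k++) s += (a[k] - b[k]) * (a[k] - b[k]);
        return Math.sqrt(s);
    }

    // Raw stress: sum over i < j of (delta_ij - d_ij(X))^2.
    static double stress(double[][] delta, double[][] x) {
        double s = 0;
        for (int i = 0; i < x.length; i++) {
            for (int j = i + 1; j < x.length; j++) {
                double r = delta[i][j] - dist(x[i], x[j]);
                s += r * r;
            }
        }
        return s;
    }

    // One Guttman transform with unit weights: X_new = (1/N) * B(X) * X.
    static double[][] guttman(double[][] delta, double[][] x) {
        int n = x.length, dim = x[0].length;
        double[][] b = new double[n][n];
        for (int i = 0; i < n; i++) {
            for (int j = 0; j < n; j++) {
                if (i == j) continue;
                double d = dist(x[i], x[j]);
                b[i][j] = d > 1e-12 ? -delta[i][j] / d : 0.0;   // coincident points contribute 0
                b[i][i] -= b[i][j];                            // rows of B(X) sum to zero
            }
        }
        double[][] next = new double[n][dim];
        for (int i = 0; i < n; i++)
            for (int j = 0; j < n; j++)
                for (int k = 0; k < dim; k++) next[i][k] += b[i][j] * x[j][k] / n;
        return next;
    }

    // while(condition): iterate until the stress improvement falls below tol; returns the stress history.
    static List<Double> run(double[][] delta, double[][] x0, int maxIter, double tol) {
        List<Double> history = new ArrayList<>();
        double[][] x = x0;
        double prev = stress(delta, x);
        history.add(prev);
        for (int it = 0; it < maxIter; it++) {
            x = guttman(delta, x);              // <X> = [A][B]<C>
            double cur = stress(delta, x);      // CalcStress(<X>)
            history.add(cur);
            if (prev - cur < tol) break;
            prev = cur;
        }
        return history;
    }

    public static void main(String[] args) {
        // Dissimilarities of four points on a unit square (sides 1, diagonals sqrt(2)).
        double s = Math.sqrt(2);
        double[][] delta = { {0, 1, s, 1}, {1, 0, 1, s}, {s, 1, 0, 1}, {1, s, 1, 0} };
        double[][] x0 = { {0.1, 0.0}, {0.9, 0.2}, {1.2, 1.1}, {-0.1, 0.8} };
        System.out.println(run(delta, x0, 200, 1e-12));
    }
}
```

Majorization guarantees that the stress history returned by `run` never increases, apart from floating-point rounding. That monotonic decrease is what the stress check in the loop condition relies on.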
Conclusions & Future Work
• Twister extends MapReduce to iterative algorithms
• We have implemented several iterative algorithms:
  – K-Means Clustering
  – Pagerank
  – Matrix Multiplication
  – Multi-dimensional scaling (MDS)
  – Breadth First Search
• Future work: integrating a distributed file system
Related Work
• General MapReduce references:
  – Google MapReduce
  – Apache Hadoop
  – Microsoft DryadLINQ
  – Pregel: Large-scale graph computing at Google
  – Sector/Sphere
  – All-Pairs
  – SAGA: MapReduce
Questions?
Hadoop (Google) Architecture
• HDFS stores blocks, manages replication, and handles failures
• Map/reduce tasks are Java processes, not long-running
• Failed maps are re-executed; failed reducers collect data from the maps again
[Figure: the map task (M) reads input data from HDFS; map output goes to the local disk first; the Task Tracker notifies the Job Tracker; the Job Tracker assigns some map outputs to a reducer (R); the reducer downloads the map outputs using HTTP; reduce output goes to HDFS]
Twister Architecture
• Scripts for file manipulations
• The Twister daemon is a process, but map/reduce tasks are Java threads (hybrid approach)
[Figure: map tasks (M) read static data from the local disk, or receive static data (1) or variable data (key, value) (2) through their broker connection; map output goes directly to the reducer (R) via the broker network; reduce output goes to the local disk or to the Combiner in the main program, which drives the computation through the Twister Driver]
Main program API:
1. configureMaps(PartitionFile partitionFile)
2. configureMaps(Value[] values)
3. configureReduce(Value[] values)
4. String key=addToMemCache(Value value)
5. removeFromMemCache(String key)
6. runMapReduce()
7. runMapReduce(KeyValue[] keyValues)
8. runMapReduceBCast(Value value)
Twister
• In-memory MapReduce
• Distinction between static data and variable data (data flow vs. δ flow)
• Cacheable map/reduce tasks (long-running tasks)
• Combine operation
• Support for fast intermediate data transfers
Publications
1. Jaliya Ekanayake (Advisor: Geoffrey Fox), Architecture and Performance of Runtime Environments for Data Intensive Scalable Computing, accepted for the Doctoral Showcase, SuperComputing2009.
2. Xiaohong Qiu, Jaliya Ekanayake, Scott Beason, Thilina Gunarathne, Geoffrey Fox, Roger Barga, Dennis Gannon, Cloud Technologies for Bioinformatics Applications, accepted for publication in the 2nd ACM Workshop on Many-Task Computing on Grids and Supercomputers, SuperComputing2009.
3. Jaliya Ekanayake, Atilla Soner Balkir, Thilina Gunarathne, Geoffrey Fox, Christophe Poulain, Nelson Araujo, Roger Barga, DryadLINQ for Scientific Analyses, accepted for publication in the Fifth IEEE International Conference on e-Science (eScience2009), Oxford, UK.
4. Jaliya Ekanayake and Geoffrey Fox, High Performance Parallel Computing with Clouds and Cloud Technologies, First International Conference on Cloud Computing (CloudComp2009), Munich, Germany. An extended version of this paper will appear as a book chapter.
5. Geoffrey Fox, Seung-Hee Bae, Jaliya Ekanayake, Xiaohong Qiu, and Huapeng Yuan, Parallel Data Mining from Multicore to Cloudy Grids, High Performance Computing and Grids workshop, 2008. An extended version of this paper will appear as a book chapter.
6. Jaliya Ekanayake, Shrideep Pallickara, Geoffrey Fox, MapReduce for Data Intensive Scientific Analyses, Fourth IEEE International Conference on eScience, 2008, pp. 277-284.
7. Jaliya Ekanayake, Shrideep Pallickara, and Geoffrey Fox, A collaborative framework for scientific data analysis and visualization, Collaborative Technologies and Systems (CTS08), 2008, pp. 339-346.