Twister
Bingjing Zhang, Fei Teng, Yuduo Zhou
Twister4Azure
Thilina Gunarathne
Building Virtual Cluster
Towards Reproducible eScience in the Cloud
Experimenting Lucene Index on HBase in an HPC Environment
Xiaoming Gao
Testing Hadoop / HDFS (CDH3u2) Multi-users with Kerberos on a Shared Environment
Stephen Wu
DryadLINQ CTP Evaluation
High-Performance Visualization Algorithms For Data-Intensive Analysis
Seung-Hee Bae and Jong Youl Choi
Million Sequence Challenge
Saliya Ekanayake, Adam Hughes, Yang Ruan
Cyberinfrastructure for Remote Sensing of Ice Sheets
Demos
• Yang & Bingjing – Twister MDS + PlotViz + Workflow (HPC)
• Thilina – Twister for Azure (Cloud)
• Jonathan – Building Virtual Cluster
• Xiaoming – HBase-Lucene indexing
• Seung-hee – Data Visualization
Computation and Communication Pattern in Twister
• Broadcasting
  – Data could be large
  – Chain & MST
• Map Collectors
  – Local merge
• Reduce Collectors
  – Collect but no merge
• Combine
  – Direct download or Gather
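As a rough illustration of why the shape of the broadcast matters, the sketch below compares the number of communication rounds for a chain and for a tree. It assumes that "MST" refers to a binomial-tree style broadcast and it ignores chunked pipelining, which a real chain broadcast of large data would normally use. The function names are hypothetical and are not part of the Twister API.

```python
def mst_broadcast_schedule(num_nodes, root=0):
    """Return a list of rounds; each round is a list of (sender, receiver) pairs.

    Assumption: a binomial-tree broadcast in which the set of nodes
    holding the data doubles in every round.
    """
    rounds = []
    have_data = 1  # relative ranks 0..have_data-1 already hold the data
    while have_data < num_nodes:
        sends = []
        for rel in range(have_data):
            target = rel + have_data
            if target < num_nodes:
                # Convert ranks relative to the root back to real node ids.
                sends.append(((rel + root) % num_nodes,
                              (target + root) % num_nodes))
        rounds.append(sends)
        have_data *= 2
    return rounds


def chain_broadcast_schedule(num_nodes, root=0):
    """Unpipelined chain: each node forwards to its successor, one hop per round."""
    return [[((root + i) % num_nodes, (root + i + 1) % num_nodes)]
            for i in range(num_nodes - 1)]


if __name__ == "__main__":
    for step, sends in enumerate(mst_broadcast_schedule(8), start=1):
        print(step, sends)
    print("tree rounds for 80 nodes:", len(mst_broadcast_schedule(80)))
    print("chain rounds for 80 nodes:", len(chain_broadcast_schedule(80)))
```

Under these assumptions the tree needs a logarithmic number of rounds while the unpipelined chain needs one round per node, which is why the chain is only competitive when the data is split into chunks and pipelined.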
Experiments
• Use Kmeans as example.
• Experiments are done on max 80 nodes and 2 switches.
• Some numbers from Google for reference
  – Send 2K Bytes over 1 Gbps network: 20,000 ns
  – We can roughly conclude ….
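A back-of-the-envelope sketch of how the 2K-byte reference figure can be scaled to a 600 MB payload. It assumes purely linear scaling with no latency, contention or protocol overhead, and it takes 1 MB as 10^6 bytes; both are simplifying assumptions, so the result is only a lower-bound style estimate for a single point-to-point transfer.

```python
REFERENCE_BYTES = 2 * 1024   # "2K Bytes" from the reference figure
REFERENCE_NS = 20_000        # 20,000 ns over a 1 Gbps network


def estimated_transfer_seconds(num_bytes):
    """Scale the reference figure linearly with the payload size."""
    return num_bytes / REFERENCE_BYTES * REFERENCE_NS * 1e-9


if __name__ == "__main__":
    payload = 600 * 10**6  # 600 MB, assuming 1 MB = 10^6 bytes
    print(f"{estimated_transfer_seconds(payload):.2f} s for one transfer")
```

With these assumptions a single 600 MB transfer takes a little under 6 seconds, which gives a baseline against which measured broadcast times across many nodes can be compared.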
[Figure: Broadcast 600MB Data with Max-Min Error Bar. Average broadcasting time over 50 runs of a 600 MB broadcast, in seconds. Plotted values: 13.61, 15.86, 17.28, 19.62.]
Execution Time Improvements
Total Execution Time (Unit: Seconds)
Circle                        12675.41
Fouettes (Direct Download)    3054.91
Fouettes (MST Gather)         3190.17
Kmeans, 600 MB centroids (150000 500D points), 640 data points, 80 nodes, 2 switches, MST Broadcasting, 50 iterations
[Figure: Twister-MDS demo. Components: Master Node (Twister Driver, Twister-MDS), ActiveMQ Broker, MDS Monitor, PlotViz. Messages: I. Send message to start the job; II. Send intermediate results.]
Twister4Azure – Iterative MapReduce
• Decentralized iterative MR architecture for clouds
  – Utilize highly available and scalable Cloud services
• Extends the MR programming model
• Multi-level data caching
  – Cache aware hybrid scheduling
• Multiple MR applications per job
• Collective communication primitives
• Outperforms Hadoop in local cluster by 2 to 4 times
• Sustain features of MRRoles4Azure
  – dynamic scheduling, load balancing, fault tolerance, monitoring, local testing/debugging
Iterative MapReduce for Azure Cloud
Merge step
http://salsahpc.indiana.edu/twister4azure
Extensions to support broadcast data
Multi-level caching of static data
Hybrid intermediate data transfer
Cache-aware Hybrid Task Scheduling
Collective Communication Primitives
[Figures: Weak Scaling; Data Size Scaling. Performance adjusted for sequential performance difference.]
[Figure: each iteration chains three Map-Reduce-Merge jobs: BC: Calculate BX; X: Calculate invV (BX); Calculate Stress; then New Iteration.]
[Figures: Performance with/without data caching; Speedup gained using data cache; Scaling speedup; Increasing number of iterations; Number of Executing Map Task Histogram; Strong Scaling with 128M Data Points; Weak Scaling; Task Execution Time Histogram. Annotations: First iteration performs the initial data fetch; Overhead between iterations.]
Performance Comparisons
BLAST Sequence Search
Cap3 Sequence Assembly
Smith-Waterman Sequence Alignment
MRRoles4Azure
Azure Cloud Services
• Highly-available and scalable
• Utilize eventually-consistent, high-latency cloud services effectively
• Minimal maintenance and management overhead
Decentralized
• Avoids Single Point of Failure
• Global queue based dynamic scheduling
• Dynamically scale up/down
MapReduce
MRRoles4Azure
Hybrid Task Scheduling
First iteration through queues
New iteration in Job Bulletin Board
Data in cache + Task meta data
Iterative MapReduce Collective Communication Primitives
• Supports common higher-level communication patterns
• Framework can optimize these operations transparently to users
• Ease of use
• SumReduce
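To make the idea of a SumReduce-style collective concrete, here is a minimal sketch of the operation itself: an element-wise sum over the partial vectors produced by the map tasks. The slides do not show the Twister4Azure API, so the function name and signature below are hypothetical and only illustrate the communication pattern that a framework could optimize behind the scenes.

```python
from functools import reduce


def sum_reduce(partials):
    """Element-wise sum of equally sized vectors, one vector per map task.

    Hypothetical helper: it models what a SumReduce collective computes,
    not how a framework would move the data between workers.
    """
    if not partials:
        raise ValueError("at least one partial result is required")
    length = len(partials[0])
    if any(len(vec) != length for vec in partials):
        raise ValueError("all partial results must have the same length")
    return reduce(lambda acc, vec: [a + b for a, b in zip(acc, vec)], partials)


if __name__ == "__main__":
    # Three map tasks, each contributing a partial sum per centroid dimension.
    print(sum_reduce([[1, 2], [3, 4], [5, 6]]))
```

Because the user only states the pattern (a sum over all map outputs), the framework remains free to choose how the partial results are combined, for example in a tree or directly at the reducers.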
Faster Twister based on InfiniBand interconnect
Motivation
• InfiniBand successes in HPC community
  – More than 42% of Top500 clusters use InfiniBand
  – Extremely high throughput and low latency
    • Up to 40Gb/s between servers and 1 μsec latency
  – Reduce CPU utilization by up to 90%
• Cloud community can benefit from InfiniBand
  – Accelerated Hadoop (SC11)
  – HDFS benchmark tests
Motivation (Cont'd)
• Bandwidth comparison of HDFS on various
Twister on InfiniBand
• Twister – Efficient iterative MapReduce runtime framework
• RDMA can make Twister faster
  – Accelerate static data distribution
  – Accelerate data shuffling between mappers and reducers
Building Virtual Clusters
Towards Reproducible eScience in the Cloud
Jonathan Klinginsmith
Separation of Concerns
Separation of concerns between two layers
• Infrastructure Layer – interactions with the Cloud API
• Software Layer – interactions with the running VM
Equivalent machine images (MI) in separate clouds
Virtual Clusters
Running CloudBurst on Hadoop
Running CloudBurst on a 10 node Hadoop Cluster
• knife hadoop launch cloudburst 9
• echo '{"run_list": "recipe[cloudburst]"}' > cloudburst.json
• chef-client -j cloudburst.json
[Figure: CloudBurst Sample Data Run-Time Results. X axis: Cluster Size (node count): 10, 20, 50. Y axis: Run Time (seconds). Series: CloudBurst, FilterAlignments.]
Implementation - Condor Pool
Jerome Mitchell
Collaborators: University of Kansas, Indiana University, and Elizabeth City State University
Hidden Markov Method based Layer Finding
PolarGrid Data Browser: Cloud GIS Distribution Service
• Google Earth example: 2009 Antarctica season
• Left image: overview of 2009 flight paths
Testing Environment:
GPU: GeForce GTX 580, 4096 MB, CUDA toolkit 4.0
Bridge Twister and HDFS
[Figure: Twister + HDFS data flow. Labels: User Client, HDFS, Data Distribution, Compute Nodes, Computation, Result Retrieval, Semi-manually Data Copy.]
What can we gain from HDFS?
• Scalability
• Fault tolerance, especially in data distribution
• Simplicity in coding
• Potential for dynamic scheduling
• Maybe no need to move data between local FS and HDFS in future
• Upload data to HDFS
  – A single file
  – A directory
• List a directory on HDFS
• Download data from HDFS
  – A single file
Maximizing Locality
[Figure: File 1, File 2 and File 3 distributed over Node 1, Node 2 and Node 3.]
0, 149.165.229.1, 0, hdfs://pg1:9000/user/yuduo/File1
1, 149.165.229.2, 1, hdfs://pg1:9000/user/yuduo/File3
2, 149.165.229.3, 2, hdfs://pg1:9000/user/yuduo/File2
• Creating pseudo partition file using max-flow algorithm based on block distribution
• Compute nodes will fetch assigned data based on this file
• Maximal data locality is achieved
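The locality-driven assignment can be sketched as a bipartite matching between files and the nodes that hold a replica, which is the unit-capacity special case of max-flow. The sketch below is only an illustration under simplifying assumptions: one file per node, whole-file replicas rather than HDFS blocks, and hypothetical file and node names. It is not the algorithm used in the actual HDFS-Twister bridge.

```python
def assign_files_to_nodes(replica_locations):
    """Match each file to a distinct node that stores one of its replicas.

    replica_locations: dict mapping file name -> list of node names that
    hold a replica. Returns dict file name -> node name for every file
    that could be placed locally (augmenting-path bipartite matching).
    """
    node_to_file = {}

    def try_assign(file_name, visited):
        for node in replica_locations[file_name]:
            if node in visited:
                continue
            visited.add(node)
            # Use a free node, or move the file currently placed on it.
            if node not in node_to_file or try_assign(node_to_file[node], visited):
                node_to_file[node] = file_name
                return True
        return False

    for file_name in replica_locations:
        try_assign(file_name, set())
    return {file_name: node for node, file_name in node_to_file.items()}


if __name__ == "__main__":
    replicas = {
        "File1": ["node1", "node2"],
        "File2": ["node1"],
        "File3": ["node2", "node3"],
    }
    for file_name, node in sorted(assign_files_to_nodes(replicas).items()):
        print(file_name, "->", node)
```

In this example a greedy pass would place File1 on node1 and leave File2 without a local node; the augmenting step moves File1 to node2 so that every file is read locally.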
Performance
Data Distribution (time in seconds)
Data size (G)       1          4          16
HDFS-Twister        20.3871    26.9711    257.374
Original-Twister    12.8644    36.33      202.14
Performance
Loop Time Overhead
[Figure: time (seconds) versus loop number (1, 10, 20, 40), one panel each for HDFS-Twister and Original Twister with 1G, 4G and 16G data.]
What we gain?
• Slightly longer execution time, if any
• Functions provided by HDFS
  – Fault tolerance
  – Various file operations
  – Scalability
  – Rack awareness, load balancer, etc.
• Data can be used by Hadoop without any
Future Work
• HDFS operates on block level while Twister is on file level. How to bridge this gap?
• Original Twister has 100% data locality. How can
Testing Hadoop / HDFS (CDH3u2) Multi-users with Kerberos on a Shared Environment
Motivation
• Support multiple users reading and writing simultaneously
  – Original Hadoop simply looks up a plaintext permission table
  – Users' data may be overwritten or deleted by others
• Provide a large Scientific Hadoop
• Encourage scientists to upload and run their applications on Academic Virtual Clusters
• Hadoop 1.0 or CDH3 has better integration with Kerberos
What is Hadoop + Kerberos
• Kerberos is a network authentication protocol that provides strong authentication for client/server applications
• Well known in single sign-on systems
• Integrates as a third party plugin to Hadoop
• Only "ticket" user can perform File I/Os and
                        HDFS Files I/O         MapReduce Job Submission
Users                   Local     Remote       Local     Remote
hdfs/ (main/slave)      Y         Y            Y         Y
mapred/ (main/slave)    Y         Y            Y         Y
User w/o Kerberos
Local: within Hadoop Cluster. Remote: same/diff host domain.
Deployment Progress
• Tested on a two-node environment
• Plan to deploy on a real shared environment (FutureGrid, Alamo or India)
• Work with system admins on a better Kerberos setup (may integrate with LDAP)
Integrate Twister into Workflow Systems
Implementation approaches
• Enable Twister to use RDMA by spawning C processes
• Directly use RDMA SDP (Sockets Direct Protocol)
  – Supported in latest Java 7, less efficient than C verbs
[Figure: Mapper (Java JVM) and Reducer (Java JVM) connected through an RDMA client and an RDMA server; boundary labeled Java JVM space.]
Further development
• Introduce ADIOS IO system to Twister
  – Achieve the best IO performance by using different IO methods
• Integrate parallel file system with Twister by using ADIOS
  – Take advantage of types of binary file formats, such as HDF5, NetCDF and BP
• Goal - Cross the chasm between Cloud and
Integrate Twister with ISGA Analysis Web Server
Chris Hemmerich, Adam Hughes, Yang Ruan, Aaron Buechlein, Judy Qiu, and Geoffrey Fox. Map-Reduce Expansion of the ISGA Genomic Analysis Web Server. The 2nd IEEE International Conference on Cloud Computing Technology and Science, 2010.
[Figure: software stack. ISGA, Ergatis and TIGR Workflow exchange <<XML>> descriptions; execution targets are SGE, Condor, Cloud and other DCEs.]
Hybrid Sequence Clustering Pipeline
• The sample data is selected randomly from the whole input FASTA file dataset
• All critical components are formed by Twister and should be able to run automatically.
[Figure: pipeline stages: Sample Data, Out-Sample Data, Sequence alignment, Pairwise Clustering, Multidimensional Scaling, MDS Interpolation, Sample Result, Out-Sample Result. Legend: Hybrid Component, Sample Data Channel, Out-Sample Data Channel.]
Pairwise Sequence Alignment
[Figure: Input Sample Fasta Partitions 1..n feed map tasks (M), followed by reduce (R) and combine (C) stages that produce Dissimilarity Matrix Partitions 1..n. The dissimilarity matrix is divided into blocks (0,0) through (n-1,n-1).]
• The left figure is a sample of the target N*N dissimilarity matrix, where the input is divided into n partitions
• The Sequence Alignment has two choices:
  – Needleman-Wunsch
  – Smith-Waterman
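As a small illustration of one of the two alignment choices, the sketch below computes a Smith-Waterman local alignment score with a linear gap penalty. The scoring parameters (match, mismatch, gap) are illustrative assumptions and are not taken from the slides, and converting the score into a dissimilarity value is left out.

```python
def smith_waterman_score(a, b, match=2, mismatch=-1, gap=-1):
    """Best local alignment score between sequences a and b.

    Uses the standard dynamic-programming recurrence with a linear gap
    penalty and keeps only two rows of the score matrix in memory.
    """
    prev = [0] * (len(b) + 1)
    best = 0
    for i in range(1, len(a) + 1):
        curr = [0] * (len(b) + 1)
        for j in range(1, len(b) + 1):
            diag = prev[j - 1] + (match if a[i - 1] == b[j - 1] else mismatch)
            # Local alignment: a cell never drops below zero.
            curr[j] = max(0, diag, prev[j] + gap, curr[j - 1] + gap)
            best = max(best, curr[j])
        prev = curr
    return best


if __name__ == "__main__":
    print(smith_waterman_score("ACGT", "ACGT"))
    print(smith_waterman_score("GATTACA", "GCATGCU"))
```

In a block-decomposed run, each map task would apply a function like this to every pair of sequences in its block of the N*N matrix.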
Multidimensional Scaling
[Figure: Input Dissimilarity Matrix Partitions 1..n feed map tasks (M), followed by reduce (R) and combine (C) stages. Legend: Sample Data File I/O, Sample Label File I/O, Network Communication.]
MDS interpolation
[Figure: two MDS interpolation workflows. In the first, Input Sample Fasta, Input Sample Coordinates and Input Out-Sample Fasta Partitions 1..n feed map tasks (M), followed by reduce (R) and combine (C) stages that produce the Final Output. In the second, an additional map stage first writes Distance File Partitions 1..n. Legend: Sample Data File I/O, Out-Sample Data File I/O, Network Communication.]
• The first method is for fast calculation, i.e. using hierarchical/heuristic interpolation
Million Sequence Challenge
• Input Data Size: 680k
• Sample Data Size: 100k
• Out-Sample Data Size: 580k
• Test Environment: PolarGrid with 100 nodes, 800 workers.
Metagenomics and Proteomics Projects
• Protein Sequence Analysis - In Progress
  – Collaboration with Seattle Children's Hospital
• Fungi Sequence Analysis - Completed
  – Collaboration with Prof. Haixu Tang at Indiana University
  – Over 1 million sequences
  – Results at http://salsahpc.indiana.edu/millionseq
• 16S rRNA Sequence Analysis - Completed
  – Collaboration with Dr. Mina Rho at Indiana University
  – Over 1 million sequences
Goal
• Identify Clusters
  – Group sequences based on a specified distance measure
• Visualize in 3-Dimension
  – Map each sequence to a point in 3D while preserving distance between each pair of sequences
• Identify Centers
  – Find one or several sequences to represent the center of each cluster
Sequence    Cluster
S1          Ca
S2          Cb
Architecture (Basic)
[1] Pairwise Alignment & Distance Calculation
  – Smith-Waterman, Needleman-Wunsch and Blast
  – Kimura 2, Jukes-Cantor, Percent-Identity, and BitScore
  – MPI, Twister implementations
[2] Pairwise Clustering
  – Deterministic annealing
  – MPI implementation
[3] Multi-dimensional Scaling
  – Optimize Chisq, Scaling by MAjorizing a COmplicated Function (SMACOF)
  – MPI, Twister implementations
[4] Visualization
  – PlotViz – a desktop point visualization application built by SALSA group
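For readers unfamiliar with the quantity that the multi-dimensional scaling step tries to reduce, the sketch below evaluates a raw STRESS value for a candidate 3D mapping. It assumes unit weights for every pair, which is a simplification; SMACOF in general minimizes a weighted version, and the optimization loop itself is not shown.

```python
import math


def stress(dissimilarity, coords):
    """Raw STRESS with unit weights.

    Sums (d_ij(X) - delta_ij)^2 over all pairs i < j, where delta_ij is
    the input dissimilarity and d_ij(X) is the Euclidean distance between
    the mapped points i and j.
    """
    n = len(coords)
    total = 0.0
    for i in range(n):
        for j in range(i + 1, n):
            mapped = math.dist(coords[i], coords[j])
            total += (mapped - dissimilarity[i][j]) ** 2
    return total


if __name__ == "__main__":
    delta = [[0.0, 5.0], [5.0, 0.0]]
    points = [(0.0, 0.0, 0.0), (3.0, 4.0, 0.0)]
    print(stress(delta, points))  # the mapped distance matches delta exactly
```

A mapping that preserves every pairwise distance has zero STRESS, so lower values indicate a more faithful 3D picture of the sequence set.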
                       GTM                        MDS (SMACOF)
Purpose (both)         • Non-linear dimension reduction
                       • Find an optimal configuration in a lower-dimension
                       • Iterative optimization method
Objective Function     Maximize Log-Likelihood    Minimize STRESS or SSTRESS
Complexity             O(KN) (K << N)             O(N^2)
Optimization Method    EM                         Iterative Majorization (EM-like)
Input                  Vector-based data          Non-vector (Pairwise similarity matrix)
• Full data processing by GTM or MDS is computing- and memory-intensive
• Two step procedure
  – Training: training by M samples out of N data
  – Interpolation: remaining (N-M) out-of-samples are approximated without training
[Figure: Total N data split into n In-sample and N-n Out-of-sample.]
• Finding K clusters for N data points
  – Relationship is a bipartite graph (bi-graph)
  – Represented by K-by-N matrix (K << N)
• Decomposition for P-by-Q compute grid
  – Reduces the memory requirement per process to 1/PQ
[Figure: K latent points and N data points decomposed over a compute grid. Software stack: MPI / MPI-IO, Parallel HDF5, ScaLAPACK, Parallel File System, on Cray / Linux / Windows Cluster.]
Parallel MDS
• O(N^2) memory and computation required.
  – 100k data → 480GB memory
• Balanced decomposition of NxN matrices by P-by-Q grid.
  – Reduces the memory and computing requirement per process to 1/PQ
• Communicate via MPI primitives
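The memory figure can be checked with a few lines of arithmetic. The sketch below assumes 8-byte double-precision entries and 1 GB = 10^9 bytes. The count of six N-by-N matrices is our own guess, chosen because it reproduces the 480 GB figure; the slides do not state how many matrices are held.

```python
def matrix_bytes(n, bytes_per_element=8):
    """Memory for one dense n-by-n matrix of double-precision values."""
    return n * n * bytes_per_element


def per_process_bytes(n, p, q, num_matrices=1, bytes_per_element=8):
    """Memory per process when the matrices are split evenly over a P-by-Q grid."""
    return num_matrices * matrix_bytes(n, bytes_per_element) / (p * q)


if __name__ == "__main__":
    one = matrix_bytes(100_000)
    print(one / 1e9, "GB for one 100k x 100k matrix")
    print(6 * one / 1e9, "GB for six such matrices (assumed count)")
    print(per_process_bytes(100_000, 8, 8, num_matrices=6) / 1e9,
          "GB per process on a hypothetical 8 x 8 grid")
```

The same function shows the effect of the decomposition: the total is unchanged, but each process only needs its 1/PQ share.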
MDS Interpolation
• Finding approximate mapping position w.r.t. k-NN's prior mapping.
• Per point it requires:
  – O(M) memory
  – O(k) computation
• Pleasingly parallel
• Mapping 2M in 1450 sec.
  – vs. 100k in 27000 sec.
  – 7500 times faster than estimation of the full MDS.
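The "7500 times faster" figure can be reproduced approximately by extrapolating the full MDS run quadratically. The sketch below assumes that the running time of full MDS scales exactly as O(N^2) from the 100k measurement; that is our reading of "estimation of the full MDS" and not something the slides spell out.

```python
def extrapolate_quadratic_seconds(measured_n, measured_seconds, target_n):
    """Extrapolate an O(N^2) running time from a single measurement."""
    return measured_seconds * (target_n / measured_n) ** 2


def interpolation_speedup(measured_n, measured_seconds, target_n, interp_seconds):
    """Ratio of the extrapolated full-MDS time to the measured interpolation time."""
    full = extrapolate_quadratic_seconds(measured_n, measured_seconds, target_n)
    return full / interp_seconds


if __name__ == "__main__":
    full = extrapolate_quadratic_seconds(100_000, 27_000, 2_000_000)
    print(f"estimated full MDS on 2M points: {full:.0f} s")
    print(f"speedup: {interpolation_speedup(100_000, 27_000, 2_000_000, 1450):.0f}x")
```

Under this assumption the full run on 2M points would take roughly 10.8 million seconds, which gives a ratio close to the quoted 7500.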
PubChem data with CTD visualization by using MDS (left) and GTM (right)
About 930,000 chemical compounds are visualized as points in 3D space, annotated by the related genes in the Comparative Toxicogenomics Database (CTD)
Chemical compounds reported in the literature, visualized by MDS (left) and GTM (right)
Experimenting Lucene Index on HBase in an HPC Environment
Introduction
• Background: data intensive computing requires storage solutions for huge amounts of data
• One proposed solution: HBase, Hadoop implementation of
Introduction
• HBase architecture:
• Tables split into regions and served by region servers
• Reliable data storage and efficient access to TBs or PBs of data, successful application in Facebook and Twitter
• Problem: no inherent mechanism for field value searching,
Our solution
• Bring inverted indices into HBase
• Store inverted indices in HBase tables
• Use the data set from a real digital library application to demonstrate our solution: bibliography data, image data, text data
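To illustrate what storing an inverted index in a table can look like, the sketch below builds a term-to-document mapping in plain Python. The layout (row key = term, column = document id, cell value = term frequency) is a hypothetical schema chosen for illustration; the slides do not describe the actual table design, and no HBase client calls are shown.

```python
from collections import defaultdict


def build_inverted_index(documents):
    """Build term -> {doc_id: term frequency} from doc_id -> text.

    Each outer key plays the role of a row key and each inner key the
    role of a column qualifier in the assumed table layout.
    """
    index = defaultdict(dict)
    for doc_id, text in documents.items():
        for term in text.lower().split():
            index[term][doc_id] = index[term].get(doc_id, 0) + 1
    return dict(index)


def search(index, term):
    """Return the ids of documents containing the term, sorted for stable output."""
    return sorted(index.get(term.lower(), {}))


if __name__ == "__main__":
    docs = {"d1": "HBase stores data", "d2": "Lucene indexes data data"}
    idx = build_inverted_index(docs)
    print(search(idx, "data"))
    print(idx["data"])
```

With such a layout, a field value search becomes a single row lookup by term instead of a scan over every document row.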
Future work
• Experiments with a larger data set: ClueWeb09 CatB data
• Distributed performance evaluation
• More data analysis or text mining based on
Parallel Fox Algorithm
Timing model for Fox algorithm
• problem model -> machine model -> performance model -> measure parameters -> show model fits with data -> compare with other runtime
• Simplifying assumptions:
  – Tcomm = time to transfer one floating point word
  – Tstartup = software latency for core primitive operations,
• Evaluation goals:
  – f / c average number of flops per network transformation: the
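Timing model for Fox LINQ to HPC on TEMPEST
• Multiply M*M matrices on a $\sqrt{N} \times \sqrt{N}$ grid of nodes. Size of sub-block is m*m, where $m = M/\sqrt{N}$
• Overhead:
  – To broadcast A sub-matrix: $(\sqrt{N} - 1)\,\bigl(T_{startup} + m^2 (T_{io} + T_{comm})\bigr)$
  – To roll up B sub-matrix: $T_{startup} + m^2 (T_{io} + T_{comm})$
  – To compute A*B: $2 m^3 T_{flops}$
• Total computation time: $T = \sqrt{N}\,\Bigl(\sqrt{N}\,\bigl(T_{startup} + m^2 (T_{io} + T_{comm})\bigr) + 2 m^3 T_{flops}\Bigr)$
• Efficiency: $\varepsilon = \frac{1}{N} \cdot \frac{\text{time on 1 processor}}{\text{time on N processors}} \approx \frac{1}{1 + \frac{1}{\sqrt{n}} \cdots}$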
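The timing model can be turned into a small calculator. The sketch below assumes a reading of the model in which the grid is sqrt(N) by sqrt(N), each of the sqrt(N) steps costs one broadcast of an A block (sqrt(N) - 1 transfers), one roll of a B block (one transfer) and one block multiply (2 m^3 flops), and the single-processor time is 2 M^3 flops. All parameter values in the usage example are made up for illustration and are not measurements from TEMPEST.

```python
import math


def fox_total_time(M, N, t_startup, t_io, t_comm, t_flops):
    """Modeled running time of Fox matrix multiply on N nodes (N a perfect square)."""
    root = math.isqrt(N)
    if root * root != N:
        raise ValueError("N must be a perfect square")
    m = M / root                                    # sub-block edge length
    transfer = t_startup + m**2 * (t_io + t_comm)   # move one m*m block
    broadcast = (root - 1) * transfer               # A block along a grid row
    roll = transfer                                 # shift the B block
    compute = 2 * m**3 * t_flops                    # multiply two blocks
    return root * (broadcast + roll + compute)      # sqrt(N) steps in total


def fox_efficiency(M, N, t_startup, t_io, t_comm, t_flops):
    """Efficiency = (time on 1 processor) / (N * time on N processors)."""
    t_one = 2 * M**3 * t_flops
    return t_one / (N * fox_total_time(M, N, t_startup, t_io, t_comm, t_flops))


if __name__ == "__main__":
    # Illustrative parameters only: seconds per flop, per word, per message.
    for M in (2000, 4000, 8000):
        eff = fox_efficiency(M, 16, t_startup=1e-3, t_io=1e-8,
                             t_comm=1e-8, t_flops=1e-9)
        print(M, round(eff, 3))
```

With fixed machine parameters the modeled efficiency rises as the sub-blocks grow, because the computation per block grows as m^3 while the data moved per block grows only as m^2.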
Measure network overhead and runtime latency
Performance analysis Fox LINQ to HPC on TEMPEST
Running time with 5x5, 4x4, 3x3 nodes with single core per node
Running time with 4x4 nodes with 24, 16, 8, 1 core per node
1/e-1 vs. 1/Sqrt(n) showing linear