Dryad and DryadLINQ
General‐purpose Distributed Computing using a High‐level Language
Michael Isard
Microsoft Research Silicon Valley
Distributed Data‐Parallel Computing
• Workloads beyond standard SQL, HPC
– Data‐mining, graph analysis, …
– Complex, long‐lived application software
• Cloud (shared clusters)
– Transparent scaling
– Resource virtualization
• Commodity hardware
Talk overview
• Part I
– High‐level language: LINQ
– Computational model: DAG
– Execution layer: Dryad+Quincy
• Part II
– Dryad systems issues
– Comparison with MapReduce
LINQ
• Microsoft’s Language INtegrated Query
• Operators to manipulate datasets in .NET
– Dataset is a first‐class abstraction
– Select, Join, GroupBy, Aggregate, etc.
– Set‐at‐a‐time operations, instead of looping object‐at‐a‐time
• Integrated into .NET programming languages
– Programs can call operators
– Operators can invoke arbitrary .NET functions
• Data model
– Data elements are strongly typed .NET objects
– Much more expressive than SQL tables
• Extensible
Aggregating partial sums
class PartialSum { public int sum; public int count; };

static double MergeSums(PartialSum[] sums) {
    int totalSum = 0, totalCount = 0;
    int i;
    for (i = 0; i < sums.Length; ++i) {
        totalSum += sums[i].sum;
        totalCount += sums[i].count;
    }
    return (double) totalSum / (double) totalCount;
}
Aggregating partial sums
class PartialSum { public int sum; public int count; };

static double MergeSums(PartialSum[] sums) {
    return (double) sums.Select(x => x.sum).Sum() /
           (double) sums.Select(x => x.count).Sum();
}
Convenient syntax
var words =
    tableOfLines.SelectMany(l => l.Split(' ')).GroupBy(w => w);

IQueryable<IGrouping<string,string>> words =
    tableOfLines.SelectMany(mySplitFunction).GroupBy(myStringIdentity);

IEnumerable<string> mySplitFunction(string line) {
    return line.Split(' ');
}

string myStringIdentity(string word) {
    return word;
}
K‐means algorithm
K‐means helper functions
class Vector { … }

Vector Mean(IEnumerable<Vector> set) {
    Vector sum = set.Aggregate( (x, y) => x + y );
    return sum / set.Count();
}

Vector NearestNeighbor(Vector vect, IEnumerable<Vector> set) {
    // Min(selector) would return the smallest distance rather than the vector,
    // so order by distance and take the closest element.
    return set.OrderBy( e => (e - vect).L2Norm() ).First();
}
K‐means algorithm
IEnumerable<Vector> kMeansStep(IEnumerable<Vector> vectors,
                               IEnumerable<Vector> centers) {
    var clusters = vectors.GroupBy(
        vector => NearestNeighbor(vector, centers).VectorId);
    return clusters.Select(cluster => Mean(cluster));
}

IEnumerable<Vector> kMeans(IEnumerable<Vector> vectors,
                           IEnumerable<Vector> centers) {
    for (int i = 0; i < iterations; i++)
        centers = kMeansStep(vectors, centers);
    return centers;
}
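The slide code leaves the Vector class elided, so it cannot be run as shown. As a rough sketch of the same GroupBy/Select structure, the version below assumes one‐dimensional points stored as double values; the class name KMeansSketch and the method names NearestCenter, Step and Run are hypothetical and not part of the talk.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class KMeansSketch
{
    // Returns the center closest to the point (ties go to the earliest center,
    // because OrderBy is a stable sort).
    public static double NearestCenter(double point, IReadOnlyList<double> centers)
    {
        return centers.OrderBy(c => Math.Abs(c - point)).First();
    }

    // One step: group the points by nearest center, then replace each group by its mean.
    public static List<double> Step(IEnumerable<double> points, IReadOnlyList<double> centers)
    {
        return points
            .GroupBy(p => NearestCenter(p, centers))
            .Select(cluster => cluster.Average())
            .OrderBy(c => c)
            .ToList();
    }

    // Fixed number of iterations, as in the kMeans loop on the slide.
    public static List<double> Run(IEnumerable<double> points, List<double> centers, int iterations)
    {
        for (int i = 0; i < iterations; i++)
            centers = Step(points, centers);
        return centers;
    }

    public static void Main()
    {
        var points = new[] { 1.0, 2.0, 3.0, 10.0, 11.0, 12.0 };
        var centers = Run(points, new List<double> { 0.0, 5.0 }, 5);
        Console.WriteLine(string.Join(", ", centers)); // prints "2, 11"
    }
}
```

Note that a center that attracts no points simply disappears from the result, because GroupBy emits no empty groups; the slide code has the same property.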
Data mining, machine learning, …
• Decision‐tree training
• SVD
• Power iteration (PageRank)
• Image feature extraction/indexing/clustering
• Network trace analysis
• Light‐field simulation
• …
Talk overview
• Part I
– High‐level language: LINQ
– Computational model: DAG
– Execution layer: Dryad+Quincy
• Part II
– Dryad systems issues
– Comparison with MapReduce
Computational model: DAG
• Distributed processing
– Partition computation across cores/cluster
– Minimize communication overhead
• Directed‐acyclic graph
– Edge is finite sequence of data items
DAG abstraction
• Explicit dataflow
– Exposes dependencies within computation
• Absence of cycles
– Allows re‐execution for fault‐tolerance
– Simplifies scheduling: no deadlock
• Cycles can often be replaced by unrolling
– Unsuitable for fine‐grain inner loops
• Very popular
Map
• Independent transformation of dataset
– for each x in S, output x’ = f(x)
• E.g. simple grep for word w
– output line x only if x contains w
[Figure: S → f → S’; with partitioned input, S1 → f → S1’ and S2 → f → S2’]
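The grep example can be sketched with ordinary LINQ‐to‐objects. The snippet below is only an illustration on in‐memory arrays, not DryadLINQ itself; the names MapSketch, Grep and GrepPartitioned are hypothetical. It shows that applying the filter to each partition independently gives the same lines as applying it to the whole dataset.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class MapSketch
{
    // grep as a per-record filter: keep line x only if it contains word w.
    public static IEnumerable<string> Grep(IEnumerable<string> lines, string w)
    {
        return lines.Where(x => x.Contains(w));
    }

    // Apply Grep to each partition independently and concatenate the outputs.
    public static List<string> GrepPartitioned(IEnumerable<IEnumerable<string>> partitions, string w)
    {
        return partitions.SelectMany(p => Grep(p, w)).ToList();
    }

    public static void Main()
    {
        var s1 = new[] { "the quick fox", "lazy dog" };
        var s2 = new[] { "a fox again", "nothing here" };

        var whole = Grep(s1.Concat(s2), "fox").ToList();
        var parts = GrepPartitioned(new[] { s1, s2 }, "fox");

        Console.WriteLine(whole.SequenceEqual(parts)); // prints "True"
    }
}
```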
Reduce
• Grouping plus aggregation
– 1) Group x in S according to key selector k(x)
– 2) For each group g, output r(g)
• E.g. simple word count
– group by k(x) = x
– for each group g output key (word) and count of g
[Figure: S1, S2, S3 → G → r → S’]
[Figure: S1, S2, S3 → D → G → r → S1’, S2’, S3’; D is distribute, e.g. by hash or range]
[Figure: S1, S2, S3 → G → ir → D → G → r → S1’, S2’; ir is initial reduce, e.g. compute a partial sum]
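A minimal sketch of the last pipeline (G → ir → D → G → r) for word count, run locally with LINQ‐to‐objects. The class ReduceSketch, the Bucket hash and the reducer count are assumptions made for the illustration; a real system would choose its own distribution function.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ReduceSketch
{
    // Plain reduce: group by k(x) = x, then r(g) = (word, count of g).
    public static Dictionary<string, int> WordCount(IEnumerable<string> words)
    {
        return words.GroupBy(w => w).ToDictionary(g => g.Key, g => g.Count());
    }

    // Partitioned reduce with an initial reduce step.
    public static Dictionary<string, int> WordCountPartitioned(
        IEnumerable<IEnumerable<string>> partitions, int reducers)
    {
        // G + ir: each input partition computes partial counts locally.
        var partials = partitions.SelectMany(p => WordCount(p));

        // D: route each partial count to a reducer chosen by a hash of the key.
        var buckets = partials.GroupBy(kv => Bucket(kv.Key, reducers));

        // G + r: each reducer groups its partial counts by word and sums them.
        return buckets
            .SelectMany(b => b.GroupBy(kv => kv.Key)
                              .Select(g => new KeyValuePair<string, int>(
                                  g.Key, g.Sum(kv => kv.Value))))
            .ToDictionary(kv => kv.Key, kv => kv.Value);
    }

    // Simple deterministic hash (hypothetical) so that the routing is reproducible.
    static int Bucket(string key, int reducers)
    {
        int h = 0;
        foreach (char c in key) h = (h * 31 + c) % reducers;
        return h;
    }

    public static void Main()
    {
        var s1 = new[] { "a", "b", "a" };
        var s2 = new[] { "b", "c" };
        var s3 = new[] { "a" };

        var direct = WordCount(s1.Concat(s2).Concat(s3));
        var staged = WordCountPartitioned(new[] { s1, s2, s3 }, 2);

        bool same = direct.Count == staged.Count &&
                    direct.All(kv => staged[kv.Key] == kv.Value);
        Console.WriteLine(same); // prints "True"
    }
}
```

The initial reduce shrinks each partition to one record per distinct word before anything is distributed, which is why it pays off when keys repeat often within a partition.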
K‐means
[Figure: k‐means dataflow: centers C0 and points P feed ac and cc vertices, producing C1, C2 and C3 in turn]
[Figure: the same dataflow with the points partitioned into P1, P2, P3 and several ac vertices per iteration]
PageRank
[Figure: PageRank dataflow over datasets N0, N1, N2, N3 (three partitions each) and E1, E2, E3, built from ae, cc and D vertices]
Distributed Word Count
Count word frequency in a set of documents:
var words = docs.SelectMany(doc => doc.words);
var groups = words.GroupBy(word => word);
var counts = groups.Select(g => new WordCount(g.Key, g.Count()));
[Figure: IN → SM → GB → S → OUT, where SM is doc => doc.words, GB is word => word, and S is g => new …]
Execution Plan for Word Count
[Figure: stage (1), pipelined: SM (SelectMany), Q (sort), GB (groupby), C (count), D (distribute); followed by a second pipelined stage: MS (mergesort), GB (groupby), Sum]
[Figure: the same plan replicated across partitions, with stages (1) and (2)]
Talk overview
• Part I
– High‐level language: LINQ
– Computational model: DAG
– Execution layer: Dryad+Quincy
• Part II
– Dryad systems issues
– Comparison with MapReduce
Dryad
• General‐purpose execution engine
– Batch processing on immutable datasets
– Well‐tested on large clusters
• Automatically handles
– Fault tolerance
– Distribution of code and intermediate data
– Scheduling of work to resources
Fault tolerance
• Buffer data in (some) edges
• Re‐execute on failure using buffered data
• Speculatively re‐execute for stragglers
Rewrite graph at runtime
• Loop unrolling with convergence tests
• Adapt partitioning scheme at run time
– Choose #partitions based on runtime data volume
– Broadcast Join vs. Hash Join, etc.
• Adaptive aggregation and distribution trees
– Based on data skew and network topology
• Load balancing
Dryad System Architecture
[Figure: Dryad system architecture, including the Scheduler]
Quincy DAG Scheduler
• Data locality and fairness (SLAs)
• SOSP 2009
Production system
• Dryad well‐tested, scalable
– Daily use supporting Bing for over 3 years
– Clusters with >10k computers
• Applicable to large number of computations
– 250 computer cluster at MSR SVC, Mar–Nov 2009
• 15k jobs (tens of millions of processes executed)
• Hundreds of distinct programs
– Network trace analysis, privacy‐preserving inference, light‐transport simulation, decision‐tree training, deep belief …
Conclusion
• DryadLINQ supports many computations
– Easy to use, flexible
• DAG‐structured jobs scale to large clusters
– Transient failures common, disk failures daily
• Publicly available for download
Talk overview
• Part I
– High‐level language: LINQ
– Computational model: DAG
– Execution layer: Dryad+Quincy
• Part II
– Dryad systems issues
– Comparison with MapReduce
Dryad Inputs and Outputs
• Partitioned data set
– Records do not cross partition boundaries
– Data on compute machines: NTFS, SQLServer, …
• Optional semantics
– Hash‐partition, range‐partition, sorted, etc.
• Loading external data
– Partitioning “automatic”
– File system chooses sensible partition sizes
– Or known partitioning from user
Partitioning driven by data
[Figure: inputs S1 to S6 feeding G vertices]
[Figure: inputs S1 to S6 grouped into G, ir and D vertices, followed by G and r vertices producing S1’ and S2’]
Push vs Pull
• Databases typically ‘pull’ using iterator model
– Avoids buffering
– Can prevent unnecessary computation
• But DAG must be fully materialized
– Complicates rewriting
– Prevents resource virtualization in shared cluster
[Figure: S1 → D → G → r → S1’, with a further G vertex producing S2’]
Channel abstraction
[Figure: S1, S2, S3 → G → ir → D → G → r → S1’, S2’]
Push vs Pull
• Channel types define connected component
– Shared‐memory or TCP must be gang‐scheduled
• Pull within gang, push between gangs
Fault tolerance
• Buffer data in (some) edges
• Re‐execute on failure using buffered data
• Speculatively re‐execute for stragglers
• ‘Push’ model makes this very simple
DryadLINQ Internals
• Distributed execution plan
– Static optimizations: pipelining, eager aggregation, etc.
– Dynamic optimizations: data‐dependent partitioning, dynamic aggregation, etc.
• Automatic code generation
– Vertex code that runs on vertices
– Channel serialization code
– Callback code for runtime optimizations
– Automatically distributed to cluster machines
• Separate LINQ query from its local context
– Distribute referenced objects to cluster machines
Decomposable Functions
• Roughly, a function H is decomposable if it can be expressed as composition of two functions IR and C such that
– IR is commutative
– C is commutative and associative
• Some decomposable functions
– Sum: IR = Sum, C = Sum
– Count: IR = Count, C = Sum
– OrderBy.Take: IR = OrderBy.Take,
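To make the Sum and Count decompositions concrete, the sketch below evaluates H in two phases (IR on each partition, then C over the partial results) and checks the answer against a single‐pass evaluation. The helper TwoPhase and the class name DecomposableSketch are hypothetical, and the data is held in memory purely for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DecomposableSketch
{
    // Evaluates H in two phases: IR on every partition, then C over the partial results.
    public static TOut TwoPhase<TIn, TPartial, TOut>(
        IEnumerable<IEnumerable<TIn>> partitions,
        Func<IEnumerable<TIn>, TPartial> ir,
        Func<IEnumerable<TPartial>, TOut> c)
    {
        return c(partitions.Select(ir));
    }

    public static void Main()
    {
        var partitions = new[] { new[] { 1, 2, 3 }, new[] { 4, 5 }, new[] { 6 } };
        var all = partitions.SelectMany(p => p).ToList();

        // Sum: IR = Sum, C = Sum.
        int sum = TwoPhase<int, int, int>(partitions, p => p.Sum(), ps => ps.Sum());

        // Count: IR = Count, C = Sum.
        int count = TwoPhase<int, int, int>(partitions, p => p.Count(), ps => ps.Sum());

        Console.WriteLine(sum == all.Sum());   // prints "True"
        Console.WriteLine(count == all.Count); // prints "True"
    }
}
```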
Two Key Questions
• How do we decompose a function?
– Two interfaces: iterator and accumulator
– Choice of interfaces can have significant impact on performance
• How do we deal with user‐defined functions?
– Try to infer automatically
– Provide a good annotation mechanism
Iterator Interface in DryadLINQ
[Decomposable("InitialReduce", "Combine")]
public static IntPair SumAndCount(IEnumerable<int> g) {
    return new IntPair(g.Sum(), g.Count());
}

public static IntPair InitialReduce(IEnumerable<int> g) {
    return new IntPair(g.Sum(), g.Count());
}

public static IntPair Combine(IEnumerable<IntPair> g) {
    return new IntPair(g.Select(x => x.first).Sum(),
                       g.Select(x => x.second).Sum());
}
[Figure: execution plan with vertices M, G1, IR, D, then MG, G2, C, then MG, G3, F, X]
Accumulator Interface in DryadLINQ
[Decomposable("Initialize", "Iterate", "Merge")]
public static IntPair SumAndCount(IEnumerable<int> g) {
    return new IntPair(g.Sum(), g.Count());
}

public static IntPair Initialize() {
    return new IntPair(0, 0);
}

public static IntPair Iterate(IntPair x, int r) {
    x.first += r;
    x.second += 1;
    return x;
}

public static IntPair Merge(IntPair x, IntPair o) {
    x.first += o.first;
    x.second += o.second;
    return x;
}
[Figure: execution plan with vertices M, G1, IR, D, then MG, G2, C, then MG, G3, F, X]
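The slides do not show how the runtime invokes these three functions. One plausible reading, sketched below under that assumption, is that each partition is folded with Iterate starting from Initialize(), and the resulting partial accumulators are then folded with Merge. The IntPair definition and the driver SumAndCount over partitions are hypothetical stand‐ins, not DryadLINQ code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Assumed shape of IntPair: the slides use only its constructor and two fields.
public class IntPair
{
    public int first;
    public int second;
    public IntPair(int first, int second) { this.first = first; this.second = second; }
}

public static class AccumulatorSketch
{
    public static IntPair Initialize() { return new IntPair(0, 0); }

    public static IntPair Iterate(IntPair x, int r)
    {
        x.first += r;
        x.second += 1;
        return x;
    }

    public static IntPair Merge(IntPair x, IntPair o)
    {
        x.first += o.first;
        x.second += o.second;
        return x;
    }

    // Hypothetical driver: fold each partition with Iterate,
    // then fold the partial accumulators with Merge.
    public static IntPair SumAndCount(IEnumerable<IEnumerable<int>> partitions)
    {
        var partials = partitions.Select(
            p => p.Aggregate(Initialize(), (acc, r) => Iterate(acc, r)));
        return partials.Aggregate(Initialize(), (acc, o) => Merge(acc, o));
    }

    public static void Main()
    {
        var partitions = new[] { new[] { 1, 2, 3 }, new[] { 4, 5 } };
        IntPair total = SumAndCount(partitions);
        Console.WriteLine(total.first + " " + total.second); // prints "15 5"
    }
}
```

Unlike the iterator interface, no partition ever needs its whole group materialized as a sequence: each record is folded into the accumulator and can then be dropped.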
Iterator PartialSort
• G1+IR and G2+C
– Keep only a fixed number of chunks in memory
– Chunks are processed in parallel: sorted, grouped, reduced by IR or C, and emitted
• G3+F
– Read the entire input into memory, perform a parallel sort, and apply F to each group
• Observations
– G1+IR can always be pipelined with upstream
– G3+F can often be pipelined with downstream
– G1+IR may have poor data reduction
Accumulator FullHash
• G1+IR, G2+C, and G3+F
– Build an in‐memory parallel hash table: one accumulator object/key
– Each input record is “accumulated” into its accumulator object, and then discarded
– Output the hash table when all records are processed
• Observations
– Optimal data reduction for G1+IR
– Memory usage proportional to the number of unique keys, not records
• So, by default, we enable upstream and downstream pipelining
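The hash‐table strategy can be illustrated with a single‐threaded Dictionary, which is a simplification: the slide describes a parallel hash table. In this sketch the accumulator is a running sum per key, and the names FullHashSketch and Accumulate are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class FullHashSketch
{
    // Keeps one accumulator (a running sum) per key. Each record is folded
    // into its accumulator and is not stored afterwards.
    public static Dictionary<string, long> Accumulate(
        IEnumerable<KeyValuePair<string, int>> records)
    {
        var table = new Dictionary<string, long>();
        foreach (var record in records)
        {
            long acc;
            table.TryGetValue(record.Key, out acc); // acc is 0 when the key is new
            table[record.Key] = acc + record.Value;
        }
        // The table is output only after all records have been processed.
        return table;
    }

    public static void Main()
    {
        var records = new[]
        {
            new KeyValuePair<string, int>("a", 1),
            new KeyValuePair<string, int>("b", 2),
            new KeyValuePair<string, int>("a", 3),
        };

        var table = Accumulate(records);
        Console.WriteLine(table.Count); // prints "2": one entry per unique key
        Console.WriteLine(table["a"]);  // prints "4"
    }
}
```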
Talk overview
• Part I
– High‐level language: LINQ
– Computational model: DAG
– Execution layer: Dryad+Quincy
• Part II
– Dryad systems issues
– Comparison with MapReduce
MapReduce (Hadoop)
• MapReduce restricts
– Topology of DAG
– Semantics of function in compute vertex
• Sequence of instances for non‐trivial tasks
[Figure: MapReduce as a DAG: S1, S2, S3 → f → G → ir → D → G → r → S1’, S2’]
MapReduce language complexity
• Simple to describe MapReduce model
• Can be hard to map algorithm to framework
– cf k‐means: combine C+P, broadcast C, iterate, …
MapReduce system complexity
• Simple to describe MapReduce system
• Implementation not uniform
– Different fault‐tolerance for mappers, reducers
– Add more special cases for performance
• Hadoop introducing TCP channels, pipelines, …
DryadLINQ demo
Conclusions
• High‐level language is good
– For ease of use, maintainability, expressiveness
• Computational abstraction is important
– Suitable target for compiler, not developer
• Common patterns should be efficient
• Optimization should be easy
• LINQ is a pretty good language abstraction
• DAG is a very good computational model