General purpose Distributed Computing using a High level Language. Michael Isard

(1)

Dryad and DryadLINQ

General-purpose Distributed Computing using a High-level Language

Michael Isard

Microsoft Research Silicon Valley

(2)

Distributed Data-Parallel Computing

• Workloads beyond standard SQL, HPC

– Data-mining, graph analysis, …

– Complex, long-lived application software

• Cloud (shared clusters)

– Transparent scaling

– Resource virtualization

• Commodity hardware

(3)

Talk overview

• Part I

– High-level language: LINQ

– Computational model: DAG

– Execution layer: Dryad+Quincy

• Part II

– Dryad systems issues

– Comparison with MapReduce

(4)

LINQ

• Microsoft’s Language INtegrated Query

• Operators to manipulate datasets in .NET

– Dataset is a first-class abstraction

– Select, Join, GroupBy, Aggregate, etc.

– Set-at-a-time, instead of looping over object-at-a-time

• Integrated into .NET programming languages

– Programs can call operators

– Operators can invoke arbitrary .NET functions

• Data model

– Data elements are strongly typed .NET objects

– Much more expressive than SQL tables

• Extensible

(5)

Aggregating partial sums

class PartialSum { public int sum; public int count; };

static double MergeSums(PartialSum[] sums) {
    int totalSum = 0, totalCount = 0;
    int i;
    for (i = 0; i < sums.Length; ++i) {
        totalSum += sums[i].sum;
        totalCount += sums[i].count;
    }
    return (double) totalSum / (double) totalCount;
}

(6)

Aggregating partial sums

class PartialSum { public int sum; public int count; };

static double MergeSums(PartialSum[] sums) {
    return (double) sums.Select(x => x.sum).Sum() /
           (double) sums.Select(x => x.count).Sum();
}
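The loop and LINQ versions of MergeSums can be checked against each other with a small console program. This is only a sketch: the names MergeLoop, MergeLinq and MergeSumsSketch, and the sample values, are made up for illustration.

```csharp
using System;
using System.Linq;

public class PartialSum { public int sum; public int count; }

public static class MergeSumsSketch
{
    // Loop version: visits one object at a time.
    public static double MergeLoop(PartialSum[] sums)
    {
        int totalSum = 0, totalCount = 0;
        foreach (var s in sums) { totalSum += s.sum; totalCount += s.count; }
        return (double)totalSum / totalCount;
    }

    // LINQ version: two set-at-a-time projections, each followed by Sum().
    public static double MergeLinq(PartialSum[] sums)
    {
        return (double)sums.Select(x => x.sum).Sum() /
               (double)sums.Select(x => x.count).Sum();
    }

    public static void Main()
    {
        var sums = new[] {
            new PartialSum { sum = 10, count = 4 },
            new PartialSum { sum = 20, count = 6 },
        };
        // Both calls compute 30 / 10.
        Console.WriteLine(MergeLoop(sums));
        Console.WriteLine(MergeLinq(sums));
    }
}
```

Both versions divide by the total count, so an empty or all-zero input is not handled here, just as in the slide code.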

(8)

Convenient syntax

var words =
    tableOfLines.SelectMany(l => l.Split(' ')).GroupBy(w => w);

IQueryable<IGrouping<string,string>> words =
    tableOfLines.SelectMany(mySplitFunction).GroupBy(myStringIdentity);

IEnumerable<string> mySplitFunction(string line) {
    return line.Split(' ');
}

string myStringIdentity(string word) {
    return word;
}

(9)

K-means algorithm

[Figures, slides 9 to 12: illustrations of the k-means algorithm; only the slide title survives]

(13)

K-means helper functions

class Vector { … }

Vector Mean(IEnumerable<Vector> set) {
    Vector sum = set.Aggregate( (x, y) => x + y );
    return sum / set.Count();
}

Vector NearestNeighbor(Vector vect, IEnumerable<Vector> set) {
    // Min(selector) would return the smallest distance rather than the vector,
    // so order by distance and take the first element.
    return set.OrderBy( e => (e - vect).L2Norm() ).First();
}

(14)

K-means algorithm

IEnumerable<Vector> kMeansStep(IEnumerable<Vector> vectors,
                               IEnumerable<Vector> centers) {
    var clusters = vectors.GroupBy(
        vector => NearestNeighbor(vector, centers).VectorId);
    return clusters.Select(cluster => Mean(cluster));
}

IEnumerable<Vector> kMeans(IEnumerable<Vector> vectors,
                           IEnumerable<Vector> centers) {
    for (int i = 0; i < iterations; i++)
        centers = kMeansStep(vectors, centers);
    return centers;
}
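The slide code leaves the Vector class elided, so it cannot be run as written. The following is a minimal sketch of the same GroupBy/Select structure under stated assumptions: Vec is a hypothetical 2-D stand-in for Vector, and the index of the nearest center plays the role of VectorId.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical minimal 2-D vector; the talk's Vector class is elided.
public readonly struct Vec
{
    public readonly double X, Y;
    public Vec(double x, double y) { X = x; Y = y; }
    public static Vec operator +(Vec a, Vec b) => new Vec(a.X + b.X, a.Y + b.Y);
    public static Vec operator -(Vec a, Vec b) => new Vec(a.X - b.X, a.Y - b.Y);
    public static Vec operator /(Vec a, double d) => new Vec(a.X / d, a.Y / d);
    public double L2Norm() => Math.Sqrt(X * X + Y * Y);
}

public static class KMeansSketch
{
    static Vec Mean(IEnumerable<Vec> set)
    {
        var list = set.ToList();
        return list.Aggregate((x, y) => x + y) / list.Count;
    }

    // Index of the center closest to v (assumed substitute for VectorId).
    static int NearestIndex(Vec v, IList<Vec> centers) =>
        Enumerable.Range(0, centers.Count)
                  .OrderBy(i => (centers[i] - v).L2Norm())
                  .First();

    // One step: group points by nearest center, then average each group.
    public static List<Vec> Step(IEnumerable<Vec> vectors, IList<Vec> centers) =>
        vectors.GroupBy(v => NearestIndex(v, centers))
               .OrderBy(g => g.Key)
               .Select(g => Mean(g))
               .ToList();

    public static List<Vec> Run(IEnumerable<Vec> vectors, IList<Vec> centers, int iterations)
    {
        var current = centers.ToList();
        for (int i = 0; i < iterations; i++) current = Step(vectors, current);
        return current;
    }

    public static void Main()
    {
        var points = new[] { new Vec(0, 0), new Vec(0, 2), new Vec(10, 10), new Vec(10, 12) };
        var start = new[] { new Vec(1, 1), new Vec(9, 9) };
        foreach (var c in Run(points, start, 3))
            Console.WriteLine($"({c.X}, {c.Y})");
    }
}
```

As in the slide version, a center that attracts no points produces no group, so the number of centers can shrink between steps.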

(15)

Data mining, machine learning, …

• Decision-tree training

• SVD

• Power iteration (PageRank)

• Image feature extraction/indexing/clustering

• Network trace analysis

• Light field simulation

• …

(17)

Computational model: DAG

• Distributed processing

– Partition computation across cores/cluster

– Minimize communication overhead

• Directed‐acyclic graph

– Edge is finite sequence of data items

(18)

DAG abstraction

• Explicit dataflow

– Exposes dependencies within computation

• Absence of cycles

– Allows re-execution for fault-tolerance

– Simplifies scheduling: no deadlock

• Cycles can often be replaced by unrolling

– Unsuitable for fine-grain inner loops

• Very popular

(21)

Map

• Independent transformation of dataset

– for each x in S, output x’ = f(x)

• E.g. simple grep for word w

– output line x only if x contains w

[Figure: S → f → S’, and the partitioned form S1 → f → S1’, S2 → f → S2’]
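The grep example can be written as a LINQ Where, which also shows why map parallelizes trivially: applying f to each partition and concatenating gives the same result as applying it to the whole dataset. A minimal sketch follows; the class name, sample lines and search word are invented for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class MapSketch
{
    // The "f" of the slide for grep: keep line x only if it contains w.
    public static IEnumerable<string> Grep(IEnumerable<string> lines, string w) =>
        lines.Where(x => x.Contains(w));

    public static void Main()
    {
        var s1 = new[] { "dryad runs dags", "linq is a query language" };
        var s2 = new[] { "quincy schedules dryad jobs" };

        // Apply f to each partition independently, then concatenate.
        var perPartition = Grep(s1, "dryad").Concat(Grep(s2, "dryad")).ToList();
        // Apply f to the whole dataset at once.
        var whole = Grep(s1.Concat(s2), "dryad").ToList();

        Console.WriteLine(perPartition.SequenceEqual(whole));
    }
}
```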

(23)

Reduce

• Grouping plus aggregation

– 1) Group x in S according to key selector k(x)

– 2) For each group g, output r(g)

• E.g. simple word count

– group by k(x) = x

– for each group g output key (word) and count of g

(25)

Reduce

[Figure: input partitions S1, S2, S3 feed a single G → r pipeline that produces S’]
(26)

Reduce

[Figure: input partitions S1, S2, S3 each feed a D vertex; the D outputs feed G → r pipelines that produce S1’, S2’, S3’]

D is distribute, e.g. by hash or range

(27)

Reduce

[Figure: input partitions S1, S2, S3 each feed G → ir → D; the D outputs feed G → r pipelines that produce S1’, S2’]

ir is initial reduce, e.g. compute a partial sum
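The G → ir → D → G → r pipeline can be simulated in a single process for word count. This is a sketch under assumptions, not Dryad's implementation: partitions are plain in-memory sequences, D is modeled as a GroupBy on a hash bucket, and all names (InitialReduce, Bucket, WordCount) are illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DistributedReduceSketch
{
    // G + ir: group one partition locally and emit a partial count per word.
    static IEnumerable<(string Word, int Count)> InitialReduce(IEnumerable<string> partition) =>
        partition.GroupBy(w => w).Select(g => (g.Key, g.Count()));

    // Deterministic non-negative hash used to pick a bucket for a key.
    static int Bucket(string key, int buckets) =>
        key.Aggregate(0, (h, c) => unchecked((h * 31 + c) & 0x7fffffff)) % buckets;

    public static Dictionary<string, int> WordCount(
        IEnumerable<IEnumerable<string>> partitions, int buckets)
    {
        // ir runs once per input partition.
        var partials = partitions.SelectMany(p => InitialReduce(p));
        // D: route each partial count to a bucket chosen by key hash.
        var routed = partials.GroupBy(pc => Bucket(pc.Word, buckets));
        // G + r: inside each bucket, group by word and sum the partial counts.
        return routed
            .SelectMany(bucket => bucket
                .GroupBy(pc => pc.Word)
                .Select(g => (Word: g.Key, Count: g.Sum(pc => pc.Count))))
            .ToDictionary(t => t.Word, t => t.Count);
    }

    public static void Main()
    {
        var partitions = new[] {
            new[] { "a", "b", "a" },
            new[] { "b", "c" },
            new[] { "a" },
        };
        foreach (var kv in WordCount(partitions, 2).OrderBy(kv => kv.Key))
            Console.WriteLine($"{kv.Key}: {kv.Value}");
    }
}
```

Because every occurrence of a word hashes to the same bucket, each word is finalized by exactly one r, which is what lets the final reducers run independently.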

(28)

K-means

[Figure: k-means dataflow graph: centers C0 and points P pass through ac and cc vertices to produce C1, then C2 and C3]
(29)

K-means

[Figure: the same k-means dataflow graph with the points split into partitions P1, P2, P3, each feeding its own ac vertex before cc produces C1, C2, C3]
(30)

PageRank

[Figure: PageRank dataflow graph over node partitions (N0 to N3, three partitions each) and edge partitions E1 to E3, connected through ae, D and cc vertices]
(31)

Distributed Word Count

Count word frequency in a set of documents:

var words = docs.SelectMany(doc => doc.words);

var groups = words.GroupBy(word => word);

var counts = groups.Select(g => new WordCount(g.Key, g.Count()));

[Figure: IN → SM (doc => doc.words) → GB (word => word) → S (g => new …) → OUT]

(32)

Execution Plan for Word Count

[Figure: stage (1), pipelined: SM (SelectMany) → Q (sort) → GB (groupby) → C (count) → D (distribute); followed by, pipelined: MS (mergesort) → GB (groupby) → Sum]
(33)

Execution Plan for Word Count

[Figure: the same plan replicated across partitions: parallel SM → Q → GB → C → D pipelines in stage (1) feeding parallel MS → GB → Sum pipelines in stage (2)]
(35)

Dryad

• General-purpose execution engine

– Batch processing on immutable datasets

– Well-tested on large clusters

• Automatically handles

– Fault tolerance

– Distribution of code and intermediate data

– Scheduling of work to resources

(36)

Fault tolerance

• Buffer data in (some) edges

• Re-execute on failure using buffered data

• Speculatively re-execute for stragglers

(37)

Rewrite graph at runtime

• Loop unrolling with convergence tests

• Adapt partitioning scheme at run time

– Choose #partitions based on runtime data volume

– Broadcast Join vs. Hash Join, etc.

• Adaptive aggregation and distribution trees

– Based on data skew and network topology

• Load balancing

(38)

Dryad System Architecture

[Figure: system architecture diagram; only the label “Scheduler” survives]

(40)

Quincy DAG Scheduler

• Data locality and fairness (SLAs)

• SOSP 2009

(41)

Production system

• Dryad well-tested, scalable

– Daily use supporting Bing for over 3 years

– Clusters with >10k computers

• Applicable to large number of computations

– 250 computer cluster at MSR SVC, Mar->Nov 09

• 15k jobs (tens of millions of processes executed)

• Hundreds of distinct programs

– Network trace analysis, privacy-preserving inference, light-transport simulation, decision-tree training, deep belief …

(42)

Conclusion

• DryadLINQ supports many computations

– Easy to use, flexible

• DAG-structured jobs scale to large clusters

– Transient failures common, disk failures daily

• Publicly available for download

(44)

Dryad Inputs and Outputs

• Partitioned data set

– Records do not cross partition boundaries

– Data on compute machines: NTFS, SQLServer, …

• Optional semantics

– Hash-partition, range-partition, sorted, etc.

• Loading external data

– Partitioning “automatic”

– File system chooses sensible partition sizes

– Or known partitioning from user

(45)

Partitioning driven by data

[Figure: six input partitions S1 to S6, each feeding its own G vertex]

(46)

Partitioning driven by data

[Figure: the six input partitions S1 to S6 grouped onto shared G → ir → D pipelines, which feed G → r vertices producing S1’ and S2’]

(47)

Push vs Pull

• Databases typically ‘pull’ using iterator model

– Avoids buffering

– Can prevent unnecessary computation

• But DAG must be fully materialized

– Complicates rewriting

– Prevents resource virtualization in shared cluster

[Figure: fragment of the reduce dataflow graph: S1 → D → G → r → S1’, and G → S2’]

(48)

Channel abstraction

[Figure: the distributed reduce dataflow graph: S1, S2, S3 each feed G → ir → D; the D outputs feed G → r pipelines that produce S1’, S2’]
(49)

Push vs Pull

• Channel types define connected component

– Shared-memory or TCP must be gang-scheduled

• Pull within gang, push between gangs

(50)

Fault tolerance

• Buffer data in (some) edges

• Re-execute on failure using buffered data

• Speculatively re-execute for stragglers

• ‘Push’ model makes this very simple

(51)

DryadLINQ Internals

• Distributed execution plan

– Static optimizations: pipelining, eager aggregation, etc.

– Dynamic optimizations: data-dependent partitioning, dynamic aggregation, etc.

• Automatic code generation

– Vertex code that runs on vertices

– Channel serialization code

– Callback code for runtime optimizations

– Automatically distributed to cluster machines

• Separate LINQ query from its local context

– Distribute referenced objects to cluster machines

(52)

Decomposable Functions

• Roughly, a function H is decomposable if it can be expressed as composition of two functions IR and C such that

– IR is commutative

– C is commutative and associative

• Some decomposable functions

– Sum: IR = Sum, C = Sum

– Count: IR = Count, C = Sum

– OrderBy.Take: IR = OrderBy.Take, 
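The Sum and Count decompositions listed above can be checked directly: applying IR to each partition and C to the partial results should match H applied to the whole input. The helper Decomposed and the sample partitions are assumptions made for this sketch; OrderBy.Take is left out because its combiner is cut off in the slide text.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DecomposableSketch
{
    // Evaluate H as C applied to the IR results of each partition.
    public static TOut Decomposed<TIn, TPartial, TOut>(
        IEnumerable<IEnumerable<TIn>> partitions,
        Func<IEnumerable<TIn>, TPartial> ir,
        Func<IEnumerable<TPartial>, TOut> c) =>
        c(partitions.Select(ir));

    public static void Main()
    {
        var parts = new[] { new[] { 1, 2, 3 }, new[] { 4, 5 }, new[] { 6 } };
        var all = parts.SelectMany(p => p);

        // Sum: IR = Sum, C = Sum
        int sum = Decomposed<int, int, int>(parts, p => p.Sum(), s => s.Sum());
        // Count: IR = Count, C = Sum
        int count = Decomposed<int, int, int>(parts, p => p.Count(), s => s.Sum());

        Console.WriteLine(sum == all.Sum());
        Console.WriteLine(count == all.Count());
    }
}
```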

(53)

Two Key Questions

• How do we decompose a function?

– Two interfaces: iterator and accumulator

– Choice of interfaces can have significant impact on performance

• How do we deal with user-defined functions?

– Try to infer automatically

– Provide a good annotation mechanism

(54)

Iterator Interface in DryadLINQ

[Decomposable("InitialReduce", "Combine")]
public static IntPair SumAndCount(IEnumerable<int> g) {
    return new IntPair(g.Sum(), g.Count());
}

public static IntPair InitialReduce(IEnumerable<int> g) {
    return new IntPair(g.Sum(), g.Count());
}

public static IntPair Combine(IEnumerable<IntPair> g) {
    return new IntPair(g.Select(x => x.first).Sum(),
                       g.Select(x => x.second).Sum());
}

[Figure: execution plan with parallel M → G1 → IR → D pipelines, feeding MG → G2 → C, feeding MG → G3 → F → X]

(55)

Accumulator Interface in DryadLINQ

[Decomposable("Initialize", "Iterate", "Merge")]
public static IntPair SumAndCount(IEnumerable<int> g) {
    return new IntPair(g.Sum(), g.Count());
}

public static IntPair Initialize() {
    return new IntPair(0, 0);
}

public static IntPair Iterate(IntPair x, int r) {
    x.first += r;
    x.second += 1;
    return x;
}

public static IntPair Merge(IntPair x, IntPair o) {
    x.first += o.first;
    x.second += o.second;
    return x;
}

[Figure: execution plan with parallel M → G1 → IR → D pipelines, feeding MG → G2 → C, feeding MG → G3 → F → X]
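To see how the three accumulator methods fit together, the sketch below folds each partition with Iterate and then folds the partial results with Merge, using plain LINQ Aggregate in one process. It does not use the DryadLINQ runtime or its Decomposable attribute. IntPair is a hypothetical definition, since the slides use the type without showing it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical IntPair; the slides use it without showing its definition.
public class IntPair
{
    public int first, second;
    public IntPair(int first, int second) { this.first = first; this.second = second; }
}

public static class AccumulatorSketch
{
    public static IntPair Initialize() => new IntPair(0, 0);

    public static IntPair Iterate(IntPair x, int r)
    {
        x.first += r; x.second += 1; return x;
    }

    public static IntPair Merge(IntPair x, IntPair o)
    {
        x.first += o.first; x.second += o.second; return x;
    }

    // Fold each partition with Iterate, then fold the partials with Merge.
    public static IntPair SumAndCount(IEnumerable<IEnumerable<int>> partitions)
    {
        var partials = partitions.Select(
            p => p.Aggregate(Initialize(), (acc, r) => Iterate(acc, r)));
        return partials.Aggregate(Initialize(), (acc, o) => Merge(acc, o));
    }

    public static void Main()
    {
        var parts = new[] { new[] { 1, 2, 3 }, new[] { 4, 5 }, new[] { 6 } };
        var result = SumAndCount(parts);
        Console.WriteLine($"sum={result.first}, count={result.second}");
    }
}
```

Each record is folded into an accumulator and then dropped, so a partition is never held in memory as a whole.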

(56)

Iterator PartialSort

• G1+IR and G2+C

– Keep only a fixed number of chunks in memory

– Chunks are processed in parallel: sorted, grouped, reduced by IR or C, and emitted

• G3+F

– Read the entire input into memory, perform a parallel sort, and apply F to each group

• Observations

– G1+IR can always be pipelined with upstream

– G3+F can often be pipelined with downstream

– G1+IR may have poor data reduction

(57)

Accumulator FullHash

• G1+IR, G2+C, and G3+F

– Build an in-memory parallel hash table: one accumulator object/key

– Each input record is “accumulated” into its accumulator object, and then discarded

– Output the hash table when all records are processed

• Observations

– Optimal data reduction for G1+IR

– Memory usage proportional to the number of unique keys, not records

• So, by default, we enable upstream and downstream pipelining
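The FullHash idea can be sketched with an ordinary dictionary that holds one accumulator per key. This is a sequential, single-process illustration only; the slide describes a parallel hash table, and the names FullHashSketch and Accumulate plus the (sum, count) accumulator are assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class FullHashSketch
{
    // One (sum, count) accumulator per key; each record is folded in and then dropped.
    public static Dictionary<string, (int Sum, int Count)> Accumulate(
        IEnumerable<(string Key, int Value)> records)
    {
        var table = new Dictionary<string, (int Sum, int Count)>();
        foreach (var rec in records)
        {
            // acc is the default (0, 0) when the key is not yet present.
            table.TryGetValue(rec.Key, out var acc);
            table[rec.Key] = (acc.Sum + rec.Value, acc.Count + 1);
        }
        return table;
    }

    public static void Main()
    {
        var records = new[] { ("a", 1), ("b", 5), ("a", 3) };
        foreach (var kv in Accumulate(records).OrderBy(kv => kv.Key))
            Console.WriteLine($"{kv.Key}: sum={kv.Value.Sum}, count={kv.Value.Count}");
    }
}
```

The table grows with the number of distinct keys rather than the number of records, which matches the memory observation on the slide.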

(59)

MapReduce (Hadoop)

• MapReduce restricts

– Topology of DAG

– Semantics of function in compute vertex

• Sequence of instances for non-trivial tasks

[Figure: MapReduce dataflow graph: S1, S2, S3 each feed f → G → ir → D; the D outputs feed G → r pipelines that produce S1’, S2’]

(60)

MapReduce language complexity

• Simple to describe MapReduce model

• Can be hard to map algorithm to framework

– cf k-means: combine C+P, broadcast C, iterate, …

(61)

MapReduce system complexity

• Simple to describe MapReduce system

• Implementation not uniform

– Different fault-tolerance for mappers, reducers

– Add more special cases for performance

• Hadoop introducing TCP channels, pipelines, …

(62)

DryadLINQ demo

(63)

Conclusions

• High-level language is good

– For ease of use, maintainability, expressiveness

• Computational abstraction is important

– Suitable target for compiler, not developer

• Common patterns should be efficient

• Optimization should be easy

• LINQ is a pretty good language abstraction

• DAG is a very good computational model
