Workflow Management System for Stratosphere
Thesis presentation by Suryamita Harindrari, September 5th, 2014
Thesis advisor: Asterios Katsifodimos, PhD
Thesis supervisor: Prof. Dr. Volker Markl
Database & Information Management (DIMA), Technische Universität Berlin
Agenda
Background
- Workflow & Workflow Management System
- Control Flow vs Data Flow
- Related Work
Motivation
Approach
Stage 1: Translating AST to Control Flow Graph
- Abstract Syntax Tree (AST)
- Control Flow Graph
Stage 2: Adding Data Flow to the Control Flow Graph
- Data Flow Analysis
Stage 3: Generate Code for Underlying System
Evaluation: Productivity & Generality
Conclusion
Workflows & Workflow Management System
Big Data Analytics → complex applications that process large datasets on distributed resources
Workflow:
Automates procedures that would otherwise have to be carried out manually [Deelman et al, 2009]
A sequence of steps or computations [Crobak, 2012]
Workflow Management System (WMS):
Defines, manages and executes workflows
The order of execution is driven by a computer representation of the workflow logic [Hollingsworth et al, 1993]
Simple Workflow vs Complex Workflow
[Figures: ETL process workflow [Crobak, 2012]; promoter identification workflow]
Taxonomy of a Workflow
Data Flow vs Control Flow
Data Flow
- Related work on data flow systems: Hadoop MR, Stratosphere, Pig, Hive, Jet
- Limitations:
  - No support for control structures
  - Low-level, optimized code → reduces productivity
  - High overhead of learning a new language, e.g. Pig Latin
Control Flow
- Related work on workflow systems: Oozie, Luigi, Azkaban, Kepler, Spark
- Limitations:
  - Markup languages → cumbersome
  - Graphical representation → limited
Motivation
Problem
- Stratosphere → does not support control flow outside UDFs
- Existing workflow systems → dependencies must be specified manually
Solution
- A WMS that automatically detects the control flow and data dependencies between tasks from pure program code
- An intuitive way for the programmer to define the workflow
Goals
- Design and develop a WMS that works on top of Stratosphere
- Define a workflow domain-specific language (DSL) to make defining
Workflow Design: Our Taxonomy
Approach
Translate the program code into target code:
- Translate the user program to an Intermediate Representation (IR): a Control Flow Graph (CFG)
- Add data flow to the CFG
- Generate code for the underlying system
Stage 1 Part 1: Translate User Program to AST
A compiler constructs a sequence of Intermediate Representations (IR), which can take a variety of forms
Abstract Syntax Tree (AST) → data structure that represents program constructs
- Each node in the AST represents an operator
Grammar Definition & AST Representation
Our Tool: Scala AST
Reuse the Scala AST provided by the Scala compiler
Scala Macros
- Compile-time metaprogramming
- Expand trees at compile time, letting programmers inspect and manipulate the AST within the compilation scope
Scala AST Classes [Stocker, 2010]
- Block: a list of statements followed by a result expression
- ValDef: definition of an immutable (val) or mutable (var) variable
- Assign: non-initial assignments to variables
- If: consists of cond, thenp and elsep sub-trees
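To make the node kinds above concrete, here is a minimal sketch that encodes a tiny program as a tree. The case classes are a hypothetical, simplified mirror of the Scala AST classes (the real ones live in the compiler's reflection API and carry types, symbols and positions); `Ident`, `Literal`, `Apply`, `sample` and `kinds` are names chosen for illustration only.

```scala
// Hypothetical, simplified mirror of the Scala AST node kinds listed above.
// These are NOT the compiler's own classes.
object MiniAst {
  sealed trait Tree
  case class Ident(name: String) extends Tree
  case class Literal(value: Any) extends Tree
  case class Apply(fun: String, args: List[Tree]) extends Tree
  case class ValDef(name: String, mutable: Boolean, rhs: Tree) extends Tree
  case class Assign(lhs: Ident, rhs: Tree) extends Tree
  case class If(cond: Tree, thenp: Tree, elsep: Tree) extends Tree
  case class Block(stats: List[Tree], expr: Tree) extends Tree

  // Encodes: { var i = 0; if (i < 10) i = i + 1 else i = 0; i }
  val sample: Tree = Block(
    List(
      ValDef("i", mutable = true, Literal(0)),
      If(
        Apply("<", List(Ident("i"), Literal(10))),
        Assign(Ident("i"), Apply("+", List(Ident("i"), Literal(1)))),
        Assign(Ident("i"), Literal(0))
      )
    ),
    Ident("i")
  )

  // Node kinds met in a pre-order traversal of the tree.
  def kinds(t: Tree): List[String] = t match {
    case Block(stats, expr) => "Block" :: (stats ::: List(expr)).flatMap(kinds)
    case ValDef(_, _, rhs)  => "ValDef" :: kinds(rhs)
    case Assign(lhs, rhs)   => "Assign" :: kinds(lhs) ::: kinds(rhs)
    case If(c, t1, e)       => "If" :: kinds(c) ::: kinds(t1) ::: kinds(e)
    case Apply(_, args)     => "Apply" :: args.flatMap(kinds)
    case Ident(_)           => List("Ident")
    case Literal(_)         => List("Literal")
  }
}
```

Calling `MiniAst.kinds(MiniAst.sample)` lists the node kinds in traversal order, which is roughly the view a macro gets when it walks the tree of a user program.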
Generating AST from User Program
Sample program in our workflow DSL
val e1 = DataSource("..")
val e2 = DataSource("..")
var e3: DataSet[(String, Int, Int)] = null
var i = 0
while (i < 0) {
  if (e1.map(x => x._2)… > 50)
    e3 = e1.map { x => (x._1, x._2 + 1000, x._3) }
  else
    e3 = e2.map { x => (x._1, x._2 + 1500, x._3) }
  i = i + 1
}
val e4 = e3.write("…")
Stage 1 Part 2: Generate Control Flow Graph from AST
Control Flow Graph
- Directed graph in which the nodes represent basic blocks and the edges represent control flow paths [Allen, 1970]
- Basic blocks → sequences of instructions or statements that are always executed together
- Edges represent the possible flow of control from the end of one basic block to the beginning of another
CFG for Various Statements
Generated CFG from AST
Generated CFG from AST Algorithm (1 of 2)
Create CFG from AST Algorithm (2 of 2)
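As an illustration of how a CFG can be derived from a tree of statements, the following is a minimal sketch under stated assumptions, not the algorithm shown on the slides. The statement types (`Simple`, `IfStmt`, `WhileStmt`), the `Cfg` class and the example program are hypothetical; conditions and statements are kept as plain strings.

```scala
import scala.collection.mutable

// Sketch only: builds basic blocks and control flow edges for a toy
// statement language with sequencing, if/else and while.
object CfgSketch {
  sealed trait Stmt
  case class Simple(text: String) extends Stmt
  case class IfStmt(cond: String, thenp: List[Stmt], elsep: List[Stmt]) extends Stmt
  case class WhileStmt(cond: String, body: List[Stmt]) extends Stmt

  class Cfg {
    val blocks = mutable.ArrayBuffer.empty[mutable.ArrayBuffer[String]]
    val edges  = mutable.LinkedHashSet.empty[(Int, Int)]
    def newBlock(): Int = { blocks += mutable.ArrayBuffer.empty[String]; blocks.size - 1 }
  }

  // Appends `stmts` starting in block `cur`; returns the block where control continues.
  def build(cfg: Cfg, stmts: List[Stmt], cur: Int): Int =
    stmts.foldLeft(cur) { (b, s) =>
      s match {
        case Simple(text) =>
          cfg.blocks(b) += text
          b
        case IfStmt(cond, thenp, elsep) =>
          cfg.blocks(b) += s"if ($cond)"        // the condition ends the current block
          val thenStart = cfg.newBlock(); cfg.edges += ((b, thenStart))
          val thenEnd   = build(cfg, thenp, thenStart)
          val elseStart = cfg.newBlock(); cfg.edges += ((b, elseStart))
          val elseEnd   = build(cfg, elsep, elseStart)
          val join      = cfg.newBlock()        // both branches meet here
          cfg.edges += ((thenEnd, join)); cfg.edges += ((elseEnd, join))
          join
        case WhileStmt(cond, body) =>
          val header = cfg.newBlock(); cfg.edges += ((b, header))
          cfg.blocks(header) += s"while ($cond)"
          val bodyStart = cfg.newBlock(); cfg.edges += ((header, bodyStart))
          val bodyEnd   = build(cfg, body, bodyStart)
          cfg.edges += ((bodyEnd, header))      // back edge of the loop
          val exit = cfg.newBlock(); cfg.edges += ((header, exit))
          exit
      }
    }

  // Toy program with the same shape as the sample program: a while loop around an if/else.
  val program: List[Stmt] = List(
    Simple("var i = 0"),
    WhileStmt("i < 3", List(
      IfStmt("i % 2 == 0", List(Simple("x = f(i)")), List(Simple("x = g(i)"))),
      Simple("i = i + 1")
    )),
    Simple("write(x)")
  )

  def example(): Cfg = {
    val cfg = new Cfg
    build(cfg, program, cfg.newBlock())
    cfg
  }
}
```

In this sketch every `if` opens a then-block, an else-block and a join block, and every `while` gets its own header block so that the back edge has a single target.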
Stage 2: Generate CF-Enriched Data Flow
Data Flow Analysis [Lam et al, 2006]
- The transmission of information through program variables is missing from the CFG
- Derive information about the flow of data along program execution paths
- Traverse the CFG to detect data dependencies
- Add another type of edge that represents information on the data
Generate Def-Use Pair
Compute the set of variables defined, def_B, and the set of variables used, use_B, in each block B of the CFG
Association between a block and a variable of the program:
- def(B,v) holds, for a variable v and a vertex B, if B defines v
- use(B,v) holds, for a variable v and a vertex B, if B uses the value of v
Generate the def-use pair information for each block in G(V,E)
Add an edge from block B1 to block B2 that depicts the data flow of variable v, given that def(B1,v) reaches use(B2,v)
CFG with Def-Use Pair
val e1 = DataSource("..")
val e2 = DataSource("..")
var e3: DataSet[(String, Int, Int)] = null
var i = 0
while (i < 0) {
  if (e1.map(x => x._2)… > 50)
    e3 = e1.map { x => (x._1, x._2 + 1000, x._3) }
  else
    e3 = e2.map { x => (x._1, x._2 + 1500, x._3) }
  i = i + 1
}
val e4 = e3.write("…")
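The def-use step can be sketched with a textbook reaching-definitions iteration. This is an illustrative sketch, not the thesis implementation: `BasicBlock`, `DataFlowEdge` and the seven-block example (numbered 0 to 6 here) are assumptions, and `uses` is taken to mean the variables a block reads before redefining them itself.

```scala
// Sketch only: reaching definitions over a CFG, then one data flow edge
// per (defining block, using block, variable) triple.
object DefUseSketch {
  case class BasicBlock(id: Int, defs: Set[String], uses: Set[String])
  case class DataFlowEdge(from: Int, to: Int, variable: String)
  type Defn = (Int, String) // (defining block, variable)

  def defUseEdges(blocks: List[BasicBlock], succ: Map[Int, List[Int]]): Set[DataFlowEdge] = {
    val preds: Map[Int, List[Int]] = blocks.map { b =>
      b.id -> blocks.filter(p => succ.getOrElse(p.id, Nil).contains(b.id)).map(_.id)
    }.toMap
    var inSets: Map[Int, Set[Defn]] = blocks.map(b => b.id -> Set.empty[Defn]).toMap
    var outSets = inSets
    var changed = true
    while (changed) { // iterate until a fixed point is reached
      changed = false
      for (b <- blocks) {
        val newIn: Set[Defn] = preds(b.id).flatMap(p => outSets(p)).toSet
        // A definition in b kills incoming definitions of the same variable.
        val newOut = newIn.filterNot { case (_, v) => b.defs(v) } ++ b.defs.map(v => (b.id, v))
        if (newIn != inSets(b.id) || newOut != outSets(b.id)) {
          inSets = inSets.updated(b.id, newIn)
          outSets = outSets.updated(b.id, newOut)
          changed = true
        }
      }
    }
    blocks.flatMap { b =>
      inSets(b.id).collect { case (from, v) if b.uses(v) => DataFlowEdge(from, b.id, v) }
    }.toSet
  }

  // Hypothetical block summary of the sample program above.
  val blocks = List(
    BasicBlock(0, Set("e1", "e2", "e3", "i"), Set.empty), // declarations
    BasicBlock(1, Set.empty, Set("i")),                   // while condition
    BasicBlock(2, Set.empty, Set("e1")),                  // if condition
    BasicBlock(3, Set("e3"), Set("e1")),                  // then branch
    BasicBlock(4, Set("e3"), Set("e2")),                  // else branch
    BasicBlock(5, Set("i"), Set("i")),                    // i = i + 1
    BasicBlock(6, Set("e4"), Set("e3"))                   // write
  )
  val succ = Map(0 -> List(1), 1 -> List(2, 6), 2 -> List(3, 4),
                 3 -> List(5), 4 -> List(5), 5 -> List(1))
  val edges: Set[DataFlowEdge] = defUseEdges(blocks, succ)
}
```

Note that the final write block receives `e3` from both branches and from the initial declaration, because the loop body may not execute at all.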
Adding Data Flow to the CFG
Control-Flow-Enriched Data Flow
[Figure: control-flow-enriched data flow graph with blocks numbered 1 to 7]
Stage 3: Generate Code for Underlying System
Assumptions
- The generated code runs only on systems that provide the specified set of primitives currently supported by Stratosphere
Transform each block in G(V,E,DFE), where DFE is the set of data flow edges, into a Stratosphere job
Output: Stratosphere jobs to be executed in the WMS, ordered according to the dependencies defined in the IR
Code Generation Algorithm (1 of 2)
Each incoming DFE to a block → the Stratosphere job of that block requires as input the data or variable contained in the DFE
Each outgoing DFE from a block → the Stratosphere job of that block needs to output the variable contained in the DFE
The WMS automatically selects which job to run
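How a WMS might pick the next job at run time can be sketched as a small driver loop over the blocks. The slides do not describe this mechanism, so everything here is an assumption: `Node`, `execute` and the in-memory `State` are hypothetical, and `run` merely stands in for submitting a block's Stratosphere job.

```scala
// Sketch only: a driver that runs one job per basic block and follows the
// control flow edge chosen by the block's result.
object DriverSketch {
  type State = Map[String, Int]

  // `run` stands in for executing the block's job;
  // `next` selects the successor block (None means the workflow is finished).
  case class Node(run: State => State, next: State => Option[Int])

  def execute(nodes: Map[Int, Node], entry: Int, init: State): (State, List[Int]) = {
    var state = init
    var current: Option[Int] = Some(entry)
    val trace = List.newBuilder[Int]
    while (current.isDefined) {
      val id = current.get
      trace += id
      state = nodes(id).run(state)
      current = nodes(id).next(state)
    }
    (state, trace.result())
  }

  // Hypothetical four-block workflow: init, loop header, loop body, exit.
  val nodes: Map[Int, Node] = Map(
    0 -> Node(s => s + ("i" -> 0) + ("x" -> 0), _ => Some(1)),
    1 -> Node(s => s, s => if (s("i") < 2) Some(2) else Some(3)),
    2 -> Node(s => s + ("i" -> (s("i") + 1)) + ("x" -> (s("x") + 10)), _ => Some(1)),
    3 -> Node(s => s, _ => None)
  )
}
```

`DriverSketch.execute(DriverSketch.nodes, 0, Map.empty)` returns the final state together with the sequence of blocks that were run.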
Code Generation Algorithm (2 of 2)
J → sequence of Stratosphere jobs j(I,O)
I → data source: set of all input variables to the job
O → data sink: set of all output variables from the job
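The mapping from data flow edges to job inputs and outputs can be sketched as follows. This is an illustration of the rule stated on the slides, not the thesis code; `DataFlowEdge`, `Job` and `jobs` are hypothetical names, and the edge set in the usage note is made up for the example.

```scala
// Sketch only: derive j(I, O) for each block from the data flow edges (DFE).
object JobSketch {
  case class DataFlowEdge(from: Int, to: Int, variable: String)

  // inputs  = I: variables the job must read from data sources
  // outputs = O: variables the job must write to data sinks
  case class Job(block: Int, inputs: Set[String], outputs: Set[String])

  def jobs(blockIds: List[Int], dfes: Set[DataFlowEdge]): List[Job] =
    blockIds.map { b =>
      Job(
        block   = b,
        inputs  = dfes.filter(_.to == b).map(_.variable),   // incoming DFEs
        outputs = dfes.filter(_.from == b).map(_.variable)  // outgoing DFEs
      )
    }
}
```

For example, with the edges `(0,3,"e1")`, `(0,4,"e2")`, `(3,6,"e3")` and `(4,6,"e3")`, block 3 becomes a job that reads `e1` and writes `e3`, while block 6 only reads `e3`.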
Evaluation: Productivity
Oozie vs Workflow DSL Implementation (1 of 2)
Oozie Implementation
- Specify two XML definitions, one for the main process and one for the subprocess
- Each XML definition contains the action nodes and decision nodes based on the overall workflow
- The input and output directories of each subprocess are also defined manually in the XML definition
[Figure: part of the Oozie implementation of the SubDirectory subprocess. Source: http://www.infoq.com/articles/oozieexample]
Oozie vs Workflow DSL Implementation (2 of 2)
Workflow DSL Implementation
- Specify one workflow definition for both the main process and the subprocess
- Intuitive → e.g. the fork node in the main process can be replaced by a general while-style iteration
- The body of the iteration is the subprocess itself → the conditional branching is based on the directory information
var temp = new Directories()
var dirList = temp.get
var i: Int = 0
while (i < temp.getSize) {
  var dir = new DirInfo(dirList(i))
  var dirAge = dir.getAge
  var dirSize = dir.getSize
  if (if (dirAge < 1) dirSize > 23 else dirSize > 0) {
    if (dirAge > 6 || dirSize > 23) {
      var ingest = ingestFile(dir.getName)
      var archive = archiveFile(dir.getName)
    } else {
      var reminder = sendReminder(dir.getName)
    }
  }
  i = i + 1
}
Evaluation: Generality
High-level declarative interface that currently targets only Stratosphere
Deeply embedded in Scala: same syntax and semantics, with some restrictions
Possible to compile a program written in our DSL to other underlying platforms, e.g. Spark can understand the general-style if and while statements supported by our DSL
Logistic Regression in Spark & Workflow DSL
Spark
val data = spark.hdfsTextFile(...).map(readPoint).cache()
var w = Vector.random(D)
for (i <- 1 to ITERATIONS) {
  var gradient = spark.accumulator(Vector.zeros(D))
  data.foreach(p => {
    val scale = (1/(1+exp(-p.y*(w dot p.x))) - 1) * p.y
    gradient += scale * p.x
  })
  w -= gradient.value
}
println("Final w: " + w)
Our workflow DSL
val data = spark.hdfsTextFile(...).map(readPoint).cache()
var w = Vector.random(D)
var i = 0
while (i < ITERATIONS) {
  w -= data.map(p => {
    val scale = (1/(1+exp(-p.y*(w dot p.x))) - 1) * p.y
    scale * p.x
  }).reduce(_+_)
  i = i + 1
}
println("Final w: " + w)
Source: http://laser.inf.ethz.ch/2013/material/joseph/LASER-Joseph-6.pdf
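To check that the for-style and while-style formulations describe the same computation, here is a minimal local sketch on plain Scala collections. It does not use Spark or Stratosphere: `Point`, the vector helpers and the two `train` functions are assumptions for illustration, and `Vector` here is Scala's immutable collection rather than the vector class used in the listings above.

```scala
// Sketch only: the same gradient step written with a for loop and a while loop.
object LogRegSketch {
  case class Point(x: Vector[Double], y: Double)

  def dot(a: Vector[Double], b: Vector[Double]): Double =
    a.zip(b).map { case (p, q) => p * q }.sum
  def add(a: Vector[Double], b: Vector[Double]): Vector[Double] =
    a.zip(b).map { case (p, q) => p + q }
  def scale(a: Vector[Double], k: Double): Vector[Double] = a.map(_ * k)

  // Sum over all points of scale * p.x, as in the map/reduce formulation.
  def gradient(data: Seq[Point], w: Vector[Double]): Vector[Double] =
    data.map { p =>
      val s = (1 / (1 + math.exp(-p.y * dot(w, p.x))) - 1) * p.y
      scale(p.x, s)
    }.reduce(add)

  def trainFor(data: Seq[Point], w0: Vector[Double], iterations: Int): Vector[Double] = {
    var w = w0
    for (_ <- 1 to iterations) w = add(w, scale(gradient(data, w), -1.0))
    w
  }

  def trainWhile(data: Seq[Point], w0: Vector[Double], iterations: Int): Vector[Double] = {
    var w = w0
    var i = 0
    while (i < iterations) {
      w = add(w, scale(gradient(data, w), -1.0)) // w -= gradient
      i = i + 1
    }
    w
  }
}
```

Both functions perform the same sequence of updates, so they return identical weights for the same data, start vector and iteration count.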
Conclusion
Defined a workflow DSL that enables programmers to implement their algorithms
Deeply embedded in Scala → avoids learning overhead for the programmer
Generates a control-flow-enriched data flow and target code from the user program via static analysis of the program code
Static analysis of Scala code detects the control flow and data dependencies
Increases productivity compared to the implementation in another existing WMS (Oozie)
Future Work
Extend the grammar of our DSL, e.g. with for-comprehensions
Extend our DSL to other frameworks
- Possible to generate the code or job scripts of the workflow for any execution framework
References
[Deelman et al, 2009] Ewa Deelman, Dennis Gannon, Matthew Shields, and Ian Taylor. Workflows and e-science: An overview of workflow system features and capabilities. Future Generation Computer Systems, 25(5):528–540, 2009.
[Hollingsworth et al, 1993] David Hollingsworth. Workflow Management Coalition: The workflow reference model. Workflow Management Coalition, 68, 1993.
[Ludäscher et al, 2005] Bertram Ludäscher, Ilkay Altintas, Chad Berkley, Dan Higgins, Efrat Jaeger, Matthew Jones, Edward A. Lee, Jing Tao, and Yang Zhao. Scientific workflow management and the Kepler system. Concurrency and Computation: Practice and Experience, 18(10):1039–1065, 2006.
[Yu et al, 2005] Jia Yu and Rajkumar Buyya. A taxonomy of workflow management systems for grid computing. Journal of Grid Computing, 3(3-4):171–200, 2005.
[Stocker, 2010] Mirko Stocker. Scala Refactoring. PhD thesis, HSR Hochschule für Technik Rapperswil, 2010.
[Lam et al, 2006] Monica Lam, Ravi Sethi, JD Ullman, and Alfred Aho. Compilers: Principles, Techniques, and Tools. Addison-Wesley, 2006.
[Ackermann et al, 2012] Stefan Ackermann, Vojin Jovanovic, Tiark Rompf, and Martin Odersky. Jet: An embedded DSL for high performance big data processing. In International Workshop on End-to-end Management of Big Data (BigData 2012), number EPFL-CONF-181673, 2012.
[Alexandrov et al, 2014] Alexander Alexandrov, Rico Bergmann, Stephan Ewen, Johann-Christoph Freytag, Fabian Hueske, Arvid Heise, Odej Kao, Marcus Leich, Ulf Leser, Volker Markl, et al. The Stratosphere platform for big data analytics. The VLDB Journal, pages 1–26, 2014.
[Allen, 1970] Frances E Allen. Control flow analysis. In ACM Sigplan Notices, volume 5, pages 1–19. ACM, 1970.
[Ewen et al, 2012] Stephan Ewen, Kostas Tzoumas, Moritz Kaufmann, and Volker Markl. Spinning fast iterative data flows. Proceedings of the VLDB Endowment, 5(11):1268–1279, 2012.
[Burmako, 2013] Eugene Burmako. Scala macros: Let our powers combine!: On how rich syntax and static types work with metaprogramming. In Proceedings of the 4th Workshop on Scala, page 3. 2013.
[Islam et al, 2012] Mohammad Islam, Angelo K Huang, Mohamed Battisha, Michelle Chiang, Santhosh Srinivasan, Craig Peters, Andreas Neumann, and Alejandro Abdelnur. Oozie: towards a scalable workflow management system for Hadoop. In Proceedings of the 1st ACM SIGMOD Workshop on Scalable Workflow Execution Engines and Technologies, page 4. ACM, 2012.