Functional Query Optimization with SQL

(1)

Michael Armbrust

@michaelarmbrust

spark.apache.org

Functional Query Optimization with SQL

(2)

What is Apache Spark?

Fast and general cluster computing system, interoperable with Hadoop

Improves efficiency through:
» In-memory computing primitives
» General computation graphs

Improves usability through:
» Rich APIs in Scala, Java, Python
» Interactive shell

Up to 100× faster (2-10× on disk)
2-5× less code

(3)

A General Stack

[Stack diagram: the Spark core engine with Spark Streaming (real-time), Spark SQL, GraphX (graph), and MLlib (machine learning) built on top]

(4)

Spark Model

Write programs in terms of transformations on distributed datasets.

Resilient Distributed Datasets (RDDs)
» Collections of objects that can be stored in memory or disk across a cluster
» Parallel functional transformations (map, filter, …)

(5)

More than Map/Reduce

map, filter, groupBy, sort, union, join, leftOuterJoin, rightOuterJoin, reduce, count, fold, reduceByKey, groupByKey, cogroup, cross, zip, sample, take, first, partitionBy, mapWith, pipe, save, ...
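For a flavor of how a few of these operations compose, here is a minimal sketch (the SparkContext `sc` and the log path are assumed, not from the slides):

val logLines = sc.textFile("hdfs://.../access.log")     // assumed input path
val errorsByHost = logLines
  .filter(_.contains("ERROR"))                          // keep only error lines
  .map(line => (line.split(" ")(0), 1))                 // (host, 1) pairs
  .reduceByKey(_ + _)                                   // count errors per host
errorsByHost.cache()                                    // keep the result in memory
println(errorsByHost.count())                           // action: triggers the computation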

(6)

Example: Log Mining

Load error messages from a log into memory, then interactively search for various patterns.

val lines = spark.textFile("hdfs://...")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split("\t")(2))
messages.cache()

messages.filter(_.contains("foo")).count()
messages.filter(_.contains("bar")).count()
. . .

[Diagram: the Driver ships tasks to Workers holding HDFS blocks 1-3; each Worker builds its partition of the messages RDD (Base RDD → Transformed RDD), caches it (Cache 1-3), and returns results for each action]

Result: full-text search of Wikipedia in <1 sec (vs 20 sec for on-disk data)
Result: scaled to 1 TB data in 5-7 sec

(7)

Fault Tolerance

file.map(record => (record.tpe, 1))
  .reduceByKey(_ + _)
  .filter { case (_, count) => count > 10 }

[Lineage diagram: Input file → map → reduce → filter]


(9)

Spark and Scala Provide:

Concise Serializable* Functions
Easy interoperability with the Hadoop ecosystem
Interactive REPL

* Made even better by Spores (5pm today)

[Chart: Spark lines of code by language – Scala, Python, Java, Shell, Other]

(10)

Reduced Developer Complexity

[Bar chart, shown as a build-up over four slides: non-test, non-example source lines (0–140,000) for Hadoop MapReduce, Storm (Streaming), Impala (SQL), Giraph (Graph), and Spark; Spark stays far smaller even with Streaming, Spark SQL, and GraphX included]

(14)

Spark Community

One of the largest open source projects in big data
150+ developers contributing
30+ companies contributing

[Bar chart: contributor counts (0–150) compared against Giraph, Storm, and Tez]

(15)

Community Growth

Oct '12 – Spark 0.6: 17 contributors
Feb '13 – Spark 0.7: 31 contributors
Sept '13 – Spark 0.8: 67 contributors
Feb '14 – Spark 0.9: 83 contributors
May '14 – Spark 1.0: 110 contributors

(16)

With great power…

Strict project coding guidelines to make it easier for non-Scala users and contributors:

Absolute imports only
Minimize infix function use
Java/Python friendly wrappers for user APIs

(17)
(18)

Relationship to Shark

Shark modified the Hive backend to run over Spark, but had two challenges:
» Limited integration with Spark programs
» Hive optimizer not designed for Spark

Spark SQL reuses the best parts of Shark:

Borrows
» Hive data loading
» In-memory column store

Adds
» RDD-aware optimizer

(19)

Spark SQL Components

Catalyst Optimizer
» Relational algebra + expressions
» Query optimization

Spark SQL Core
» Execution of queries as RDDs
» Reading in Parquet, JSON …

Hive Support
» HQL, MetaStore, SerDes, UDFs

[Code split across the three components: 26%, 36%, 38%]

(20)

Adding Schema to RDDs

Spark + RDDs
Functional transformations on partitioned collections of opaque objects.

SQL + SchemaRDDs
Declarative transformations on partitioned collections of tuples.

[Diagram: an RDD as partitions of opaque User objects vs. a SchemaRDD as partitions of rows with Name, Age, Height columns]

(21)

Using Spark SQL

SQLContext

Entry point for all SQL functionality.
Wraps/extends an existing SparkContext:

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Importing the SQL context gives access to all the SQL
// functions and conversions.
import sqlContext._

(22)

Example Dataset

A text file filled with people’s names and ages:

Michael, 30
Andy, 31
Justin Bieber, 19
…

(23)

Turning an RDD into a Relation

// Define the schema using a case class.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people =
  sc.textFile("examples/src/main/resources/people.txt")
    .map(_.split(","))
    .map(p => Person(p(0), p(1).trim.toInt))

people.registerAsTable("people")

(24)

Querying Using SQL

// SQL statements are run with the sql method from sqlContext.
val teenagers = sql("""
  SELECT name FROM people WHERE age >= 13 AND age <= 19""")

// The results of SQL queries are SchemaRDDs but also
// support normal RDD operations.
// The columns of a row in the result are accessed by ordinal.
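Continuing the comments above, a one-line sketch along the lines of the Spark SQL programming guide, treating the result as a normal RDD and reading the column by ordinal:

teenagers.map(t => "Name: " + t(0)).collect().foreach(println)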

(25)

Querying Using the Scala DSL

Express queries using functions, instead of

SQL strings.

// The following is the same as:
//   SELECT name FROM people
//   WHERE age >= 10 AND age <= 19
val teenagers =
  people
    .where('age >= 10)
    .where('age <= 19)
    .select('name)

(26)

Caching Tables In-Memory

Spark SQL can cache tables using an in-memory columnar format:

Scan only required columns
Fewer allocated objects (less GC)
Automatically selects best compression
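A minimal sketch of how the cache is used (cacheTable is the SQLContext method; "people" is the table registered in the earlier example):

sqlContext.cacheTable("people")
// Later queries over "people" scan only the required in-memory columns.
val names = sql("SELECT name FROM people")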

(27)

Parquet Compatibility

Native support for reading data in Parquet:

Columnar storage avoids reading unneeded data.
RDDs can be written to Parquet files, preserving the schema.

(28)

Using Parquet

// Any SchemaRDD can be stored as Parquet.
people.saveAsParquetFile("people.parquet")

// Parquet files are self-describing so the schema is preserved.
val parquetFile = sqlContext.parquetFile("people.parquet")

// Parquet files can also be registered as tables and then used
// in SQL statements.
parquetFile.registerAsTable("parquetFile")

val teenagers = sql(
  "SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")

(29)

Hive Compatibility

Interfaces to access data and code in the Hive ecosystem:

o Support for writing queries in HQL
o Catalog info from Hive MetaStore
o Tablescan operator that uses Hive SerDes

(30)

Reading Data Stored In Hive

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._

hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hql("LOAD DATA LOCAL INPATH '.../kv1.txt' INTO TABLE src")

// Queries can be expressed in HiveQL.
hql("FROM src SELECT key, value")

(31)

SQL and Machine Learning

val trainingDataTable = sql("""
  SELECT e.action, u.age, u.latitude, u.longitude
  FROM Users u
    JOIN Events e ON u.userId = e.userId""")

// SQL results are RDDs so can be used directly in MLlib.
val trainingData = trainingDataTable.map { row =>
  val features = Array[Double](row(1), row(2), row(3))
  LabeledPoint(row(0), features)
}
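A possible next step, not shown on the slide (a sketch assuming Spark 1.x MLlib and its LogisticRegressionWithSGD API):

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
// Train a classifier directly on the RDD of LabeledPoints built from the SQL result.
val model = LogisticRegressionWithSGD.train(trainingData, 100)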

(32)

Supports Java Too!

public class Person implements Serializable {
  private String _name;
  private int _age;

  public String getName() { return _name; }
  public void setName(String name) { _name = name; }
  public int getAge() { return _age; }
  public void setAge(int age) { _age = age; }
}

JavaSQLContext sqlCtx = new org.apache.spark.sql.api.java.JavaSQLContext(sc);
JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
  new Function<String, Person>() {
    public Person call(String line) throws Exception {
      String[] parts = line.split(",");
      Person person = new Person();
      person.setName(parts[0]);
      person.setAge(Integer.parseInt(parts[1].trim()));
      return person;
    }
  });

JavaSchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);

   

(33)

Supports Python Too!

from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)

lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: {"name": p[0], "age": int(p[1])})

peopleTable = sqlCtx.inferSchema(people)
peopleTable.registerAsTable("people")

teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenNames = teenagers.map(lambda p: "Name: " + p.name)

(34)

Optimizing

(35)

What is Query Optimization?

SQL is a declarative language: queries express what data to retrieve, not how to retrieve it.

The database must pick the 'best' execution strategy through a process known as query optimization.

(36)

Naïve Query Planning

SELECT name
FROM (
  SELECT id, name
  FROM People) p
WHERE p.id = 1

[Logical plan: Project name → Project id,name → Filter id = 1 → People.
Physical plan: the same operators with a TableScan of People at the leaf]

(37)

Optimized Execution

Writing imperative code to optimize all possible patterns is hard.

[Diagram: the naïve logical/physical plan next to an optimized physical plan that answers the query with an IndexLookup on id = 1 returning name]

Instead write simple rules:
•  Each rule makes one change
•  Run many rules together to a fixed point (a sketch follows below).
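As a rough illustration of running rules to a fixed point (a simplified sketch, not Catalyst's actual RuleExecutor):

// Apply every rule once per pass; stop when a full pass changes nothing
// or a safety limit is reached.
def fixedPoint[Plan](plan: Plan, rules: Seq[Plan => Plan], maxIterations: Int = 100): Plan = {
  var current = plan
  var changed = true
  var i = 0
  while (changed && i < maxIterations) {
    val next = rules.foldLeft(current)((p, rule) => rule(p))  // each rule makes one kind of change
    changed = next != current                                 // converged when nothing changed
    current = next
    i += 1
  }
  current
}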

(38)

Optimizing with Rules

[Diagram: Original Plan (Project name → Project id,name → Filter id = 1 → People) → Filter Push-Down (filter moved below the projections) → Combine Projection (redundant project removed) → Physical Plan (IndexLookup id = 1, returning name)]

(39)

Prior Work: Optimizer Generators

Volcano / Cascades:
Create a custom language for expressing rules that rewrite trees of relational operators.
Build a compiler that generates executable code for these rules.

Cons: Developers need to learn this custom language. Language might not be powerful enough.

(40)

TreeNode Library

Easily transformable trees of operators

Standard collection functionality – foreach, map, collect, etc.

transform function – recursive modification of tree fragments that match a pattern.

Debugging support – pretty printing, splicing, etc.

(41)

Tree Transformations

Developers express tree transformations as PartialFunction[TreeType, TreeType]:

1. If the function does apply to an operator, that operator is replaced with the result.

2. When the function does not apply to an operator, that operator is left unchanged.

3. The transformation is applied recursively to all children (see the toy sketch below).
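A toy illustration of these semantics (this is not Catalyst's TreeNode; it is a self-contained sketch, and it transforms children before parents for simplicity):

sealed trait Expr {
  // Try the partial function on every node: matching nodes are replaced
  // with the result, non-matching nodes are left unchanged, and children
  // are visited recursively.
  def transform(rule: PartialFunction[Expr, Expr]): Expr = {
    val afterChildren = this match {
      case Add(l, r) => Add(l.transform(rule), r.transform(rule))
      case leaf      => leaf
    }
    if (rule.isDefinedAt(afterChildren)) rule(afterChildren) else afterChildren
  }
}
case class Literal(value: Int) extends Expr
case class Add(left: Expr, right: Expr) extends Expr

// A one-change rule: fold an Add of two literals into a single literal.
val folded = Add(Add(Literal(1), Literal(2)), Literal(3)).transform {
  case Add(Literal(a), Literal(b)) => Literal(a + b)
}
// folded == Literal(6): both Adds collapse in a single bottom-up pass.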

(42)

Writing Rules as Tree Transformations

1. Find filters on top of projections.

2. Check that the filter can be evaluated without the result of the project.

3. If so, switch the operators.

[Diagram: Original Plan vs. Filter Push-Down, with Filter id = 1 moved below Project id,name]

(43)

Filter Push Down Transformation

 

val newPlan = queryPlan transform {
  case f @ Filter(_, p @ Project(_, grandChild))
      if (f.references subsetOf grandChild.output) =>
    p.copy(child = f.copy(child = grandChild))
}

(44)

Filter Push Down Transformation

 

val newPlan = queryPlan transform {
  case f @ Filter(_, p @ Project(_, grandChild))
      if (f.references subsetOf grandChild.output) =>
    p.copy(child = f.copy(child = grandChild))
}

Partial Function

Tree

(45)

Filter Push Down Transformation

 

val newPlan = queryPlan transform {
  case f @ Filter(_, p @ Project(_, grandChild))
      if (f.references subsetOf grandChild.output) =>
    p.copy(child = f.copy(child = grandChild))
}

Find Filter on Project

(46)

Filter Push Down Transformation

 

val newPlan = queryPlan transform {
  case f @ Filter(_, p @ Project(_, grandChild))
      if (f.references subsetOf grandChild.output) =>
    p.copy(child = f.copy(child = grandChild))
}

Check that the filter can be evaluated without the result of the project.

(47)

Filter Push Down Transformation

 

val newPlan = queryPlan transform {
  case f @ Filter(_, p @ Project(_, grandChild))
      if (f.references subsetOf grandChild.output) =>
    p.copy(child = f.copy(child = grandChild))
}

If so, switch the order.

(48)

Filter Push Down Transformation

 

val newPlan = queryPlan transform {
  case f @ Filter(_, p @ Project(_, grandChild))
      if (f.references subsetOf grandChild.output) =>
    p.copy(child = f.copy(child = grandChild))
}

Pattern Matching

(49)

Filter Push Down Transformation

 

val newPlan = queryPlan transform {
  case f @ Filter(_, p @ Project(_, grandChild))
      if (f.references subsetOf grandChild.output) =>
    p.copy(child = f.copy(child = grandChild))
}

Collections library

(50)

Filter Push Down Transformation

 

val newPlan = queryPlan transform {
  case f @ Filter(_, p @ Project(_, grandChild))
      if (f.references subsetOf grandChild.output) =>
    p.copy(child = f.copy(child = grandChild))
}

Copy Constructors

(51)

Efficient Expression Evaluation

Interpreting expressions (e.g., 'a + b') can be very expensive on the JVM:

Virtual function calls
Branches based on expression type
Object creation due to primitive boxing
Memory consumption by boxed primitive objects

(52)

Interpreting "a + b"

1. Virtual call to Add.eval()
2. Virtual call to a.eval()
3. Return boxed Int
4. Virtual call to b.eval()
5. Return boxed Int
6. Integer addition
7. Return boxed result

[Expression tree: Add with children Attribute a and Attribute b]
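To make the cost concrete, here is a toy interpreted evaluator (a sketch, not Catalyst's actual Expression classes) in which every step above is a virtual eval() call and every Int result is boxed because eval returns Any:

trait Expression { def eval(input: Seq[Any]): Any }

case class Attribute(ordinal: Int) extends Expression {
  def eval(input: Seq[Any]): Any = input(ordinal)   // returns a boxed Int
}

case class Add(left: Expression, right: Expression) extends Expression {
  def eval(input: Seq[Any]): Any =
    // two more virtual calls, two unboxings, one boxed result
    left.eval(input).asInstanceOf[Int] + right.eval(input).asInstanceOf[Int]
}

// "a + b" over the row (1, 2)
val aPlusB = Add(Attribute(0), Attribute(1))
println(aPlusB.eval(Seq(1, 2)))   // 3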

(53)

Using Runtime Reflection

def generateCode(e: Expression): Tree = e match {
  case Attribute(ordinal) =>
    q"inputRow.getInt($ordinal)"
  case Add(left, right) =>
    q"""
      {
        val leftResult = ${generateCode(left)}
        val rightResult = ${generateCode(right)}
        leftResult + rightResult
      }
    """
}
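For reference, a standalone sketch of how a quasiquote-built tree like the one above can be compiled and evaluated at runtime (assumes scala-reflect and scala-compiler on the classpath; this uses the generic ToolBox API rather than Catalyst's own plumbing):

import scala.reflect.runtime.universe._
import scala.tools.reflect.ToolBox

val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox()
// A tree like what generateCode would produce for Add(Attribute(0), Attribute(1)),
// with the row accesses replaced by constants so the sketch is self-contained.
val tree = q"""
  {
    val leftResult  = 1
    val rightResult = 2
    leftResult + rightResult
  }
"""
println(toolBox.eval(tree))   // 3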

(54)

Executing "a + b"

val left: Int = inputRow.getInt(0)
val right: Int = inputRow.getInt(1)
val result: Int = left + right
resultRow.setInt(0, result)

Fewer function calls

(55)

Performance Comparison

[Bar chart: evaluation time in milliseconds (0–40) for interpreted evaluation, hand-written code, and code generated with Scala reflection]

(56)

TPC-DS Results

[Bar chart: run time in seconds (0–400) for TPC-DS queries 19, 53, 34, and 59]

(57)

Code Generation Made Simple

Other systems (Impala, Drill, etc.) do code generation.

Scala Reflection + Quasiquotes made our implementation an experiment done over a few weekends instead of a major system overhaul.

(58)

Future Work: Typesafe Results

Currently:

people.registerAsTable("people")
val results = sql("SELECT name FROM people")
results.map(r => r.getString(0))

What we want:

val results = sql"SELECT name FROM $people"
results.map(_.name)


(61)

Get Started

Visit spark.apache.org for videos & tutorials
Download Spark bundle for CDH
Easy to run on just your laptop
Free training talks and hands-on exercises

(62)

Conclusion

Big data analytics is evolving to include:
» More complex analytics (e.g. machine learning)
» More interactive ad-hoc queries, including SQL
» More real-time stream processing

Spark is a fast platform that unifies these apps.

Join us at Spark Summit 2014!
