• No results found

Relational Data Processing on MapReduce

N/A
N/A
Protected

Academic year: 2021

Share "Relational Data Processing on MapReduce"

Copied!
36
0
0

Loading.... (view fulltext now)

Full text

(1)

1

Relational Data Processing on MapReduce

V. CHRISTOPHIDES

Department of Computer Science

University of Crete

2

Univ. of Crete Spring 2015

12+ TBs of tweet data

every day

25+ TBs of log data every day generated by a new

user being added every sec. for 3 years

? T B s o f d a ta e v e ry d a y 2+ billion people on the Web by end 2011 30 billionRFID tags today (1.3B in 2005) 4.6 billion camera phones world wide 100s of millions of GPS enabled devices sold annually 76 millionsmart meters in 2009. 200M by 2014

Peta-scale Data Analysis

4 billion views/day YouTube is the 2nd most used

(2)

3

A Bird’s Eye View of Big Data

Source: Barry Devlin, “The Big Data Zoo --- Taming the Beasts

The three domains of information

4

Univ. of Crete Spring 2015

Positioning Big Data

(3)

5

Big Data Analysis

A lot of these datasets have some structure

Query logs

Point-of-sale records

User data (e.g., demographics) (

How do we perform data analysis at

scale?

Relational databases and SQL

MapReduce (Hadoop)

6

Univ. of Crete Spring 2015

Relational Databases vs. MapReduce

Relational databases:

Multipurpose: analysis and transactions; batch and interactive Data integrity via ACID transactions

Lots of tools in software ecosystem (for ingesting, reporting, etc.) Supports SQL (and SQL integration, e.g., JDBC)

Automatic SQL query optimization

MapReduce (Hadoop):

Designed for large clusters, fault tolerant

Data is accessed in “native format”

Supports many query languages

Programmers retain control over performance Open source

(4)

7

Parallel Relational Databases vs. MapReduce

Parallel relational databases

Schema on write

Failures are relatively infrequent “Possessive” of data

Mostly proprietary

MapReduce

Schema on read

Failures are relatively common In situ data processing

Open source

Shared-nothing architecture for parallel processing

Hadoop NextGen (YARN) architecture

8

Univ. of Crete Spring 2015

(5)

9

Database Workloads

OLTP (online transaction processing)

Typical applications: e-commerce, banking, airline reservations

User facing: real-time, low latency, highly-concurrent

Tasks: relatively small set of “standard” transactional queries Data access pattern: random reads, updates, writes (involving

relatively small amounts of data)

OLAP (online analytical processing)

Typical applications: business intelligence, data mining Back-end processing: batch workloads, less concurrency

Tasks: complex analytical queries, often ad hoc

Data access pattern: table scans, large amounts of data involved per

query

10

Univ. of Crete Spring 2015

One Database or Two?

Downsides of co-existing OLTP

and OLAP workloads

Poor memory management

Conflicting data access patterns Variable latency

Solution: separate databases

User-facing OLTP database for high-volume transactions

Data warehouse for OLAP workloads

(6)

11

OLTP/OLAP Integration

OLTP database for user-facing transactions

Retain records of all activity

Periodic ETL (e.g., nightly) Extract-Transform-Load (ETL)

Extract records from source

Transform: clean data, check integrity, aggregate, etc.

Load into OLAP database

OLAP database for data warehousing

Business intelligence: reporting, ad hoc queries, data mining, etc. Feedback to improve OLTP services

OLTP

OLAP

ETL

(Extract, Transform, Load)

12

Univ. of Crete Spring 2015

(7)

13

MapReduce: A Major Step Backwards?

MapReduce is a step backward in database access

Schemas are good

Separation of the schema from the application is good High-level access languages are good

MapReduce is poor implementation

Brute force and only brute force (no indexes, for example)

MapReduce is missing features

Bulk loader, indexing, updates, transactions...

MapReduce is incompatible with DMBS tools

14

Univ. of Crete Spring 2015

OLTP/OLAP Architecture: Hadoop?

OLTP

OLAP

ETL

(Extract, Transform, Load)

Hadoop here?

(8)

15

OLTP/OLAP/Hadoop Architecture

Why does this make sense?

OLTP

OLAP

ETL

(Extract, Transform, Load)

Hadoop

16

Univ. of Crete Spring 2015

ETL Bottleneck

Reporting is often a nightly task:

ETL is often slow: why?

What happens if processing 24 hours of data takes longer than 24 hours? Often, with noisy datasets, ETL is the analysis!

ETL necessarily involves brute force data scans: L, then E and T?

Hadoop is perfect:

Most likely, you already have some data warehousing solution Ingest is limited by speed of HDFS

Scales out with more nodes

Massively parallel

Ability to use any processing tool Much cheaper than parallel databases ETL is a batch process anyway!

(9)

17

MapReduce Algorithms

for Processing Relational Data

18

Univ. of Crete Spring 2015

combine

combine combine combine

b

a 1 2 c 9 a 5 c 2 b 7 c 8

partition partition partition partition

map

map map map

k1v1 k2v2 k3v3 k4v4 k5v5 k6v6

b

a 1 2 c 3 c 6 a 5 c 2 b 7 c 8

Shuffle and Sort: aggregate values by keys

reduce reduce reduce

a 1 5 b 2 7 c 2 9 8

r1 s1 r2 s2 r3 s3

(10)

19

Secondary Sorting

MapReduce sorts input to reducers by key Values are arbitrarily ordered

What if want to sort value also?

E.g., k → (v1, R), (v3, R),

(v4, R), (v8, R)…

Solution 1:

Buffer values in memory, then sort Why is this a bad idea?

Solution 2:

“Value-to-key conversion”: extends the key with part of the value Let execution framework do the sorting

Preserve state across multiple key-value pairs to handle processing

Anything else we need to do?

20

Univ. of Crete Spring 2015

Value-to-Key Conversion

Default comparator, group comparator, and Partitionerhas to be tuned to use the appropriate part of the key

k → (v1, R), (v4, R), (v8, R), (v3, R)… (k, v1) → (v1, R) Before After (k, v3) → (v3, R) (k, v4) → (v4, R) (k, v8) → (v8, R)

Values arrive in arbitrary order(

Values arrive in sorted order(

(11)

21

Working Scenario

Two tables:

User demographics (gender, age, income, etc.)

User page visits (URL, time spent, etc.)

Analyseswe might want to perform:

Statistics on demographic characteristics

Statistics on page visits

Statistics on page visits by URL

Statistics on page visits by demographic characteristic (

22

Univ. of Crete Spring 2015

Relational Algebra

(12)

23

Projection

R1 R2 R3 R4 R5 R1 R2 R3 R4 R5 πS (R) 24

Univ. of Crete Spring 2015

Projection in MapReduce

Easy!

Map over tuples, emit new tuples with appropriate attributes

For each tuple t in R, construct a tuple t’ by eliminating those

components whose attributes are not in S emit a key/value pair (t’, t’)

No reducers (reducers are the identity function), unless for regrouping or resorting tuples

the Reduce operation performs duplicate elimination

Alternatively: perform in reducer, after some other processing

Basically limited by HDFS streaming speeds

Speed of encoding/decoding tuples becomes important Relational databases take advantage of compression Semi-structured data? No problem!

(13)

25

Selection

R1 R2 R3 R4 R5 R1 R3 σC(R) 26

Univ. of Crete Spring 2015

Selection in MapReduce

Easy!

Map over tuples, emit only tuples that meet criteria

For each tuple t in R, check if t satisfies C and If so, emit a key/value pair (t, t)

No reducers (reducers are the identity function), unless for regrouping or resorting tuples

Alternatively: perform in reducer, after some other processing

Basically limited by HDFS streaming speeds:

Speed of encoding/decoding tuples becomes important

Relational databases take advantage of compression Semistructured data? No problem!

(14)

27

Set Operations in Map Reduce

R(X,Y) ⋃ S(Y,Z)

Map: for each tuple t either in R or in S, and emit (t,t)

Reduce: either receive (t,[t,t]) or (t,[t])

Always emit (t,t)

We perform duplicate elimination

R(X,Y) ⋂ S(Y,Z)

Map: for each tuple t either in R or in S, emit (t,t) Reduce: either receive (t,[t,t]) or (t,[t])

Emit (t,t) in the former case and nothing in the latter

R(X,Y) - S(Y,Z)

Map: for each tuple t either in R or in S, emit (t, R or S) Reduce: receive (t,[R]) or (t,[S]) or (t,[R,S])

Emit (t,t) only when received (t,[R]) otherwise (t, NULL)

28

Univ. of Crete Spring 2015

Group by( Aggregation

Example: What is the average time spent per URL?

In SQL:

SELECT url, AVG(time) FROM visits GROUP BY url

In MapReduce: Let R(A, B, C) be a relation to which we apply γA,θ(B)(R) The map operation prepares the grouping (e.g., emit time, keyed by url) The grouping is done by the framework

The reducer computes the aggregation (e.g. average)

Eventually, optimize with combiners

Simplifying assumptions: one grouping attribute and one aggregation

(15)

29

Relational Joins

R1 R2 R3 R4 S1 S2 S3 S4 R1 S2 R2 S4 R3 S1 R4 S3 R S 30

Univ. of Crete Spring 2015

Types of Relationships

One-to-One One-to-Many

(16)

31

Join Algorithms in MapReduce

“Join” usually just means equi-join, but we also want to support other join

predicates

Hadoop has some built-in join support, but our goal is to understand important algorithm design principles

Algorithms Reduce-side join Map-side join In-memory join Striped variant Memcached variant 32

Univ. of Crete Spring 2015

Reduce-side Join

Basic idea: group by join key

Execution framework brings together tuples sharing the same key Similar to a “sort-merge join” in the database terminology

A mapfunction

Receives a record in R and S

Emits its join attribute value as a key and the record as a value A reducefunction

Receives each join attribute value with its records from R and S

Perform actual join between the records in R and S

Two variants 1-to-1 joins

(17)

33

Re-Partition Join

33

Dataset A Dataset B Different join keys

HDFS stores data blocks

(Replicas are not shown)

Mapper M+N Mapper2

Mapper 1 Mapper3

- Each mapper processes one block (split)

- Each mapper produces the join key and the record pairs

Reducer 1 Reducer 2 Reducer N Reducers perform the actual join

Shuffling and Sorting Phase

Shuffling and sorting over the network

34

Univ. of Crete Spring 2015

Reduce-side Join: 1-to-1

Note: no guarantee if R is going to come first or S!

R1 R4 S2 S3 R1 R4 S2 S3 keys values R1 R4 S2 S3 keys values Map Reduce

(18)

35

Reduce-Side Join: 1-to-1

At most one tuple from R and one tuple from S share the same join key

Reducer will receive keys in the following format k23 → [(r64,R64),(s84,S84)]

k37 → [(r68, R68)]

k59 → [(s97,S97),(r81,R81)]

k61 → [(s99, S99)]

If there are two values associated with a key, one must be from R and

the other from S

What should the reduced do for k37 ?

36

Univ. of Crete Spring 2015

Reduce-side Join: 1-to-Many

What’s the problem?

R is the one side, S is the many

R1 S2 S3 R1 S2 S3 S9 keys values Map R1 S2 keys values Reduce S9 S3 .

(19)

37

Reduce-side Join: Value-to-Key Conversion

www.inkling.com/read/hadoop-definitive-guide-tom-white-3rd/chapter-8/example-8-9 R1 keys values In reducer. S2 S3 S9 R4 S3 S7

New key encountered: hold in memory Cross with records from other set

New key encountered: hold in memory Cross with records from other set

Buffer all values in memory, pick out the tuple from R, and then cross it with every tuple from S to perform the join

38

Univ. of Crete Spring 2015

Reduce-side Join: Many-to-Many

What’s the problem? R is the smaller dataset

R1 keys values In reducer. S2 S3 S9 Hold in memory

Cross with records from other set

R5 R8

(20)

39

Map-side Join: Basic Idea

What are the limitations of reduce-side joins?

Both datasets are transferred over the network

Assume two datasets are sorted by the join key:

R1 R2 R3 R4 S1 S2 S3 S4

A sequential scan through both datasets to join: called a “sort-merge join” in database terminology

40

Univ. of Crete Spring 2015

Map-side Join: Parallel Scans

If datasets are sorted by join key, join can be accomplished by a scan

over both datasets

How can we accomplish this in parallel?

Partition and sort both datasets in the same manner

In MapReduce:

Map over one dataset, read from other corresponding partition

No reducers necessary (unless to repartition or resort)

Consistently partitioned datasets: realistic to expect?

Depends on the workflow

For ad hoc data analysis, reduce-side are more general, although

(21)

41

Broadcast/Replication Join

Dataset A Dataset B Different join keys

41

HDFS stores data blocks

(Replicas are not shown)

Mapper N Mapper 1 Mapper 2

42

Univ. of Crete Spring 2015

In-Memory Join: Variants

Basic idea: load one dataset into memory, stream over other dataset Works if R << S and R fits into memory

Called a “hash join” in database terminology

MapReduce implementation

Distribute R to all nodes

Map over S, each mapper loads R in memory, hashed by join key

For every tuple in S, look up join key in R

No reducers, unless for regrouping or resorting tuples

Downside: need to copy R to all mappers

(22)

43

In-Memory Join: Variants

Distributed Cache: Efficient way to copy files to all nodes processing a certain task

Use it to send small R to all mappers

Part of the job configuration

Striped variant:

R too big to fit into memory?

Divide R into R1, R2, R3, ( s.t. each Rn fits into memory Perform in-memory join: ∀n, Rn ⋈ S

Take the union of all join results

Hadoop still needs to move the data to the worker nodes, so use this with

care

But it avoids copying the file for every task on the same node

44

Univ. of Crete Spring 2015

Memcached

Caching servers: 15 million requests per sec, 95% handled by memcache

(15 TB of RAM)

Database layer: 800 eight-core Linux servers running MySQL (40 TB user data)

In order to achieve good performance in accessing distributed key-value stores, it is often necessary to batch queries before making synchronous requests (to amortize latency over many requests) or to rely on asynchronous requests

(23)

45

Memcached Join

Memcached join:

Load R into memcached

Replace in-memory hash lookup with memcached lookup

Capacity and scalability?

Memcached capacity >> RAM of individual node

Memcached scales out with cluster

Latency?

Memcached is fast (basically, speed of network)

Batch requests to amortize latency costs

46

Univ. of Crete Spring 2015

Which Join to Use?

In-memory join > map-side join > reduce-side join

Why?

Limitations of each?

In-memory join: memory

Map-side join: sort order and partitioning

Reduce-side join: general purpose algorithm but sensible to data

skewness?

What about non-equi joins?

Inequality (S.A<R.A): map just forwards R-tuples, but replicates

(24)

47

Problems With Standard Equi-Join Algorithm

Degree of

parallelism limited by number of distinct join values

Data skew

If one join value

dominates, reducer

processing that key will become bottleneck

Does not generalize

to other joins

48

Univ. of Crete Spring 2015

On Standard Repartition Equi-Join Algorithm

Consider only the pairs with the same join attribute values

Naïve join algorithm

Standard repartition join algorithm

(25)

49

Standard Repartition Equi-Join Algorithm

50

Univ. of Crete Spring 2015

(26)

51

Reducer-Centric Cost Model

Difference between join implementations starts with Map output

52

Univ. of Crete Spring 2015

Optimization Goal: Minimal Job Completion time

Job complete time depends on the slowest map and reduce functions

Balancing the workloads of map functions is easy and thus we ignore map

functions

Balance the workloads of reduce functions as evenly as possible Assume all reducers are similarly capable

Processing time at reducer is approx. monotonicin input and output size

Hence need to minimize max-reducer-inputor max-reducer-output Join problem classification

Input-size dominated: minimize max-reducer-input

Output-size dominated: minimize max-reducer-output

Input-output balanced: minimize combination of both

(27)

53

Join Model

Join-matrix M: M(i, j) = true, if and only if (si, tj) in join result

Cover each true-valued cell by exactly one reducer

© Kyuseok Shim (VLDB 2012 TUTORIAL)

54

Univ. of Crete Spring 2015

Reduce Allocations for Standard Repartition Equi-joins

Max reduce input size = 5 Max reduce output size = 6

Random Balanced

Simple/Standard

Max reduce input size = 8

Max reduce output size = 4 Max reduce input size = 5Max reduce output size = 4

© Kyuseok Shim (VLDB 2012 TUTORIAL)

(28)

55

Comparisons of Reduce Allocation Methods

Simple allocation

Minimize the maximum input size of reduce functions Output size may be skewed

Random allocation

Minimize the maximum outputsize of reduce functions Input size may be increased due to duplication

Balanced allocation

Minimize both maximum input and output sizes

© Kyuseok Shim (VLDB 2012 TUTORIAL)

56

Univ. of Crete Spring 2015

How to Balance Reduce Allocation

Assume r is desired number of reduce functions

Partition join-matrix M into r regions

A map function sends each record in R and S to mapped regions

A reduce function outputs all possible (r,s) pairs satisfying the join

predicates in its value-list

(29)

57

Processing Relational Data: Summary

MapReduce algorithms for processing relational data:

Group by, sorting, partitioning are handled automatically by shuffle/sort in MapReduce

Selection, projection, and other computations (e.g., aggregation), are

performed either in mapper or reducer

Complex operations require multiple MapReduce jobs

Example: top ten URLs in terms of average time spent Opportunities for automatic optimization

Multiple strategies for relational joins

58

Univ. of Crete Spring 2015

Join Implementations on MapReduce

Feng Li, Beng Chin Ooi, M. Tamer Özsu, and Sai Wu. 2014. Distributed data management using MapReduce. ACM Comput. Surv. 46, 3, January 2014

(30)

59

Evolving Roles for Relational

Database and MapReduce

60

Univ. of Crete Spring 2015

The Traditional Way: Bringing Data to Compute

Evolution from Apache Hadoop to the Enterprise Data Hub A. Awadallah Co-Founder & CTO of Cloudera SMDB 2014

(31)

61

The New Way: Bringing Compute to Data

Evolution from Apache Hadoop to the Enterprise Data Hub A. Awadallah Co-Founder & CTO of Cloudera SMDB 2014

62

Univ. of Crete Spring 2015

OLTP/OLAP/Hadoop Architecture

OLTP

OLAP

ETL

(Extract, Transform, Load)

Hadoop

Why does this make sense?

Often, with noisy datasets, ETL is the analysis!

Note that ETL necessarily involves brute force data scans

(32)

63

Need for High-Level Languages

Hadoop is great for large-data processing!

But writing Java programs for everything is verbose and slow

Analysts don’t want to (or can’t) write Java

Solution: develop higher-level data processing languages Hive: HQL is like SQL

Pig: Pig Latin is a bit like Perl

64

Univ. of Crete Spring 2015

Hive and Pig

Hive: data warehousing application in Hadoop

Query language is HQL, variant of SQL Tables stored on HDFS as flat files

Developed by Facebook, now open source

Pig: large-scale data processing system

Scripts are written in Pig Latin, a dataflow language Developed by Yahoo!, now open source

Roughly 1/3 of all Yahoo! internal jobs

Common idea:

Provide higher-level language to facilitate large-data processing Higher-level language “compiles down” to Hadoop jobs

(33)

65

Hive: Example

Hive looks similar to an SQL database Relational join on two tables:

Table of word counts from Shakespeare collection

Table of word counts from the bible SELECT

SELECT SELECT

SELECT s.words.words.word, s.word, , s.freq, s.freqs.freqs.freq, , k.freq, , k.freqk.freqk.freq FROM FROM FROM shakespeareFROM shakespeareshakespeare s shakespeare s s s JOIN bible k ON (

JOIN bible k ON ( JOIN bible k ON (

JOIN bible k ON (s.words.words.words.word = = = k.word= k.wordk.wordk.word) WHERE ) WHERE s.freq) WHERE ) WHERE s.freqs.freqs.freq >= 1 AND >= 1 AND >= 1 AND k.freq>= 1 AND k.freqk.freq >= 1 k.freq >= 1 >= 1 >= 1 ORDER BY

ORDER BY ORDER BY

ORDER BY s.freqs.freqs.freqs.freq DESC LIMIT 10;DESC LIMIT 10;DESC LIMIT 10;DESC LIMIT 10; the the the the 25848258482584825848 62394623946239462394 I I I I 23031230312303123031 8854885488548854 and and and and 19671196711967119671 38985389853898538985 to to to to 18038180381803818038 13526135261352613526 of of of of 16700167001670016700 34654346543465434654 a a a a 14170141701417014170 8057805780578057 you you you you 12702127021270212702 2720272027202720 my my my my 11297112971129711297 4135413541354135 in in in in 10797107971079710797 12445124451244512445 is is is is 8882888288828882 6884688468846884

Source: Material drawn from Cloudera training VM

66

Univ. of Crete Spring 2015

Hive: Behind the Scenes

SELECT

SELECT SELECT

SELECT s.words.words.word, s.word, , , s.freqs.freqs.freqs.freq, , k.freq, , k.freqk.freqk.freq FROM FROM FROM shakespeareFROM shakespeareshakespeare s shakespeare s s s JOIN bible k ON (

JOIN bible k ON (JOIN bible k ON (

JOIN bible k ON (s.words.words.words.word = = = k.word= k.wordk.wordk.word) WHERE ) WHERE s.freq) WHERE ) WHERE s.freqs.freqs.freq >= 1 AND >= 1 AND >= 1 AND >= 1 AND k.freqk.freqk.freqk.freq >= 1 >= 1 >= 1 >= 1 ORDER BY

ORDER BY ORDER BY

ORDER BY s.freqs.freqs.freqs.freq DESC LIMIT 10;DESC LIMIT 10;DESC LIMIT 10;DESC LIMIT 10;

(TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF

(TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF shakespeareshakespeareshakespeare s) (TOK_TABREF bible k) (= (. shakespeare s) (TOK_TABREF bible k) (= (. s) (TOK_TABREF bible k) (= (. s) (TOK_TABREF bible k) (= (. (TOK_TABLE_OR_COL s) word) (. (TOK_TABLE_OR_COL k) word)))) (TOK_INSERT

(TOK_TABLE_OR_COL s) word) (. (TOK_TABLE_OR_COL k) word)))) (TOK_INSERT (TOK_TABLE_OR_COL s) word) (. (TOK_TABLE_OR_COL k) word)))) (TOK_INSERT (TOK_TABLE_OR_COL s) word) (. (TOK_TABLE_OR_COL k) word)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL s) word)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL s) (TOK_TABLE_OR_COL s) word)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL s) (TOK_TABLE_OR_COL s) word)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL s)

(TOK_TABLE_OR_COL s) word)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL s) freqfreqfreqfreq)) (TOK_SELEXPR )) (TOK_SELEXPR )) (TOK_SELEXPR )) (TOK_SELEXPR (. (TOK_TABLE_OR_COL k)

(. (TOK_TABLE_OR_COL k) (. (TOK_TABLE_OR_COL k)

(. (TOK_TABLE_OR_COL k) freqfreqfreqfreq))) (TOK_WHERE (AND (>= (. (TOK_TABLE_OR_COL s) ))) (TOK_WHERE (AND (>= (. (TOK_TABLE_OR_COL s) ))) (TOK_WHERE (AND (>= (. (TOK_TABLE_OR_COL s) ))) (TOK_WHERE (AND (>= (. (TOK_TABLE_OR_COL s) freqfreqfreqfreq) 1) ) 1) ) 1) ) 1) (>= (. (TOK_TABLE_OR_COL k)

(>= (. (TOK_TABLE_OR_COL k) (>= (. (TOK_TABLE_OR_COL k)

(>= (. (TOK_TABLE_OR_COL k) freqfreqfreqfreq) 1))) (TOK_ORDERBY (TOK_TABSORTCOLNAMEDESC (. ) 1))) (TOK_ORDERBY (TOK_TABSORTCOLNAMEDESC (. ) 1))) (TOK_ORDERBY (TOK_TABSORTCOLNAMEDESC (. ) 1))) (TOK_ORDERBY (TOK_TABSORTCOLNAMEDESC (. (TOK_TABLE_OR_COL s)

(TOK_TABLE_OR_COL s) (TOK_TABLE_OR_COL s)

(TOK_TABLE_OR_COL s) freqfreqfreqfreq))) (TOK_LIMIT 10)))))) (TOK_LIMIT 10)))))) (TOK_LIMIT 10)))))) (TOK_LIMIT 10)))

(one or more of MapReduce jobs) (one or more of MapReduce jobs) (one or more of MapReduce jobs) (one or more of MapReduce jobs)

(Abstract Syntax Tree) (Abstract Syntax Tree)(Abstract Syntax Tree) (Abstract Syntax Tree)

(34)

67

Hive: Behind the Scenes

STAGE DEPENDENCIES: Stage-1 is a root stage Stage-2 depends on stages: Stage-1 Stage-0 is a root stage

STAGE PLANS: Stage: Stage-1 Map Reduce

Alias -> Map Operator Tree: s TableScan alias: s Filter Operator predicate: expr: (freq >= 1) type: boolean Reduce Output Operator

key expressions: expr: word type: string sort order: + Map-reduce partition columns:

expr: word type: string tag: 0 value expressions: expr: freq type: int expr: word type: string k TableScan alias: k Filter Operator predicate: expr: (freq >= 1) type: boolean Reduce Output Operator

key expressions: expr: word type: string sort order: + Map-reduce partition columns:

expr: word type: string tag: 1 value expressions: expr: freq type: int

Reduce Operator Tree: Join Operator condition map: Inner Join 0 to 1 condition expressions: 0 {VALUE._col0} {VALUE._col1} 1 {VALUE._col0}

outputColumnNames: _col0, _col1, _col2 Filter Operator

predicate:

expr: ((_col0 >= 1) and (_col2 >= 1)) type: boolean Select Operator expressions: expr: _col1 type: string expr: _col0 type: int expr: _col2 type: int

outputColumnNames: _col0, _col1, _col2 File Output Operator

compressed: false GlobalTableId: 0 table:

input format: org.apache.hadoop.mapred.SequenceFileInputFormat output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

Stage: Stage-2 Map Reduce

Alias -> Map Operator Tree:

hdfs://localhost:8022/tmp/hive-training/364214370/10002 Reduce Output Operator

key expressions: expr: _col1 type: int sort order: -tag: -1 value expressions: expr: _col0 type: string expr: _col1 type: int expr: _col2 type: int Reduce Operator Tree:

Extract Limit

File Output Operator compressed: false GlobalTableId: 0 table:

input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

Stage: Stage-0 Fetch Operator

limit: 10

68

Univ. of Crete Spring 2015

Pig: Example

(35)

69

Pig Query Plan

70

Univ. of Crete Spring 2015

Pig Script

visits = load ‘/data/visits’ as (user, url, time); gVisits = group visits by url;

visitCounts = foreach gVisits generate url, count(visits); urlInfo = load ‘/data/urlInfo’ as (url, category, pRank); visitCounts = join visitCounts by url, urlInfo by url; gCategories = group visitCounts by category;

topUrls = foreach gCategories generate top(visitCounts,10); store topUrls into ‘/data/topUrls’;

(36)

71

Pig Query Plan

72

Univ. of Crete Spring 2015

References

CS9223 – Massive Data Analysis J. Freire & J. Simeon New York University

Course 2013

INFM 718G / CMSC 828G Data-Intensive Computing with MapReduce J.

Lin University of Maryland 2013

CS 6240: Parallel Data Processing in MapReduce Mirek Riedewald

Northeastern University 2014

Extreme Computing Stratis D. Viglas University of Edinburg 2014

MapReduce Algorithms for Big Data Analysis Kyuseok Shim VLDB 2012

References

Related documents

attracted the minds of many devotee poets and scholars of Hindu, Buddhist, Jaina and various other religious sects* Within a few centuries a large number of

The seed yield trials and agronomic studies conducted to test the efficacy of different lentil genotypes over the past decade inferred that genotype 98CL008 performed

SQL without subquery “nested loops” languages SQL with subquery relational expr without Apply relational expr with Apply logical reordering set-oriented execution navigational,

significant linear trends in both mean growth season length and critical development

◦ A general duty to use ordinary care to keep the premises reasonably safe for the benefit of the invitee. ◦ Typically you show these people the most concern anyways (i.e.

He is currently a Lecturer in Games Design and Development in the School of Electronics and Info-Comm Technology, ITE College Central (Singapore), and an Adjunct

Epicor Distribution Express Edition is a comprehensive software system designed specifi cally for small and mid-sized distributors, that is provided in the software as a

Like define-syntax and define-for-syntax in scheme/base , but without support for keyword arguments or optional arguments (i.e., head is as for define)?. (if test-expr