• No results found

Rhea. Automatic IO Filtering for Optimizing Cloud Analytics

N/A
N/A
Protected

Academic year: 2021

Share "Rhea. Automatic IO Filtering for Optimizing Cloud Analytics"

Copied!
23
0
0

Loading.... (view fulltext now)

Full text

(1)

Rhea

Automatic IO Filtering for Optimizing Cloud Analytics

Christos Gkantsidis, Dimitrios Vytiniotis, Orion Hodson,

Ant Rowstron, Dushyanth Narayanan,

(2)

Data analytics

Data processing in the cloud is

hot

Momentum building up with

Microsoft Hadoop

Distribution

and Windows Azure

The Rhea project:

Reduce network transfers using programming languages

research

(3)

Data analytics as a cloud service

Storage and processing typically

not co-located

Reasons: management, isolation, different architectures

Industry embraces this design: Azure Compute/Storage, Amazon EC2/S3

Public Storage

Private Hadoop cluster

Cross cloud scenario

In-cloud scenario

Storage

Public Hadoop cluster

Internet/WAN

Network

infrastructure

(4)

The problem

Lots of data can be transferred between storage and compute

NB: different than original Hadoop –

storage and compute are not co-located

Potential network bottlenecks and processing delays

... even if a fraction of the data is needed for the computation

(e.g. click-log processing)

Compute

Azure storage

Mappers

Reducers

Network

(5)

Storage-to-compute bandwidth

0

100

200

300

400

500

600

700

0

2

4

6

8

10

12

14

16

18

P

er

-ins

tance

bandwidth

(Mbps)

Number of compute instances

Azure-EU-North/ExtraLarge

Amazon-US/ExtraLarge

Azure-EU-North/Small

(6)

Storage side filters

Compute nodes have (some) CPU and memory

Run best-effort filtering on storage nodes

Suppress/kill filters if resource consumption too high

Filters are “sloppy”

No false negatives

Can have any number of false positives

The full computation will run on the compute side, remove false positives

Compute

Azure storage

Mappers

Reducers

Network

(7)

Can we tell what “is needed”?

A very difficult problem:

The Hadoop data model is unstructured (think: text files)

Data is made up of

rows

separated in

columns

Lack of structure -> no known schema, can’t do SQL-like optimization

Key insight: go look inside the

job executable!

%22S%22_Bridge_II http://www.georss.org/georss/point 39.9930555556 -81.7466666667 l %22S%22_Bridge_II http://www.w3.org/2003/01/geo/wgs84_pos#lat 39.9930555556 l %22S%22_Bridge_II http://www.w3.org/2003/01/geo/wgs84_pos#long -81.7466666667 l %27Abadilah http://www.georss.org/georss/point 25.4383333333 56.1913888889 l %27Abadilah http://www.w3.org/2003/01/geo/wgs84_pos#lat 25.4383333333 l %27Abadilah http://www.w3.org/2003/01/geo/wgs84_pos#long 56.1913888889 l %27Abud http://www.georss.org/georss/point 32.0150166667 35.0681333333 l %27Abud http://www.w3.org/2003/01/geo/wgs84_pos#lat 32.0150166667 l %27Abud http://www.w3.org/2003/01/geo/wgs84_pos#long 35.0681333333 l %27Adan_Governorate http://www.georss.org/georss/point 12.9 44.9166666667 l %27Adan_Governorate http://www.w3.org/2003/01/geo/wgs84_pos#lat 12.9 l

(8)

We extract row and column filters

%22S%22_Bridge_II http://www.georss.org/georss/point 39.9930555556 -81.7466666667 l %22S%22_Bridge_II http://www.w3.org/2003/01/geo/wgs84_pos#lat 39.9930555556 l %22S%22_Bridge_II http://www.w3.org/2003/01/geo/wgs84_pos#long -81.7466666667 l %27Abadilah http://www.georss.org/georss/point 25.4383333333 56.1913888889 l %27Abadilah http://www.w3.org/2003/01/geo/wgs84_pos#lat 25.4383333333 l %27Abadilah http://www.w3.org/2003/01/geo/wgs84_pos#long 56.1913888889 l %27Abud http://www.georss.org/georss/point 32.0150166667 35.0681333333 l %27Abud http://www.w3.org/2003/01/geo/wgs84_pos#lat 32.0150166667 l %27Abud http://www.w3.org/2003/01/geo/wgs84_pos#long 35.0681333333 l %27Adan_Governorate http://www.georss.org/georss/point 12.9 44.9166666667 l %27Adan_Governorate http://www.w3.org/2003/01/geo/wgs84_pos#lat 12.9 l %22S%22_Bridge_II http://www.georss.org/georss/point * * %22S%22_Bridge_II http://www.w3.org/2003/01/geo/wgs84_pos#lat 39.9930555556 l %22S%22_Bridge_II http://www.w3.org/2003/01/geo/wgs84_pos#long -81.7466666667 l %27Abadilah http://www.georss.org/georss/point * * %27Abadilah http://www.w3.org/2003/01/geo/wgs84_pos#lat 25.4383333333 l %27Abadilah http://www.w3.org/2003/01/geo/wgs84_pos#long 56.1913888889 l %27Abud http://www.georss.org/georss/point * * %27Abud http://www.w3.org/2003/01/geo/wgs84_pos#lat 32.0150166667 l %27Abud http://www.w3.org/2003/01/geo/wgs84_pos#long 35.0681333333 l %27Adan_Governorate http://www.georss.org/georss/point * * %27Adan_Governorate http://www.w3.org/2003/01/geo/wgs84_pos#lat 12.9 l

Row

and

column

filtering

Towards

compute

(9)

Extract filters using static analysis

Analyze

executable code

(vs SQL or other DSL)

Path slicing

to extract row filters

Abstract interpretation

to extract column filters

Stateless, fail-safe, “sloppy” filters

Can be killed if they take up compute resources

Can be restarted on demand

(10)

Example: Extract row filter

public void map(…, Text value,

OutputCollector outputCollector, …) {

String dataRow = value.toString();

StringTokenizer dataTokenizer =

new StringTokenizer(dataRow,"\t");

String articleName =

dataTokenizer.nextToken();

String pointType =

dataTokenizer.nextToken();

if (

GEO_RSS_URI.equals(pointType)

)

{

… // more code omitted

outputCollector.collect(locationKey, articleName);

}

}

1. Identify Output Instructions

2. Find conditions that

lead to output

3. Collect related

instructions

(11)

Example: Extract row filter

public boolean isInteresting(Text bcvar2)

throws Exception {

java.lang.String bcvar5 = bcvar2.toString();

java.lang.String irvar0 = "\t";

java.util.StringTokenizer bcvar6

= new StringTokenizer(bcvar5,irvar0);

java.lang.String bcvar7 = bcvar6.nextToken();

java.lang.String bcvar8 = bcvar6.nextToken();

boolean irvar0_1 = GEO_RSS_URI.equals(bcvar8);

if (((irvar0_1?1:0) != 0)) {

return true;

}

return false;

}

The row filter is:

… derived from original (Java) program

… executable program

(12)

Column filtering

Understand common tokenization methods

E.g. String.Split()

Convert to abstract “transition” system

Map program points to sets of states

Map output to sets of states

“Column spec” is union of all output states

Column spec = how to tokenize + which tokens

(13)

Static analysis summary

Works directly on Java bytecode of mappers

Under 3 seconds for analysis on desktop PC

Under 10 sec for entire filter generation process

We collected 160 mappers from web

50% generated row filters

26% generated column filters

“State” was an unexpected problem

Mappers are stateless in theory but not in practice

If global state is used, we conservatively assume the

worst

(14)

Data reduction

0

0,1

0,2

0,3

0,4

0,5

0,6

0,7

0,8

0,9

1

Sel

ectivity

Row

Column

Row+column

(15)

Evaluation on Azure

In cloud: optimize LAN bandwidth

Cannot run filters (yet) on Azure storage servers

We prefilter the data offline

Measure filtering performance separately

Cross cloud: optimize WAN bandwidth

filters run in cloud compute VM

also reduces WAN egress costs

(16)

Data analytics as a cloud service

Public Storage

Private Hadoop cluster

Cross cloud scenario

In-cloud scenario

Storage

Public Hadoop cluster

Internet/WAN

Network

infrastructure

(17)

In-cloud: filtered vs. unfiltered

0

0,2

0,4

0,6

0,8

1

1,2

No

rm

ali

zed

run

time

(18)

Cross-cloud performance

0

0,2

0,4

0,6

0,8

1

1,2

No

rm

ali

zed

run

time

(19)

Cross-cloud $ savings

0

0,1

0,2

0,3

0,4

0,5

0,6

0,7

0,8

0,9

1

No

rm

ali

zed

$ cos

t

Bandwidth

Compute

(20)

Summary and on-going work

Rhea: row/column filters for unstructured storage

Automatically extracted from Java bytecode of app

Executable, best-effort, sloppy storage-side filters

Improve perf, save $$$

Current focus: reduce filtering overhead

Filters are executable Java bytecode

Native filters would be much better

Future: other uses of static analysis

(21)
(22)

Evaluation based on 9 jobs

0

100

200

300

400

500

600

700

800

900

0

20

40

60

80

100

120

Run

time (

s)

Input

da

ta siz

e (GB)

(23)

Filtering performance

0

1

2

3

4

5

Pr

oces

sin

g

Ra

te

(G

bp

s

per c

or

e)

References

Related documents

But in 2004, probably as a result of a greater rainfall from the start of the growing season that year, and of the annual basic fertilisation, there was a greater uptake

• Software-defined Cloud Computing – Optimizing and automating the Cloud configuration and adaptation by extending the virtualization to compute, storage, and networks. •

Figure 5 – Crop evapotranspiration (ET c ) of adult coffee plantations based on the SEBAL algorithm compared to ET c (mm d –1 ) calculated with observed data. ET c observed

Data obtained were subjected to summary statistics, di- versity analysis using both Simpson diversity and Shannon evenness index, and rank abundance curve and model.. The

The automated cloud environment build service automatically creates cloud environments comprising of compute, storage and network from an environment specification

• Azure Workbooks • Azure Alerts • Log Analytics • Network Watcher • Azure Service Health • Monitor Azure Costs • Azure Application Insights •

• Azure Workbooks • Azure Alerts • Log Analytics • Network Watcher • Azure Service Health • Monitor Azure Costs • Azure Application Insights •

 Server Network  SAN Network  Disks Subsystem IO Network Clients IO Servers (Data and/or Metadata) Enterprise Storage Client IO Requests Server Network Performance SAN