Data Warehouse Overview. Namit Jain

(1)

Data Warehouse Overview

(2)

Agenda

Why data?

Life of a tag for data infrastructure

Warehouse architecture

Challenges

(3)

Data Science –

peace.facebook.com

(4)

Data Science -

facebook.com/data

(5)
(6)

‘Data-enhanced’ Products

§  People You May Know (PYMK)

§  Newsfeed ranking

§  Ads optimization

(7)
(8)

Internal Reporting – Product Insights

§  Data-driven product development

§  Allows products to iterate quickly by observing user

(9)

Life of a tag

(10)
(11)

Facebook Architecture – Data Sources

Log Data (facts)

•  Web-tier – user activity logs
   •  View/click of an ad, liking a story, fanning a page, status update, …
•  Backend Services – Search, Newsfeed, Ads

Facebook-Site related Data (dimensions)

•  MySQL
   •  Descriptions of ads
   •  User demographics

(12)

Life of a tag for data infrastructure

•  www.facebook.com: user tags a photo; log line generated: <user_id, photo_id>
•  Scribe log storage: log line reaches Scribeh (1s)
•  copier/loader, Warehouse: log line reaches warehouse (1hr)
•  MySQL scrapes: user info reaches Warehouse (1day)
•  nocron, periodic analysis: daily report on count of photo tags by country (1day)
•  hipal, ad hoc analysis: count photos tagged by females age 20-25 yesterday
•  Count users tagging photos in the last hour (1min)

(13)

Takeaways

•  Log collection

•  Realtime analysis

•  Batch analysis

•  Periodic analysis

•  Interactive analysis

(14)

Takeaways

•  Scribe/Calligraphus

•  Puma/HBase

•  Hive/Hadoop

(15)

Takeaways

•  Open Source

• Scribe

• HBase

(16)

§

Open Source, simple and scalable log collection system

Scribe

(17)

Challenges: Choosing the right stack?

Hadoop/Hive    AsterData    Oracle/Sharded MySQL

Cost

Availability

Scalability

Performance

ACID

Ease of Use ✔

(18)
(19)


Warehouse Architecture

Storage (HDFS)

Compute (MapReduce)

Hadoop

Query (Hive)

Workflow (Nocron)

(24)

What is Hadoop?

§  Hadoop: Open Source Apache project
   –  Framework for running applications on large clusters of commodity hardware
      §  Scale: petabytes of data on thousands of nodes
   –  Hadoop layers:
      §  Storage layer: HDFS
      §  Processing layer: MapReduce
§  Characteristics:
   –  Uses clusters of commodity computers
   –  Supports moving computation close to data
      §  Single ‘storage + compute’ cluster vs. separate clusters
      §  Scalable, fault tolerant, and easily managed

(25)

HDFS Data Model

§  Data is logically organized into files and directories

§  Files are divided into uniform-sized blocks

§  Blocks are distributed across the nodes of the cluster and are replicated to handle hardware failure

§  HDFS keeps checksums of data for corruption detection and recovery

§  HDFS exposes block placement so that computation can be
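
The block, replica, and checksum ideas on this slide can be illustrated with a small Python sketch. This is only a toy model under stated assumptions: the function names, the tiny block size, the node names, and the round-robin placement are all hypothetical, and real HDFS placement is rack-aware rather than round-robin.

```python
import zlib


def split_into_blocks(data: bytes, block_size: int) -> list:
    """Cut a file's bytes into uniform-sized blocks (the last one may be shorter)."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]


def place_blocks(blocks, nodes, replicas=3):
    """Toy placement: copy each block to `replicas` distinct nodes, round-robin.

    Assumes replicas <= len(nodes). A CRC32 checksum is stored per block so
    that corruption could be detected later.
    """
    placement = {}
    for idx, block in enumerate(blocks):
        targets = [nodes[(idx + r) % len(nodes)] for r in range(replicas)]
        placement[idx] = {"checksum": zlib.crc32(block), "nodes": targets}
    return placement


blocks = split_into_blocks(b"x" * 10, block_size=4)
placement = place_blocks(blocks, ["dn1", "dn2", "dn3", "dn4"])
```

Because every block lives on several nodes, losing one node still leaves other replicas to read from, and a checksum mismatch on one replica can be repaired from another.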

(26)

HDFS Architecture

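Figure: HDFS architecture. Clients send metadata ops to the Namenode, which holds the metadata (name, #replicas, …; for example /users/foo/data, 3, …). Clients read and write blocks on Datanodes in Rack 1 and Rack 2; the Namenode issues block ops to the Datanodes, and blocks are replicated between Datanodes.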

(27)
(28)
(29)

Hive

§  Aim to simplify usage of Hadoop

§  A system for managing and querying structured and semi-structured data built on top of Hadoop
   §  Map-Reduce for execution
   §  HDFS for storage
   §  Metadata on HDFS files

§  Key Building Principles
   §  SQL is a familiar language
   §  Extensibility – Types, Functions, Formats, Scripts
   §  Performance

(30)

Hive – Simplifying usage of Hadoop

hive> select key, count(1) from kv1 where key > 100 group by key;

vs.

$ cat > /tmp/reducer.sh
uniq -c | awk '{print $2"\t"$1}'

$ cat > /tmp/map.sh
awk -F '\001' '{if($1 > 100) print $1}'

$ bin/hadoop jar contrib/hadoop-0.23-dev-streaming.jar -input /user/hive/warehouse/kv1 -mapper map.sh -file /tmp/reducer.sh -file /tmp/map.sh -reducer reducer.sh -output /tmp/largekey -numReduceTasks 1
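
For readers who want to check what both versions compute, here is a minimal in-memory Python sketch of the same logic: keep rows whose key exceeds 100, then count rows per key. The sample rows and the function name are made up for illustration; the real kv1 table lives in HDFS and is not shown in the slides.

```python
from collections import Counter


def count_large_keys(rows, threshold=100):
    """Mirror of: select key, count(1) from kv1 where key > threshold group by key."""
    # The generator plays the role of the mapper (filter),
    # Counter plays the role of the reducer (count per key).
    return Counter(key for key, _value in rows if key > threshold)


# Hypothetical sample of (key, value) rows.
kv1 = [(86, "val_86"), (238, "val_238"), (311, "val_311"), (238, "val_238")]
result = count_large_keys(kv1)
```

Here `result` maps key 238 to 2 and key 311 to 1; key 86 is filtered out.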

(31)
(32)

Hive Data/Query Model

§  Looks and behaves almost like a regular database

§  Data Model
   §  Tables with typed columns
   §  Flexible types and storage formats

§  Query Model
   §  Flavor of SQL for analytics queries

(33)

Data Model

Hive Entity         Sample Metastore Entity   Sample HDFS Location
Table               T                         /wh/T
Partition           date=d1                   /wh/T/date=d1
Bucketing column    userid                    /wh/T/date=d1/part-0000
                                              /wh/T/date=d1/part-1000
                                              (hashed on userid)
External

(34)

Data Model

§  Tables

–  Analogous to tables in relational DBs

–  Each table has a corresponding directory in HDFS

–  Example
   §  Page views table name: pvs
   §  HDFS directory

(35)

Data Model

§  Partitions

–  Analogous to dense indexes on partition columns

–  Nested sub-directories in HDFS for each combination of partition column values

–  Example

§  Partition columns: ds, ctry

§  HDFS subdirectory for ds = 20090801, ctry = US

– /wh/pvs/ds=20090801/ctry=US

§  HDFS subdirectory for ds = 20090801, ctry = CA
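
As a rough illustration of the directory convention above, the following Python sketch builds a partition sub-directory from a table directory and partition column values. The helper name is hypothetical; the only thing taken from the slide is the `column=value` nesting, in partition-column order.

```python
def partition_dir(table_dir: str, partition: dict) -> str:
    """Append one `column=value` sub-directory per partition column, in order."""
    parts = [f"{column}={value}" for column, value in partition.items()]
    return "/".join([table_dir, *parts])


us_dir = partition_dir("/wh/pvs", {"ds": "20090801", "ctry": "US"})
```

With these inputs, `us_dir` equals the `/wh/pvs/ds=20090801/ctry=US` path shown on the slide, which is why a query that filters on ds and ctry only needs to read one sub-directory.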

(36)

Data Model

§  Buckets

–  Split data based on hash of a column, mainly for parallelism

–  One HDFS file per bucket within partition sub-directory

–  Example
   §  Bucket column: user into 32 buckets
   §  HDFS file for user hash 0
      – /wh/pvs/ds=20090801/ctry=US/part-00000
   §  HDFS file for user hash bucket 20
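
A minimal Python sketch of the bucketing idea follows. It assumes, for illustration only, that the hash of an integer user id is the id itself and that bucket files are named `part-` plus a five-digit bucket number, matching the `part-00000` example above; the function name is hypothetical.

```python
def bucket_file(user_id: int, num_buckets: int = 32) -> str:
    """Pick the bucket file for a row by hashing its bucket column.

    Assumption: the hash of an integer id is the (non-negative) id itself.
    """
    bucket = (user_id & 0x7FFFFFFF) % num_buckets
    return f"part-{bucket:05d}"


first = bucket_file(0)    # id 0 falls in bucket 0
wrapped = bucket_file(64)  # 64 % 32 == 0, so the same bucket file
```

Because all rows for one user land in the same file, each bucket can be processed by a separate task in parallel.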

(37)

Data Model

§  External Tables

–  Point to existing data directories in HDFS

–  Can create tables and partitions – partition columns just become annotations to external directories

–  Example: create external table with partitions

   CREATE EXTERNAL TABLE pvs(userid int, pageid int)
   PARTITIONED BY (ds string, ctry string)
   STORED AS textfile
   LOCATION '/path/to/existing/table'

–  Example: add a partition to external table

   ALTER TABLE pvs
   ADD PARTITION (ds='20090801', ctry='US')
   LOCATION '/path/to/existing/partition'

(38)

Example Application

§  Status updates table:

–  status_updates(userid int, status string, ds string)

§  Load the data from log files:

–  LOAD DATA LOCAL INPATH '/logs/status_updates' INTO TABLE status_updates PARTITION (ds='2009-03-20')

§  User profile table

(39)

Example Query Plan (Filter)

§  Filter status updates containing ‘michael jackson’

–  SELECT * FROM status_updates WHERE status LIKE '%michael jackson%'

(40)

Example Query Plan (Aggregation)

§  Figure out total number of status_updates in a given day

–  SELECT COUNT(1) FROM status_updates WHERE ds = '2009-08-01'

(41)

Hive Query Language

§  Extensibility

–  Pluggable Map-reduce scripts

–  Pluggable User Defined Functions

–  Pluggable User Defined Types
   §  Complex object types: List of Maps

–  Pluggable Data Formats
   §  Apache Log Format

(42)

Hive Evolution

§  Originally:

–  A way for Hadoop users to express queries in a high-level language without having to write map/reduce programs

§  Now more and more:

–  A parallel SQL DBMS which happens to use Hadoop for its storage and execution architecture

–  Nearly 100% of Hadoop jobs in the warehouse go through Hive.

–  TRANSFORM scripts (any language)
   §  Serialization+IPC overhead

–  Pre/Post Hooks (Java)
   §  Statement validation/execution

(43)

Hive is an open system

§  Different on-disk data formats

–  Text File, Sequence File, …

§  Different in-memory data formats

–  Java Integer/String, Hadoop IntWritable/Text …

§  User-provided map/reduce scripts

–  In any language, use stdin/stdout to transfer data …

§  User-defined Functions

–  Substr, Trim, From_unixtime …

§  User-defined Aggregation Functions

(44)

File Format Example

§  CREATE TABLE mylog (
     user_id BIGINT,
     page_url STRING,
     unix_time INT)
   STORED AS TEXTFILE;

§  LOAD DATA INPATH '/user/myname/log.txt' INTO

(45)

Existing File Formats

                                TEXTFILE     SEQUENCEFILE   RCFILE
Data type                       text only    text/binary    text/binary
Internal storage order          Row-based    Row-based      Column-based
Compression                     File-based   Block-based    Block-based
Splittable*                     YES          YES            YES
Splittable* after compression   NO           YES            YES

* Splittable: capable of splitting the file so that a single huge file can be processed by multiple mappers in parallel.

(46)

SerDe Examples

§  CREATE TABLE mylog (
     user_id BIGINT,
     page_url STRING,
     unix_time INT)
   ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

§  CREATE TABLE mylog_rc (
     user_id BIGINT,
     page_url STRING,
     unix_time INT)
   ROW FORMAT SERDE
     'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'

(47)

SerDe

§  SerDe is short for serialization/deserialization. It controls the format of a row.

§  Serialized format:

–  Delimited format (tab, comma, ctrl-a …)

–  Thrift Protocols

§  Deserialized (in-memory) format:

–  Java Integer/String/ArrayList/HashMap

–  Hadoop Writable classes
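
To make the serialized versus in-memory distinction concrete, here is a small Python sketch of a ctrl-a delimited row format. It only illustrates the idea: real Hive SerDes are Java classes, and the function names and sample row here are hypothetical.

```python
CTRL_A = "\x01"  # the ctrl-a delimiter mentioned on the slide


def serialize(row, delimiter=CTRL_A):
    """In-memory row (a list of values) -> one delimited text line."""
    return delimiter.join(str(value) for value in row)


def deserialize(line, types, delimiter=CTRL_A):
    """Delimited text line -> in-memory row, using the column types."""
    return [convert(field) for convert, field in zip(types, line.split(delimiter))]


row = [42, "http://example.com/a", 1249084800]  # user_id, page_url, unix_time
line = serialize(row)
round_trip = deserialize(line, [int, str, int])
```

The table schema supplies the column types, which is why `deserialize` needs them: the serialized line is only text.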

(48)

Map/Reduce Scripts Examples

§  add file page_url_to_id.py;

§  add file my_python_session_cutter.py;

§  FROM
     (SELECT TRANSFORM(user_id, page_url, unix_time)
        USING 'page_url_to_id.py'
        AS (user_id, page_id, unix_time)
      FROM mylog
      DISTRIBUTE BY user_id
      SORT BY user_id, unix_time) mylog2
   SELECT TRANSFORM(user_id, page_id, unix_time)
     USING 'my_python_session_cutter.py'
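
The slides do not show the contents of page_url_to_id.py, so the following is only a guess at its shape. Hive passes the TRANSFORM input columns to the script as tab-separated lines on stdin and reads tab-separated lines back from stdout; the URL-to-id lookup table and the `-1` fallback below are hypothetical.

```python
import sys

# Hypothetical lookup table; the real mapping is not given in the slides.
PAGE_IDS = {"/home.php": 1, "/profile.php": 2}


def transform(lines, page_ids=PAGE_IDS):
    """Turn (user_id, page_url, unix_time) lines into (user_id, page_id, unix_time)."""
    for line in lines:
        user_id, page_url, unix_time = line.rstrip("\n").split("\t")
        page_id = page_ids.get(page_url, -1)  # -1 marks an unknown URL
        yield f"{user_id}\t{page_id}\t{unix_time}"


def main():
    """Entry point when run as a TRANSFORM script: stdin in, stdout out."""
    for out_line in transform(sys.stdin):
        print(out_line)


sample = list(transform(["111\t/home.php\t1249084800\n"]))
```

Keeping the row logic in `transform` and the I/O in `main` makes the script easy to test without a cluster.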

(49)

Comparison of UDF/UDAF vs. M/R scripts

                    UDF/UDAF              M/R scripts
language            Java                  any language
data format         in-memory objects     serialized streams
1/1 input/output    supported via UDF     supported
n/1 input/output    supported via UDAF    supported
1/n input/output    supported via UDTF    supported
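
The three input/output shapes in the table can be shown with plain Python functions. This is only an analogy: Hive UDFs, UDAFs, and UDTFs are written in Java against Hive's own interfaces, and the function names below are made up.

```python
def udf_trim(value: str) -> str:
    """1/1: one input row produces one output value (like a UDF)."""
    return value.strip()


def udaf_count(values) -> int:
    """n/1: many input rows produce one output value (like a UDAF)."""
    return sum(1 for _ in values)


def udtf_explode(values):
    """1/n: one input row produces many output rows (like a UDTF)."""
    yield from values


trimmed = udf_trim("  hive  ")
counted = udaf_count(["a", "b", "c"])
exploded = list(udtf_explode([1, 2, 3]))
```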

(50)

Common Join

Figure (Task A): Mappers over Table X and Table Y feed a Shuffle stage, which feeds the Reducer.

(51)

Join in Map Reduce

page_view
pageid  userid  time
1       111     9:08:01
2       111     9:08:13
1       222     9:08:14

user
userid  age  gender
111     25   female
222     32   male

Map output (page_view)
key  value
111  <1,1>
111  <1,2>
222  <1,1>

Map output (user)
key  value
111  <2,25>
222  <2,32>

Shuffle, Sort
key  value
111  <1,1>
111  <1,2>
111  <2,25>

key  value
222  <1,1>
222  <2,32>

Reduce output (pv_users)
pageid  age
1       25
2       25

pageid  age
1       32
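
The map, shuffle/sort, and reduce steps of this join can be traced with a short in-memory Python sketch using the same page_view and user rows. The function names are illustrative only; each value is tagged with 1 for page_view rows and 2 for user rows, as in the slide.

```python
from collections import defaultdict

page_view = [(1, 111, "9:08:01"), (2, 111, "9:08:13"), (1, 222, "9:08:14")]
user = [(111, 25, "female"), (222, 32, "male")]


def map_phase():
    """Emit (userid, <tag, payload>) pairs: tag 1 = page_view, tag 2 = user."""
    for pageid, userid, _time in page_view:
        yield userid, (1, pageid)
    for userid, age, _gender in user:
        yield userid, (2, age)


def shuffle(pairs):
    """Group values by key and sort them, as the shuffle/sort stage does."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {key: sorted(values) for key, values in sorted(groups.items())}


def reduce_phase(groups):
    """For each userid, pair every pageid with every age seen for that user."""
    for _userid, values in groups.items():
        pageids = [payload for tag, payload in values if tag == 1]
        ages = [payload for tag, payload in values if tag == 2]
        for pageid in pageids:
            for age in ages:
                yield pageid, age


pv_users = list(reduce_phase(shuffle(map_phase())))
```

The resulting `pv_users` rows are (1, 25), (2, 25), and (1, 32), matching the reduce output above.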

(52)
(53)
(54)
(55)
(56)
(57)

But Hive is not Enough!

§  Workflow specification, schedule and execution framework
   §  Workflows are DAGs
   §  Nodes are data transfers and transformations
   §  Edges are dependencies between nodes

§  Reporting and Dashboard Tools

§  Hive Query/Workflow Authoring Tools

§  Warehouse management
   §  Track space and CPU usage of the cluster
   §  Capacity planning for growth
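
The point that workflows are DAGs, with nodes as data transfers or transformations and edges as dependencies, can be sketched with Python's standard `graphlib` module (Python 3.9+). The node names are hypothetical, loosely based on the photo-tag example earlier in the deck; this is not how Nocron itself is implemented.

```python
from graphlib import TopologicalSorter

# Each node maps to the set of nodes it depends on (hypothetical names).
workflow = {
    "load_logs": set(),
    "scrape_mysql": set(),
    "join_tags_with_users": {"load_logs", "scrape_mysql"},
    "daily_report": {"join_tags_with_users"},
}

# A valid execution order runs every node after all of its dependencies.
order = list(TopologicalSorter(workflow).static_order())
```

Any order produced this way runs both loads before the join and the join before the report; a cycle in the dependencies would raise `graphlib.CycleError`.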

(58)
(59)

Warehouse Challenges

§  Growth

(60)

Growth Numbers

              Facebook Users (million)   Queries/Day   Scribe Data GB/Day   Nodes   Size TB (Total)
March 2008
March 2012

(61)

HDFS – Normal Deployment

NameNode Data Node 1 Data Node 2 Data Node 3

(62)

First Attempts

§  Concatenate old tables/partitions

–  Alter table partition <p> concatenate

–  No need to compress/uncompress the data for RCFile

§  Hadoop Archive File

–  Needed for bucketed files

(63)

HDFS – ‘Hacked’ Federation

NN1 NN2

(64)

HDFS - Federated Deployment

NameNode1 NameNode2 Data Node 1 Data Node 2 Data Node 3

(65)

HDFS Layout

NEW  

Map  Reduce  

(66)

Corona

Figure components: Hive CLI + Job Client (submits the Hive Query); Job Tracker; Task Trackers (heartbeat, running M and R tasks).

(67)

Hadoop Corona

§  Split the current Job Tracker

–  Cluster Manager to manage resources/nodes

–  One Corona Job Tracker per job

–  Corona Job Tracker requests resources from Cluster Manager

§  Small amount of state in Cluster Manager
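
The split described above can be pictured with a toy Python model. It is a sketch under strong assumptions, not Corona's actual design: the class names, the notion of integer "slots", and the methods are all hypothetical. The only ideas taken from the slide are that the Cluster Manager hands out resources, that each job has its own Job Tracker, and that the manager keeps very little state.

```python
class ClusterManager:
    """Keeps only a free-slot count: the 'small amount of state'."""

    def __init__(self, total_slots: int):
        self.free_slots = total_slots

    def request(self, wanted: int) -> int:
        """Grant as many slots as are free, up to `wanted`."""
        granted = min(wanted, self.free_slots)
        self.free_slots -= granted
        return granted

    def release(self, count: int) -> None:
        self.free_slots += count


class CoronaJobTracker:
    """One instance per job; per-job bookkeeping lives here, not in the manager."""

    def __init__(self, job_id: str, manager: ClusterManager):
        self.job_id = job_id
        self.manager = manager
        self.held_slots = 0

    def acquire(self, wanted: int) -> int:
        self.held_slots += self.manager.request(wanted)
        return self.held_slots

    def finish(self) -> None:
        self.manager.release(self.held_slots)
        self.held_slots = 0


manager = ClusterManager(total_slots=10)
job_a = CoronaJobTracker("job_a", manager)
job_b = CoronaJobTracker("job_b", manager)
job_a.acquire(6)  # job_a gets all 6 slots it asked for
job_b.acquire(6)  # only 4 slots remain, so job_b gets 4
job_a.finish()    # job_a's 6 slots go back to the manager
```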

(68)

Corona

Figure components: Cluster Manager; Hive CLI + Job Client + Job Tracker (submits the Hive Query); Task Trackers (heartbeat, running M and R tasks).

(69)

Warehouse Challenges

§  Growth

§  Isolation

–  Space Isolation

–  Compute Isolation

–  Failure Isolation

(70)

Isolation - Now

§  Hardware isolation

–  Platinum cluster & Silver cluster

§  Partial compute isolation

–  Pools

Figure: Pool1, Pool2, and Pool3 on one HDFS Cluster.

(71)

Challenges: Isolation

Platinum

Replication

(72)

Isolation

§  Pools

–  FIFO within each pool

(73)

TEAM          Minimum Slots
ADS
BI
COEFFICIENT
GROWTH
SCRAPING
INSIGHTS
NETEGO
PLATFORM

(74)

Isolation - Future

§  Logical namespace per team

§  Namespace encompasses

–  Transport capacity (scribe)

–  Realtime analytics capacity (puma)

–  Storage capacity (hive tables)

–  Compute capacity (periodic/adhoc analyses)

§  Resource accountability per namespace

§  Pools computed dynamically

(75)

Isolation - Future

NEW   Pool1   HDFS  Cluster   Pool3   Pool2   NS3   NS2   NS1  

(76)

Challenges: Testing

SILVER   BRONZE  

DFS1  

DFS5   DFSTEMP  

(77)

Testing

§  Snapshot cluster

–  Queries for a day

(78)

Warehouse Challenges

§  Growth

§  Isolation

§  Multiple Regions

–  Hadoop picky about new capacity requirements

–  Need to use any capacity in any location

(79)

Multi Region

HDFS   Hive1

Replication   Map Reduce

(80)

Project Prism

NS1   NS2   NS3

Pool1   Pool2   Pool3

HDFS Cluster   Hive1   Central Namespace Server   Replication   Replication   Replication

(81)
(82)

Peregrine

§  Fast

§  Approximate results

§  Memory bound

(83)

Open Source

§  Hadoop

– Facebook has its internal branch

– Releases to GitHub periodically

§  Hive

– Development is in Apache

(84)

Hive Open Projects

§  Testing

– Benchmark

(85)

Hive Open Projects

§  Performance

– Materialized Views

– Cost-based optimizer for Hive

– Index Joins

– Better skew handling techniques

– Map-reduce-reduce-reduce*

(86)

References
