Data Warehouse Overview
Agenda
• Why data?
• Life of a tag for data infrastructure
• Warehouse architecture
• Challenges
Data Science – peace.facebook.com
Data Science – facebook.com/data
‘Data-enhanced’ Products
§ People You May Know (PYMK)
§ Newsfeed ranking
§ Ads optimization
Internal Reporting – Product Insights
§ Data-driven product development
§ Allows products to iterate quickly by observing user
Life of a tag
Facebook Architecture – Data Sources
• Log Data (facts)
  • Web-tier – user activity logs
    • View/click of an ad, liking a story, fanning a page, status update, …
  • Backend Services – Search, Newsfeed, Ads
• Facebook-Site related Data (dimensions)
  • MySQL
    • Descriptions of ads
    • User demographics
Life of a tag for data infrastructure
www.facebook.com: User tags a photo
Log line generated: <user_id, photo_id>
Scribe Log Storage: Log line reaches Scribeh (1s)
copier/loader, Warehouse: Log line reaches warehouse (1hr)
MySQL Scrapes: User info reaches Warehouse (1day)
nocron Periodic Analysis: Daily report on count of photo tags by country (1day)
hipal Adhoc Analysis: Count photos tagged by females age 20-25 yesterday
Count users tagging photos in the last hour (1min)
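Counting users who tagged photos in the last hour amounts to a distinct count over a sliding time window. The following is a minimal Python sketch of that idea, not a description of how the realtime system on these slides works. It assumes log events arrive in timestamp order, and the class and method names are hypothetical.

```python
from collections import deque


class TaggersInWindow:
    """Counts distinct users with at least one tag event inside a sliding window."""

    def __init__(self, window_seconds=3600):
        self.window = window_seconds
        self.events = deque()   # (timestamp, user_id), oldest first
        self.per_user = {}      # user_id -> number of events still in the window

    def add(self, timestamp, user_id):
        # Assumption: timestamps are non-decreasing across calls.
        self.events.append((timestamp, user_id))
        self.per_user[user_id] = self.per_user.get(user_id, 0) + 1

    def count(self, now):
        # Evict events that fell out of the window, then count remaining users.
        cutoff = now - self.window
        while self.events and self.events[0][0] <= cutoff:
            _, user_id = self.events.popleft()
            self.per_user[user_id] -= 1
            if self.per_user[user_id] == 0:
                del self.per_user[user_id]
        return len(self.per_user)


counter = TaggersInWindow(window_seconds=3600)
counter.add(0, 111)       # log line <user_id=111, photo_id=...>
counter.add(1800, 222)
counter.add(3000, 111)
print(counter.count(now=3500))   # both users are still inside the window
print(counter.count(now=5500))   # only the event at t=3000 remains
```

Keeping a per-user event count lets eviction run in time proportional to the number of expired events rather than rescanning the whole window.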
Takeaways
• Log collection
• Realtime analysis
• Batch analysis
• Periodic analysis
• Interactive analysis
Takeaways
• Scribe/Calligraphus
• Puma/HBase
• Hive/Hadoop
Takeaways
• Open Source
• Scribe
• HBase
Scribe
§ Open Source, simple and scalable log collection system
Challenges: Choosing the right stack?
Hadoop/Hive   AsterData   Oracle/Sharded MySQL
Cost ✔ ✔
Availability ✔
Scalability ✔
Performance ✔ ✔
ACID ✔ ✔
Ease of Use ✔ ✔
Warehouse Architecture
Storage (HDFS)
Compute (MapReduce)
Hadoop
Query (Hive)
Workflow (Nocron)
What is Hadoop?
§ Hadoop: Open Source Apache project
– Framework for running applications on large clusters of commodity hardware
§ Scale: petabytes of data on thousands of nodes
– Hadoop layers:
§ Storage layer: HDFS
§ Processing layer: MapReduce
§ Characteristics:
– Uses clusters of commodity computers
– Supports moving computation close to data
§ Single ‘storage + compute’ cluster vs. separate clusters
§ Scalable, fault tolerant, and easily managed
HDFS Data Model
§ Data is logically organized into files and directories
§ Files are divided into uniform-sized blocks
§ Blocks are distributed across the nodes of the cluster and are replicated to handle hardware failure
§ HDFS keeps checksums of data for corruption detection and recovery
§ HDFS exposes block placement so that computation can be
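The block, replication, and checksum ideas above can be illustrated with a toy model. This is a sketch under stated assumptions, not HDFS code: the round-robin placement is a simplification (it ignores racks), CRC32 over a whole block stands in for the real checksum scheme, and all function and node names are hypothetical.

```python
import zlib


def split_into_blocks(data: bytes, block_size: int):
    """Divide a file's bytes into uniform-sized blocks (the last may be shorter)."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]


def place_replicas(num_blocks: int, nodes, replication: int = 3):
    """Assign each block to `replication` distinct nodes, round-robin.

    Assumes replication <= len(nodes).
    """
    return {
        block: [nodes[(block + r) % len(nodes)] for r in range(replication)]
        for block in range(num_blocks)
    }


def checksum(block: bytes) -> int:
    """Checksum stored alongside a block so corruption can be detected on read."""
    return zlib.crc32(block)


data = b"0123456789"
blocks = split_into_blocks(data, block_size=4)
placement = place_replicas(len(blocks), ["dn1", "dn2", "dn3", "dn4"])
stored_sums = [checksum(b) for b in blocks]

# A reader recomputes the checksum; on a mismatch it would fall back to
# another replica of the same block.
corrupted = b"X" + blocks[0][1:]
print(checksum(corrupted) == stored_sums[0])   # False: corruption detected
print(placement[0])                            # nodes holding replicas of block 0
```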
HDFS Architecture
[Figure: Clients send metadata ops to the Namenode, which holds metadata (Name, #replicas, …), e.g. /users/foo/data, 3, …; Clients read and write blocks on Datanodes in Rack 1 and Rack 2; block ops and replication between Datanodes.]
Hive
§ Aim to simplify usage of Hadoop
§ A system for managing and querying structured and semi-structured data built on top of Hadoop
  § Map-Reduce for execution
  § HDFS for storage
  § Metadata on HDFS files
§ Key Building Principles
  § SQL is a familiar language
  § Extensibility – Types, Functions, Formats, Scripts
  § Performance
Hive – Simplifying usage of Hadoop
hive> select key, count(1) from kv1 where key > 100 group by key;
vs.
$ cat > /tmp/reducer.sh
uniq -c | awk '{print $2"\t"$1}'
$ cat > /tmp/map.sh
awk -F '\001' '{if($1 > 100) print $1}'
$ bin/hadoop jar contrib/hadoop-0.23-dev-streaming.jar -input /user/hive/warehouse/kv1 -mapper map.sh -file /tmp/reducer.sh -file /tmp/map.sh -reducer reducer.sh -output /tmp/largekey -numReduceTasks 1
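Both versions above compute the same thing: keep rows whose key exceeds 100, then count rows per key. A minimal Python sketch of that map and reduce logic follows, assuming ctrl-A delimited rows with a numeric key in the first field; the sample rows are invented for illustration.

```python
from collections import Counter


def mapper(lines):
    """Emit the key of every ctrl-A delimited row whose key is greater than 100."""
    for line in lines:
        key = line.split("\x01")[0]
        if int(key) > 100:
            yield key


def reducer(keys):
    """Count occurrences of each key, like `uniq -c` over sorted input."""
    return sorted(Counter(keys).items())


# Illustrative rows in the form key<ctrl-A>value.
rows = ["238\x01val_238", "86\x01val_86", "311\x01val_311", "238\x01val_238"]
for key, count in reducer(mapper(rows)):
    print(f"{key}\t{count}")
```

The Hive query expresses the filter and the grouping declaratively, so none of this plumbing has to be written by hand.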
Hive Data/Query Model
§ Looks and behaves almost like a regular database
§ Data Model
  § Tables with typed columns
  § Flexible types and storage formats
§ Query Model
  § Flavor of SQL for analytics queries
Data Model
Hive Entity        Sample Metastore Entity   Sample HDFS Location
Table              T                         /wh/T
Partition          date=d1                   /wh/T/date=d1
Bucketing column   userid                    /wh/T/date=d1/part-0000 … /wh/T/date=d1/part-1000 (hashed on userid)
External
Data Model
§ Tables
– Analogous to tables in relational DBs
– Each table has a corresponding directory in HDFS
– Example
§ Page views table name: pvs
§ HDFS directory
Data Model
§ Partitions
– Analogous to dense indexes on partition columns
– Nested sub-directories in HDFS for each combination of partition column values
– Example
§ Partition columns: ds, ctry
§ HDFS subdirectory for ds = 20090801, ctry = US
– /wh/pvs/ds=20090801/ctry=US
§ HDFS subdirectory for ds = 20090801, ctry = CA
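The mapping from partition column values to nested sub-directories can be sketched in a few lines of Python. This is an illustration of the layout described above, not Hive's implementation; the function names and the second sample partition are hypothetical.

```python
def partition_path(table_dir, partition_spec):
    """Build the sub-directory for one combination of partition column values.

    `partition_spec` is an ordered list of (column, value) pairs.
    """
    return "/".join([table_dir] + [f"{col}={val}" for col, val in partition_spec])


def prune(partitions, **wanted):
    """Keep only the partitions whose columns match the requested values."""
    return [
        spec for spec in partitions
        if all(dict(spec).get(col) == val for col, val in wanted.items())
    ]


partitions = [
    [("ds", "20090801"), ("ctry", "US")],
    [("ds", "20090802"), ("ctry", "US")],   # invented second day, for illustration
]

print(partition_path("/wh/pvs", partitions[0]))   # /wh/pvs/ds=20090801/ctry=US

# A query that filters on ds only needs to read the matching directories.
for spec in prune(partitions, ds="20090801"):
    print(partition_path("/wh/pvs", spec))
```

Because the partition values live in the directory names, a filter on a partition column can skip whole directories without opening any data files.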
Data Model
§ Buckets
– Split data based on hash of a column – mainly for parallelism
– One HDFS file per bucket within partition sub-directory
– Example
§ Bucket column: user into 32 buckets
§ HDFS file for user hash 0
– /wh/pvs/ds=20090801/ctry=US/part-00000
§ HDFS file for user hash bucket 20
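Bucket assignment can be sketched as a hash of the bucket column taken modulo the number of buckets. The Python below is a hedged illustration: it assumes an integer user id hashes to itself, and it borrows the five-digit part-NNNNN file naming from the example path above. The function names are hypothetical.

```python
def bucket_for(user_id: int, num_buckets: int = 32) -> int:
    """Pick a bucket from the hash of the bucket column.

    Assumption: the hash of an integer id is the id itself.
    """
    return user_id % num_buckets


def bucket_file(partition_dir: str, bucket: int) -> str:
    """File for one bucket inside a partition sub-directory."""
    return f"{partition_dir}/part-{bucket:05d}"


partition_dir = "/wh/pvs/ds=20090801/ctry=US"
print(bucket_file(partition_dir, bucket_for(64)))   # 64 % 32 == 0, so part-00000
print(bucket_file(partition_dir, bucket_for(37)))   # 37 % 32 == 5, so part-00005
```

Since every row for a given user lands in the same file, work on that column can be split into 32 independent pieces.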
Data Model
§ External Tables
– Point to existing data directories in HDFS
– Can create tables and partitions – partition columns just become annotations to external directories
– Example: create external table with partitions
CREATE EXTERNAL TABLE pvs(userid int, pageid int)
PARTITIONED BY (ds string, ctry string)
STORED AS textfile
LOCATION '/path/to/existing/table'
– Example: add a partition to external table
ALTER TABLE pvs
ADD PARTITION (ds='20090801', ctry='US') LOCATION '/path/to/existing/partition'
Example Application
§ Status updates table:
– status_updates(userid int, status string, ds string)
§ Load the data from log files:
– LOAD DATA LOCAL INPATH '/logs/status_updates' INTO TABLE status_updates PARTITION (ds='2009-03-20')
§ User profile table
Example Query Plan (Filter)
§ Filter status updates containing ‘michael jackson’
– SELECT * FROM status_updates WHERE status LIKE '%michael jackson%'
Example Query Plan (Aggregation)
§ Figure out total number of status_updates in a given day
– SELECT COUNT(1) FROM status_updates WHERE ds = '2009-08-01'
Hive Query Language
§ Extensibility
– Pluggable Map-reduce scripts
– Pluggable User Defined Functions
– Pluggable User Defined Types
§ Complex object types: List of Maps
– Pluggable Data Formats
§ Apache Log Format
Hive Evolution
§ Originally:
– A way for Hadoop users to express queries in a high-level language without having to write map/reduce programs
§ Now more and more:
– A parallel SQL DBMS which happens to use Hadoop for its storage and execution architecture
– Nearly 100% of Hadoop jobs in the warehouse go through Hive.
– TRANSFORM scripts (any language)
§ Serialization+IPC overhead
– Pre/Post Hooks (Java)
§ Statement validation/execution
Hive is an open system
§ Different on-disk data formats
– Text File, Sequence File, …
§ Different in-memory data formats
– Java Integer/String, Hadoop IntWritable/Text …
§ User-provided map/reduce scripts
– In any language, use stdin/stdout to transfer data …
§ User-defined Functions
– Substr, Trim, From_unixtime …
§ User-defined Aggregation Functions
File Format Example
§ CREATE TABLE mylog (
user_id BIGINT,
page_url STRING,
unix_time INT)
STORED AS TEXTFILE;
§ LOAD DATA INPATH '/user/myname/log.txt' INTO
Existing File Formats
                               TEXTFILE     SEQUENCEFILE   RCFILE
Data type                      text only    text/binary    text/binary
Internal storage order         Row-based    Row-based      Column-based
Compression                    File-based   Block-based    Block-based
Splitable*                     YES          YES            YES
Splitable* after compression   NO           YES            YES
* Splitable: Capable of splitting the file so that a single huge file can be processed by multiple mappers in parallel.
SerDe Examples
§ CREATE TABLE mylog (
user_id BIGINT,
page_url STRING,
unix_time INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
§ CREATE TABLE mylog_rc (
user_id BIGINT,
page_url STRING,
unix_time INT)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
SerDe
§ SerDe is short for serialization/deserialization. It controls the format of a row.
§ Serialized format:
– Delimited format (tab, comma, ctrl-a …)
– Thrift Protocols
§ Deserialized (in-memory) format:
– Java Integer/String/ArrayList/HashMap
– Hadoop Writable classes
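The split between a serialized row format and an in-memory row format can be illustrated with a small delimited SerDe. This Python sketch is an analogy rather than Hive's Java SerDe interface: the class name and its methods are hypothetical, a dict stands in for the in-memory objects, and the sample URL is invented.

```python
class DelimitedSerDe:
    """Converts between a delimited text line and an in-memory row (a dict)."""

    def __init__(self, columns, delimiter="\x01"):
        # columns: ordered list of (name, converter) pairs, e.g. ("user_id", int)
        self.columns = columns
        self.delimiter = delimiter

    def deserialize(self, line):
        """Serialized line -> in-memory row with typed values."""
        fields = line.split(self.delimiter)
        return {name: cast(field) for (name, cast), field in zip(self.columns, fields)}

    def serialize(self, row):
        """In-memory row -> serialized line."""
        return self.delimiter.join(str(row[name]) for name, _ in self.columns)


# Same columns as the mylog table; ctrl-A is the delimiter assumed here.
serde = DelimitedSerDe([("user_id", int), ("page_url", str), ("unix_time", int)])

line = "111\x01http://example.com/a\x011249084800"
row = serde.deserialize(line)
print(row["user_id"] + 1)             # values are typed objects, not strings
print(serde.serialize(row) == line)   # round trip yields the original line
```

Swapping the delimiter for a tab gives the behaviour of a FIELDS TERMINATED BY '\t' table without touching the code that consumes rows.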
Map/Reduce Scripts Examples
§ add file page_url_to_id.py;
§ add file my_python_session_cutter.py;
§ FROM
(SELECT TRANSFORM(user_id, page_url, unix_time)
USING 'page_url_to_id.py'
AS (user_id, page_id, unix_time)
FROM mylog
DISTRIBUTE BY user_id
SORT BY user_id, unix_time) mylog2
SELECT TRANSFORM(user_id, page_id, unix_time)
USING 'my_python_session_cutter.py'
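A TRANSFORM script receives rows as tab-separated lines on stdin and writes tab-separated lines to stdout. The slides do not show the contents of my_python_session_cutter.py, so the following is only a guess at what such a script might do: it numbers sessions per user and starts a new session after a 30-minute gap. The gap, the extra output column, and the function names are all assumptions.

```python
import sys

SESSION_GAP_SECONDS = 30 * 60   # assumed inactivity threshold


def cut_sessions(lines, gap=SESSION_GAP_SECONDS):
    """Append a per-user session number to each (user_id, page_id, unix_time) row.

    Relies on the input being grouped by user and sorted by time, which is what
    DISTRIBUTE BY user_id and SORT BY user_id, unix_time provide.
    """
    prev_user, prev_time, session = None, None, 0
    for line in lines:
        user_id, page_id, unix_time = line.rstrip("\n").split("\t")
        now = int(unix_time)
        if user_id != prev_user:
            session = 0                 # first row for a new user
        elif now - prev_time > gap:
            session += 1                # long pause: start a new session
        prev_user, prev_time = user_id, now
        yield "\t".join([user_id, page_id, unix_time, str(session)])


def main():
    """Entry point when run as a TRANSFORM script: stdin in, stdout out."""
    for out_line in cut_sessions(sys.stdin):
        print(out_line)


# In-memory demonstration instead of reading stdin.
sample = ["111\t1\t100\n", "111\t2\t200\n", "111\t1\t5000\n", "222\t1\t300\n"]
for out_line in cut_sessions(sample):
    print(out_line)
```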
Comparison of UDF/UDAF vs. M/R scripts
                   UDF/UDAF             M/R scripts
language           Java                 any language
data format        in-memory objects    serialized streams
1/1 input/output   supported via UDF    supported
n/1 input/output   supported via UDAF   supported
1/n input/output   supported via UDTF   supported
Common Join
[Figure: Task A, with Mappers reading Table X and Table Y, a Shuffle, and a Reducer.]
Join in Map Reduce
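The join on this slide can be replayed in plain Python using the page_view and user rows from the diagram. This is a single-process sketch of the map, shuffle/sort, and reduce steps, with tag 1 marking page_view values and tag 2 marking user values as in the figure. The function names are hypothetical.

```python
from collections import defaultdict

# (pageid, userid, time) and (userid, age, gender), as shown on the slide.
page_view = [(1, 111, "9:08:01"), (2, 111, "9:08:13"), (1, 222, "9:08:14")]
user = [(111, 25, "female"), (222, 32, "male")]


def map_phase():
    """Emit (join key, (table tag, value)) pairs for both inputs."""
    for pageid, userid, _time in page_view:
        yield userid, (1, pageid)
    for userid, age, _gender in user:
        yield userid, (2, age)


def shuffle_sort(pairs):
    """Group values by key and sort them, as the shuffle does between phases."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {key: sorted(values) for key, values in sorted(groups.items())}


def reduce_phase(groups):
    """For each key, pair every page_view value with every user value."""
    for _key, values in groups.items():
        pageids = [v for tag, v in values if tag == 1]
        ages = [v for tag, v in values if tag == 2]
        for pageid in pageids:
            for age in ages:
                yield pageid, age


pv_users = list(reduce_phase(shuffle_sort(map_phase())))
print(pv_users)   # [(1, 25), (2, 25), (1, 32)]
```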
page_view
pageid  userid  time
1       111     9:08:01
2       111     9:08:13
1       222     9:08:14
user
userid  age  gender
111     25   female
222     32   male
Map
page_view: key value 111 <1,1>, 111 <1,2>, 222 <1,1>
user: key value 111 <2,25>, 222 <2,32>
Shuffle
Sort
key value 111 <1,1>, 111 <1,2>, 111 <2,25>
key value 222 <1,1>, 222 <2,32>
Reduce
pv_users
pageid  age
1       25
2       25
1       32
But Hive is not Enough!
§ Workflow specification, schedule and execution framework
  § Workflows are DAGs
  § Nodes are data transfers and transformations
  § Edges are dependencies between nodes
§ Reporting and Dashboard Tools
§ Hive Query/Workflow Authoring Tools
§ Warehouse management
  § Track space and CPU usage of the cluster
  § Capacity planning for growth
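Treating a workflow as a DAG means a scheduler can run each node once all of its dependencies have finished. A minimal Python sketch using the standard library's graphlib (Python 3.9+) follows. The node names are invented for illustration and this is not the workflow framework's actual API.

```python
from graphlib import CycleError, TopologicalSorter

# Each node maps to the set of nodes it depends on (hypothetical names).
workflow = {
    "load_status_updates": set(),
    "scrape_user_profiles": set(),
    "join_updates_with_profiles": {"load_status_updates", "scrape_user_profiles"},
    "daily_report": {"join_updates_with_profiles"},
}

# A valid execution order places every node after all of its dependencies.
order = list(TopologicalSorter(workflow).static_order())
print(order)


def is_valid_workflow(graph):
    """A workflow is only schedulable if its dependency graph has no cycle."""
    try:
        list(TopologicalSorter(graph).static_order())
        return True
    except CycleError:
        return False


print(is_valid_workflow({"a": {"b"}, "b": {"a"}}))   # False: a and b wait on each other
```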
Warehouse Challenges
§ Growth
Growth Numbers
             Facebook Users (million)   Queries/Day   Scribe Data GB/Day   Nodes   Size TB (Total)
March 2008
March 2012
HDFS – Normal Deployment
[Figure: NameNode with Data Node 1, Data Node 2, and Data Node 3.]
First Attempts
§ Concatenate old tables/partitions
– Alter table partition <p> concatenate
– No need to compress/uncompress the data for RCFile
§ Hadoop Archive File
– Needed for bucketed files
HDFS – ‘Hacked’ Federation
[Figure: NN1 and NN2.]
HDFS – Federated Deployment
[Figure: NameNode1 and NameNode2 with Data Node 1, Data Node 2, and Data Node 3.]
HDFS Layout
NEW
Map Reduce
Corona
[Figure: Hive Query, Hive CLI + Job Client, Job Tracker, and three Task Trackers running M and R tasks, with heartbeats.]
Hadoop Corona
§ Split the current Job Tracker
– Cluster Manager to manage resources/nodes
– One Corona Job Tracker per job
– Corona Job Tracker requests resources from Cluster Manager
§ Small amount of state in Cluster Manager
Corona
[Figure: Hive Query, Hive CLI + Job Client + Job Tracker, Cluster Manager, and three Task Trackers running M and R tasks, with heartbeats.]
Warehouse Challenges
§ Growth
§ Isolation
– Space Isolation
– Compute Isolation
– Failure Isolation
Isolation - Now
§ Hardware isolation
– Platinum cluster & Silver cluster
§ Partial compute isolation
– Pools
[Figure: Pool1, Pool2, and Pool3 on one HDFS Cluster.]
Challenges: Isolation
Platinum
Replication
Isolation
§ Pools
– FIFO within each pool
TEAM          Minimum Slots
ADS
BI
COEFFICIENT
GROWTH
SCRAPING
INSIGHTS
NETEGO
PLATFORM
Isolation - Future
§ Logical namespace per team
§ Namespace encompasses
– Transport capacity (scribe)
– Realtime analytics capacity (puma)
– Storage capacity (hive tables)
– Compute capacity (periodic/adhoc analyses)
§ Resource accountability per namespace
§ Pools computed dynamically
Isolation - Future
[Figure (NEW): Pool1, Pool2, and Pool3 on one HDFS Cluster, with NS1, NS2, and NS3.]
Challenges: Testing
[Figure: SILVER and BRONZE; DFS1, DFS5, DFSTEMP.]
Testing
§ Snapshot cluster
– Queries for a day
Warehouse Challenges
§ Growth
§ Isolation
§ Multiple Regions
– Hadoop picky about new capacity requirements
– Need to use any capacity in any location
Multi Region
[Figure: HDFS, Hive1, Replication, Map Reduce.]
Project Prism
[Figure: NS1, NS2, NS3 and Pool1, Pool2, Pool3 on an HDFS Cluster with Hive1; Central Namespace Server; Replication.]