Making Sense at Scale with the
Berkeley Data Analytics Stack
UC BERKELEY
Michael Franklin February 3, 2015 WSDM 2015 Shanghai
Agenda
• A Little Bit on Big Data
• AMPLab Overview
• BDAS Philosophy: Unification not Specialization
• Spark, GraphX, and other BDAS components
Sources Driving Big Data
It’s All Happening On-line Every:
• Click
• Ad impression
• Billing event
• Fast Forward, pause, …
• Friend Request
• Transaction
• Network message
• Fault
• …
User Generated (Web & Mobile)
Big Data – A Bad Definition
Data sets, typically consisting of billions or trillions
of records, that are so vast and complex that they require new and powerful computational
resources to process.
- Dictionary.com
Big Data as a Resource
“For a long time, we thought that Tamoxifen was roughly 80% effective for breast cancer patients. But now we know much more:
we know that it’s 100% effective in 70% to 80% of the patients, and ineffective in the rest.”
- Tim O’Reilly et al. “How Data Science is Transforming Health Care”
With enough of the right data you can determine precisely who the treatment will work for.
A Big Data Pattern
750,000+ downloads
AMPLab: Integrating 3 Resources
Algorithms
• Machine Learning, Statistical Methods
• Prediction, Business Intelligence
Machines
• Clusters and Clouds
• Warehouse Scale Computing
People
• Crowdsourcing, Human Computation
AMPLab Overview
• 70+ Students, Postdocs, Faculty and Staff from:
Databases, Machine Learning, Systems, Security, and Networking
• 50/50 Split: Industry Sponsors and Govt
White House Big Data Program:
NSF CISE Expeditions in Computing and DARPA XData
• Fixed Timeline (ends Dec 2016); Collaborative Working Space
See Dave Patterson, “How to Build a Bad Research Center”, CACM, March 2014
“… Berkeley’s AMPLab has already left an indelible mark on the world of
information technology, and even the web. But we haven’t yet experienced the full impact of the group … Not even close.”
– Derrick Harris, GigaOM, Aug 2, 2014
A Nexus of Industrial Engagement
• Open Source Software – Industrial-Strength
Open Source Community Building
MeetUp on MLBase @Twitter (Aug 6, 2013)
Big Data Ecosystem Evolution
General batch: MapReduce
Specialized systems (iterative, interactive and streaming apps): Pregel, Dremel, GraphLab, Storm, Giraph, Drill, Tez, Impala, S4, …
AMPLab Unification Philosophy
Don’t specialize MapReduce – Generalize it!
Two additions to Hadoop MR can enable all the models shown earlier!
1. General Task DAGs
2. Data Sharing
For Users:
Fewer Systems to Use
Less Data Movement
Spark
Streaming, GraphX, …, SparkSQL, MLbase
Berkeley Data Analytics Stack
(Apache and BSD open source)
Resource Virtualization Storage Processing Engine Access and Interfaces In-house Apps
In-Memory Dataflow System
M. Zaharia, M. Chowdhury, M. Franklin, I. Stoica, S. Shenker, “Spark: Cluster Computing with Working Sets”, USENIX HotCloud, 2010.
“It’s only September but it’s already clear that 2014 will be the year of Apache Spark”
-- Datanami, 9/15/14
• Developed in AMPLab and its predecessor the RADLab
• Alternative to Hadoop MapReduce
• 10-100x speedup for ML and interactive queries
• Central component of the BDAS Stack
Apache Spark Contributors:
[Chart: Apache Spark contributors by year, 2011 to 2014; vertical axis 0 to 100]
Apache Spark:
Compared to Other Projects
[Bar charts: commits and lines of code changed in the past 6 months for MapReduce, YARN, HDFS, Storm, and Spark]
2-3x more activity than: Hadoop, Storm, MongoDB, NumPy, D3, Julia, …
Iteration in Map-Reduce
[Diagram: starting from an initial model w(0), repeated Map and Reduce passes over the training data produce w(1), w(2), w(3), and finally the learned model]
Cost of Iteration in Map-Reduce
• Read the training data repeatedly
• Redundantly save output between stages
Dataflow View
[Diagram: training data (HDFS) flows through a chain of Map and Reduce stages]
Memory Opt. Dataflow
[Diagram: the same chain, with the training data loaded once and cached]
Memory Opt. Dataflow View
[Diagram: the same chain] Efficiently move data between stages
Resilient Distributed Datasets (RDDs)
API: coarse-grained transformations (map, group-by, join, sort, filter, sample, …) on immutable collections
» Collections of objects that can be stored in memory or on disk across a cluster
» Built via parallel transformations (map, filter, …)
» Automatically rebuilt on failure
Rich enough to capture many models:
» Data flow models: MapReduce, Dryad, SQL, …
» Specialized models: Pregel, Hama, …
M. Zaharia, et al, Resilient Distributed Datasets: A fault-tolerant abstraction for in-memory cluster computing, NSDI 2012.
Abstraction:
Dataflow Operators
map, filter, groupBy, sort, union, join, leftOuterJoin, rightOuterJoin, reduce, count, fold, reduceByKey, groupByKey, cogroup, cross, zip, sample, take, first, partitionBy, mapWith, pipe, save, ...
Fault Tolerance with RDDs
RDDs track the series of transformations used to build them (their lineage)
» Log one operation to apply to many elements
» No cost if nothing fails
Enables per-node recomputation of lost data
messages = textFile(...).filter(_.contains("error")).map(_.split('\t')(2))
HadoopRDD
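The lineage idea can be illustrated without Spark. The following is only a sketch in plain Scala: `MiniRDD`, `Source`, `Mapped`, and `Filtered` are hypothetical names invented here, not Spark classes. Each dataset remembers its parent and the one coarse-grained operation that produced it, so a lost partition can be recomputed from stable input rather than restored from a replica.

```scala
// Minimal lineage sketch in plain Scala (no Spark dependency).
// MiniRDD, Source, Mapped, and Filtered are hypothetical names for illustration.
sealed trait MiniRDD[A] {
  def numPartitions: Int
  // Recompute one partition by replaying the lineage; nothing is logged per element.
  def compute(partition: Int): Seq[A]
  def map[B](f: A => B): MiniRDD[B] = Mapped(this, f)
  def filter(p: A => Boolean): MiniRDD[A] = Filtered(this, p)
  def collect(): Seq[A] = (0 until numPartitions).flatMap(compute)
}

// Stable input, standing in for a file in HDFS.
final case class Source[A](partitions: Vector[Seq[A]]) extends MiniRDD[A] {
  def numPartitions: Int = partitions.length
  def compute(partition: Int): Seq[A] = partitions(partition)
}

// One logged operation (f) applies to every element of the parent.
final case class Mapped[A, B](parent: MiniRDD[A], f: A => B) extends MiniRDD[B] {
  def numPartitions: Int = parent.numPartitions
  def compute(partition: Int): Seq[B] = parent.compute(partition).map(f)
}

final case class Filtered[A](parent: MiniRDD[A], p: A => Boolean) extends MiniRDD[A] {
  def numPartitions: Int = parent.numPartitions
  def compute(partition: Int): Seq[A] = parent.compute(partition).filter(p)
}

// Made-up log lines: level, component, message, separated by tabs.
val log = Source(Vector(
  Seq("error\tdisk\tsda1 failed", "info\tnet\tlink up"),
  Seq("error\tnet\ttimeout", "warn\tcpu\thot")
))
val messages = log.filter(_.contains("error")).map(_.split('\t')(2))

// Suppose partition 1 of `messages` is lost: rebuild only that partition.
val rebuilt = messages.compute(1)
```

Recomputation here touches only the lost partition and its ancestors, which is the per-node recovery the slide describes; a real system would also cache computed partitions, which this sketch omits.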
A Unified System for SQL & ML
def logRegress(points: RDD[Point]): Vector = {
  var w = Vector(D, _ => 2 * rand.nextDouble - 1)
  for (i <- 1 to ITERATIONS) {
    val gradient = points.map { p =>
      val denom = 1 + exp(-p.y * (w dot p.x))
      (1 / denom - 1) * p.y * p.x
    }.reduce(_ + _)
    w -= gradient
  }
  w
}
val users = sql2rdd("SELECT * FROM user u JOIN comment c ON c.uid=u.uid")
val features = users.mapRows { row =>
  new Vector(extractFeature1(row.getInt("age")),
             extractFeature2(row.getStr("country")),
             ...)
}
val trainedVector = logRegress(features.cache())
Deep integration of SQL and Spark
Both share the same set of workers and caches
Can move seamlessly between SQL and Machine Learning worlds
R. Xin, J. Rosen, M. Zaharia, M. Franklin, S. Shenker, I. Stoica, “Shark: SQL and Rich Analytics at Scale”, SIGMOD 2013.
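The `logRegress` loop on this slide depends on Spark and on a `Vector` class that is not shown. As a rough, self-contained illustration of the same update rule, the sketch below runs it on an in-memory `Seq` in plain Scala. The `dot` helper, the explicit `dims`, `iterations`, and `seed` parameters, and the four sample points are assumptions made for the example; labels are taken to be +1.0 or -1.0.

```scala
import scala.math.exp
import scala.util.Random

// A labeled point; y is assumed to be +1.0 or -1.0.
final case class Point(x: Vector[Double], y: Double)

def dot(a: Vector[Double], b: Vector[Double]): Double =
  a.zip(b).map { case (u, v) => u * v }.sum

// Same gradient expression as the slide, on a local Seq instead of an RDD.
def logRegress(points: Seq[Point], dims: Int, iterations: Int, seed: Long): Vector[Double] = {
  val rand = new Random(seed)
  var w = Vector.fill(dims)(2 * rand.nextDouble() - 1)
  for (_ <- 1 to iterations) {
    val gradient = points.map { p =>
      val denom = 1 + exp(-p.y * dot(w, p.x))
      p.x.map(_ * (1 / denom - 1) * p.y)          // per-point gradient
    }.reduce((a, b) => a.zip(b).map { case (u, v) => u + v })
    w = w.zip(gradient).map { case (wi, gi) => wi - gi }
  }
  w
}

// Made-up, linearly separable data: the first feature is a constant bias term.
val points = Seq(
  Point(Vector(1.0, 2.0), 1.0), Point(Vector(1.0, 3.0), 1.0),
  Point(Vector(1.0, -2.0), -1.0), Point(Vector(1.0, -3.0), -1.0))
val w = logRegress(points, dims = 2, iterations = 50, seed = 42L)
```

In the Spark version the `map` and `reduce` run across the cluster, and `features.cache()` keeps the points in memory so each iteration avoids re-reading them.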
Spark Streaming
Microbatch approach provides lower latency
Additional operators provide windowed operations
M. Zaharia, et al, Discretized Streams: Fault-Tolerant Streaming Computation at Scale, SOSP 2013.
Batch/Streaming Unification
Batch and streaming code are virtually the same
» Easy to develop and maintain consistency
// count words from a file (batch)
val file = sc.textFile("hdfs://.../pagecounts-*.gz")
val words = file.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
// count words from a network stream, every 10s (streaming)
val ssc = new StreamingContext(args(0), "NetCount", Seconds(10), ..)
val lines = ssc.socketTextStream("localhost", 3456)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
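The unification claim can be tried without a cluster. The sketch below is plain Scala, not the Spark Streaming API: one `wordCounts` function is applied once to a whole input (batch) and once per micro-batch (streaming). The use of `grouped(2)` to stand in for "every 10 seconds" and the sample lines are assumptions for illustration.

```scala
// One word-count function, shared by the batch and the micro-batch path.
def wordCounts(lines: Seq[String]): Map[String, Int] =
  lines.flatMap(_.split(" "))
    .filter(_.nonEmpty)
    .map(word => (word, 1))
    .groupBy(_._1)
    .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

// Batch: all lines are available at once.
val batch = wordCounts(Seq("a b a", "b c"))

// Streaming: arriving lines are cut into small batches, and the same
// function runs on each one. grouped(2) stands in for a time interval.
val arriving = Seq("a b", "a", "b c", "c c")
val perBatch = arriving.grouped(2).map(wordCounts).toList
```

Because the per-batch logic is an ordinary function over a collection, the streaming job and the batch job cannot drift apart, which is the maintenance benefit the slide points to.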
Apache Spark v1.2
(12/14)
Includes
» Spark (core)
» Spark Streaming
» GraphX (alpha release)
» MLlib
» SparkSQL – Query Processing
Wide range of interfaces:
» Python / interactive ipython
» Scala / interactive scala shell
» R / interactive R-shell
» Java
Graph Analytics
[Figure: a Wikipedia analytics pipeline. Raw Wikipedia XML yields a link table (Title, Link) and a discussion table (User, Disc.). Hyperlinks feed PageRank, giving the top 20 pages (Title, PR); the editor graph feeds community detection, giving user communities (User, Com.) and the top communities (Com., PR).]
Separate Systems
[Figure: tables (rows in, result out) are handled by dataflow systems; dependency graphs are handled by graph systems]
Difficult to Use
Users must Learn, Deploy, and Manage multiple systems
Leads to brittle and often complex interfaces
[Bar chart: runtime in seconds, PageRank for 10 iterations on the Live-Journal graph: GraphLab 22, Spark 354, Hadoop 1340]
Efficiencies are Possible
A specialized graph system can be faster than general MR computation
But
Extensive data movement and duplication across the network and file system
[Diagram: XML input and intermediate results stored in HDFS]
Limited reuse of internal data structures
GraphX – Adding Graphs to the Mix
GraphX Unified
Representation Graph View
Table View
Tables and Graphs are composable views of the same
physical data
Each view has its own operators that exploit the
semantics of the view to achieve efficient execution
J. Gonzalez, R. Xin, A. Dave, D. Crankshaw, M. Franklin, I. Stoica, “GraphX: Graph Processing in a Distributed Dataflow Framework”, OSDI, Oct 2014.
[Diagram: graph-system concepts mapped onto dataflow concepts]
Representation: Distributed Graphs as Horizontally Partitioned Tables; Vertex Programs as Join and Dataflow Operators
Optimizations: Advances in Graph Processing Systems as Distributed Join Optimization and Materialized View
Property Graph Data Model
[Figure: a property graph with vertices A to F]
Vertex Property:
• User Profile
• Current PageRank Value
Edge Property:
• Weights
Encoding Property Graphs as Tables
[Figure: the property graph is split by a vertex cut across Machine 1 and Machine 2 and stored as three tables: a vertex table (RDD) in two partitions, an edge table (RDD) with edges A-B, A-C, C-D, B-C, A-E, A-F, E-F, E-D, and a routing table (RDD) listing the partitions (1, 2) for each vertex]
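A small, plain-Scala sketch of this encoding follows. It is not GraphX code: the sample vertices and edges, the round-robin assignment of edges to partitions, and the helper name `triplets` are assumptions made for illustration. It shows a vertex table, an edge table, a triplets view built by joining the two, and a routing table that records which edge partitions mention each vertex.

```scala
type Id = Long

// Vertex table: (id, property). The property here is a made-up user name.
val vertices: Seq[(Id, String)] = Seq((1L, "A"), (2L, "B"), (3L, "C"))
// Edge table: (source id, destination id, property). The property is a weight.
val edges: Seq[(Id, Id, Double)] = Seq((1L, 2L, 0.5), (1L, 3L, 1.0), (2L, 3L, 2.0))

// Triplets view: join each edge with the properties of both endpoints.
def triplets[V, E](vs: Seq[(Id, V)], es: Seq[(Id, Id, E)]): Seq[((Id, V), (Id, V), E)] = {
  val byId = vs.toMap
  for ((src, dst, e) <- es if byId.contains(src) && byId.contains(dst))
    yield ((src, byId(src)), (dst, byId(dst)), e)
}
val ts = triplets(vertices, edges)

// Vertex cut: edges are divided among partitions, so a vertex can span several.
// Round-robin placement is an assumption; real systems use smarter heuristics.
val numParts = 2
val edgeParts: Map[Int, Seq[(Id, Id, Double)]] =
  edges.zipWithIndex
    .groupBy { case (_, i) => i % numParts }
    .map { case (part, es) => (part, es.map(_._1)) }

// Routing table: for each vertex, the edge partitions that reference it.
val routing: Map[Id, Set[Int]] =
  edgeParts.toSeq
    .flatMap { case (part, es) => es.flatMap { case (s, d, _) => Seq((s, part), (d, part)) } }
    .groupBy(_._1)
    .map { case (id, pairs) => (id, pairs.map(_._2).toSet) }
```

The routing table is what lets a system ship a vertex property only to the partitions that hold edges touching that vertex, rather than broadcasting the whole vertex table.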
class Graph [ V, E ] {
  def Graph(vertices: Table[ (Id, V) ],
            edges: Table[ (Id, Id, E) ])
  // Table Views
  def vertices: Table[ (Id, V) ]
  def edges: Table[ (Id, Id, E) ]
  def triplets: Table [ ((Id, V), (Id, V), E) ]
  // Transformations
  def reverse: Graph[V, E]
  def subgraph(pV: (Id, V) => Boolean,
               pE: Edge[V,E] => Boolean): Graph[V,E]
  def mapV(m: (Id, V) => T ): Graph[T,E]
  def mapE(m: Edge[V,E] => T ): Graph[V,T]
  // Joins
  def joinV(tbl: Table [(Id, T)]): Graph[(V, T), E ]
  def joinE(tbl: Table [(Id, Id, T)]): Graph[V, (E, T)]
  // Computation
  def mrTriplets(mapF: (Edge[V,E]) => List[(Id, T)],
                 reduceF: (T, T) => T): Graph[T, E]
}
Graph Operators (Scala)
def mrTriplets(mapF: (Edge[V,E]) => List[(Id, T)],
reduceF: (T, T) => T): Graph[T, E]
mrTriplets captures the Gather-Scatter pattern from GraphLab
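To make the pattern concrete, here is a plain-Scala sketch of a map-reduce-over-triplets step. It follows the shape of the `mrTriplets` signature above but is not the GraphX implementation: it takes a local `Seq` of triplets, uses separate parameter lists to help type inference, and returns a `Map` of per-vertex results instead of a new graph. The three-vertex example, with edge attributes set to one over the source's out-degree, is made up.

```scala
type Id = Long

// A triplet: source vertex, destination vertex, and the edge attribute.
final case class Edge[V, E](src: (Id, V), dst: (Id, V), attr: E)

// Scatter: mapF turns each triplet into messages addressed to vertex ids.
// Gather: reduceF combines all messages that arrive at the same vertex.
def mrTriplets[V, E, T](triplets: Seq[Edge[V, E]])(
    mapF: Edge[V, E] => List[(Id, T)])(
    reduceF: (T, T) => T): Map[Id, T] =
  triplets.flatMap(mapF)
    .groupBy(_._1)
    .map { case (id, msgs) => (id, msgs.map(_._2).reduce(reduceF)) }

// Vertex property: current rank. Edge property: 1 / out-degree of the source.
val ts = Seq(
  Edge((1L, 1.0), (2L, 1.0), 0.5),
  Edge((1L, 1.0), (3L, 1.0), 0.5),
  Edge((2L, 1.0), (3L, 1.0), 1.0))

// One PageRank-style step: each source sends rank * weight to its destination.
val contribs = mrTriplets(ts)(e => List((e.dst._1, e.src._2 * e.attr)))(_ + _)
```

Vertices that receive no messages (vertex 1 here) are absent from the result; a full PageRank iteration would join `contribs` back onto the vertex table and apply the damping factor.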
Graph System Optimizations
• Specialized Data-Structures
• Vertex-Cuts Partitioning
• Remote Caching / Mirroring
PageRank Benchmark
[Bar charts: runtime in seconds on the Twitter graph (42M vertices, 1.5B edges) and the UK graph (106M vertices, 3.7B edges)]
EC2 Cluster of 16 x m2.4xLarge Nodes + 1GigE
GraphX performs comparably to state-of-the-art graph processing systems.
The GraphX Stack
(Lines of Code)
GraphX (2,500)
Spark (30,000)
GraphLab/Pregel API (34)
PageRank (20), Connected Comp. (20), K-core (60), TriangleCount (50), LDA (220), SVD++ (110)
Some algorithms are more naturally expressed using the GraphX primitive operators
Graphs are just one stage….
[Diagram: HDFS → Spark Preprocess → Compute → Spark Post. → HDFS]
A Small Pipeline in GraphX
Timed end-to-end, GraphX is the fastest
Raw Wikipedia (XML) → Hyperlinks → PageRank → Top 20 Pages
[Bar chart: total runtime in seconds for GraphX, GraphLab + Spark, Giraph + Spark, and Spark; labeled values: 605, 375, 1492]
MLBase: Distributed ML Made Easy
DB Query Language Analogy:
Specify What not How
MLBase chooses:
• Algorithms/Operators
• Ordering and Physical Placement
• Parameter and Hyperparameter Settings
• Featurization
Leverages Spark for Speed and Scale
T. Kraska, A. Talwalkar, J. Duchi, R. Griffith, M. Franklin, M. Jordan, “MLBase: A Distributed Machine Learning System”, CIDR 2013.
ML Pipeline Generation & Optimization
MLBase Optimizer Focus:
• Better Resource Utilization
• Algorithmic Speedups
• Reduced Model Search Time
Ex: Image Classification Pipeline
E. Sparks, S. Venkataraman, M. Franklin , B. Recht, “ML Pipelines”, In Process. (see AMPLab Blog for an overview)
Where do models go?
[Diagram: Data → Training → Model → Conference Papers, Sales Reports, Drive Actions]
Driving Actions
• Suggesting Items at Checkout
• Fraud Detection
• Cognitive Assistance
• Internet of Things
Problem: Separate Systems
• Offline Analytics Systems: sophisticated ML on static data
• Online Serving Systems: low-latency data serving
How do we serve low-latency predictions and train on live data?
Velox Model Serving System [CIDR’15]
Decompose personalized predictive models:
• Split: Personalization Model (Online), Feature Model (Batch)
• Feature Caching
• Approx. Features
• Online Updates
• Active Learning
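One way to picture the split is the plain-Scala sketch below. It is an illustration under stated assumptions, not Velox code: the feature function, the two-dimensional feature vector, the squared-error gradient step used for the online update, and all names (`featureModel`, `features`, `predict`, `observe`) are hypothetical. A prediction is the dot product of per-user weights (the personalization model) with cached item features (the feature model).

```scala
import scala.collection.mutable

// Feature model: maps an item to a feature vector. In practice it would be
// trained in batch; a fixed made-up function stands in for it here.
def featureModel(item: String): Vector[Double] =
  Vector(item.length.toDouble, item.count(_ == 'a').toDouble)

// Feature caching: compute an item's features once and reuse them.
val featureCache = mutable.Map.empty[String, Vector[Double]]
def features(item: String): Vector[Double] =
  featureCache.getOrElseUpdate(item, featureModel(item))

// Personalization model: one small weight vector per user, updated online.
val userWeights =
  mutable.Map.empty[String, Vector[Double]].withDefaultValue(Vector(0.0, 0.0))

def predict(user: String, item: String): Double =
  userWeights(user).zip(features(item)).map { case (w, f) => w * f }.sum

// Online update: one gradient step on squared error (an assumed update rule).
def observe(user: String, item: String, rating: Double, rate: Double): Unit = {
  val f = features(item)
  val err = rating - predict(user, item)
  userWeights(user) = userWeights(user).zip(f).map { case (w, fi) => w + rate * err * fi }
}

val before = predict("u1", "banana")
observe("u1", "banana", rating = 1.0, rate = 0.01)
val after = predict("u1", "banana")
```

Keeping the per-user part small is what makes the online path cheap: an update touches one short weight vector, while the expensive feature model changes only when it is retrained offline.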
Berkeley Data Analytics Stack
(Apache and BSD open source)
Resource Virtualization Storage Processing Engine Access and Interfaces In-house Apps
Big Data Architecture Open
Questions & Research Issues
• What is the role of a unified stack in a fast-changing software landscape?
• Single-node vs. Cluster; Elastic Cloud vs HPC
• GPUs, FPGAs, XYZs, ???
• New memory hierarchies: SSDs, RDMA, …
• Serving vs. Analytics Workloads
Summary
• Big Data – yes, there’s hype but it is a Big Deal
• AMPLab project
• 6 Yrs, Cross-disciplinary team, Industry engagement
• Open Source development and community building
• BDAS philosophy: Unification
• Spark + SQL + Graphs + ML + …
• The Big Data landscape continues to evolve
• Open Source enables Academic research to play a huge role
To find out more or
get involved:
amplab.berkeley.edu
Thanks to NSF CISE Expeditions in Computing, DARPA XData, Founding Sponsors: Amazon Web Services, Google, and SAP,
the Thomas and Stacy Siebel Foundation,