Cloud Computing
Lecture 19
Cloud Programming 2011-2012
Up until now…
• Introduction, Definition of Cloud Computing
• Pre-Cloud Large Scale Computing:
o Grid Computing
o Content Distribution Networks
o Cycle-Sharing
o Distributed Scheduling
• Cloud:
o Map Reduce
o Storage
o Execution
o Monitoring
Outline
•
Cloud Programming Models
• Pig
• DryadLINQ
• Percolator
Pig
•
There are large scale data operations that
take too many steps to model in MapReduce.
•
Creating complex workflows becomes
cumbersome.
•
Pig is a high-level programming language from
the Hadoop ecosystem for processing massive
amounts of records.
•
Provides common operations like join, group,
filter, sort.
Pig
•
Pig provides an execution engine atop
Hadoop:
• Removes need for users to tune Hadoop for their needs.
• Insulates users from changes in Hadoop interfaces.
•
Pig programs are written in Pig Latin and
compiled into MapReduce jobs.
•
In Pig Latin, variables (relations) are bags of tuples.
Pig Latin Example
-- max_temp.pig: Calculate the yearly maximum temperature
records = LOAD 'input/ncdc/micro-tab/sample.txt'
  AS (year:chararray, temperature:int, quality:int);
filtered_records = FILTER records
  BY temperature != 9999 AND
  (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
grouped_records = GROUP filtered_records BY year;
max_temp = FOREACH grouped_records GENERATE group,
  MAX(filtered_records.temperature);
DUMP max_temp;
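To make the data flow of the Pig Latin script concrete, the following is a minimal single-machine sketch of the same filter, group and max steps in Python. The sample rows are invented for illustration and are not the contents of sample.txt; Pig itself would compile the script into MapReduce jobs rather than run code like this.

```python
from collections import defaultdict

# Hypothetical sample rows: (year, temperature, quality).
records = [
    ("1950", 0, 1),
    ("1950", 22, 1),
    ("1950", -11, 1),
    ("1949", 111, 1),
    ("1949", 78, 1),
    ("1949", 9999, 1),  # missing reading, dropped by the filter
    ("1949", 200, 2),   # rejected quality code, dropped by the filter
]

GOOD_QUALITY = {0, 1, 4, 5, 9}


def max_temp_by_year(rows):
    # FILTER: keep valid readings with an accepted quality code.
    filtered = [r for r in rows if r[1] != 9999 and r[2] in GOOD_QUALITY]
    # GROUP ... BY year: collect the temperatures of each year.
    grouped = defaultdict(list)
    for year, temperature, _quality in filtered:
        grouped[year].append(temperature)
    # FOREACH ... GENERATE group, MAX(...): one (year, max) tuple per group.
    return sorted((year, max(temps)) for year, temps in grouped.items())


max_temp = max_temp_by_year(records)
print(max_temp)
```

Each Python step corresponds to one Pig Latin statement, which is the point of the language: the script names intermediate relations instead of hand-writing mappers and reducers.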
PigPen: Eclipse plugin for Graphical Pig Programming
Pig Latin: Key and Manipulation Operators
Pig Latin: Map Related Operators
Pig Latin: Reduce Related Operators
DryadLINQ
•
High performance programming model from
Microsoft.
•
Query based.
•
Run the application locally; run its queries on
large-scale infrastructure.
Components
•
Windows HPC Server 2008: cluster
management, scheduling.
•
Dryad: distributed execution engine, fault
recovery, distribution, scalability based on
partitioned data sets.
•
LINQ: .NET extensions for querying data
sources; exploits parallelism, uniform data
model.
DryadLINQ Architecture
[Figure: architecture stack showing .NET applications (image processing, machine learning, graph analysis, data mining), DryadLINQ, Dryad, the HPC Job Scheduler, and Windows HPC Server 2008 on the cluster nodes.]
Dryad
•
Provides a flexible execution layer:
• Graph based execution.
• Graph (code for the nodes, arcs, data serialization) described in a high-level language.
•
Provides transparent distribution:
• Distributes the code and routes the data.
• Creates processes close to the data.
• Masks cluster and network faults.
[Figure: a Dryad job graph. Vertices are computations, edges are channels (file, FIFO, pipe), and the graph has inputs and outputs.]
Example of a Dryad program
Pipes in 2D
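The graph-based execution model can be sketched on a single machine as follows. This is only an illustration under simplifying assumptions: vertices are Python functions, channels are in-memory lists, and `run_dag` is a hypothetical helper, not part of Dryad, which additionally handles distribution, data locality and fault recovery.

```python
from graphlib import TopologicalSorter


def run_dag(vertices, edges, inputs):
    """Run every vertex once, after all of its producers have run.

    vertices: name -> function(list of input lists) -> output list
    edges:    list of (producer, consumer) pairs, one per channel
    inputs:   name -> list, the data read by vertices without producers
    """
    preds = {name: [] for name in vertices}
    for producer, consumer in edges:
        preds[consumer].append(producer)

    results = {}
    # static_order() yields each vertex only after its predecessors.
    for name in TopologicalSorter(preds).static_order():
        if preds[name]:
            channel_data = [results[p] for p in preds[name]]
        else:
            channel_data = [inputs[name]]
        results[name] = vertices[name](channel_data)
    return results


def parse_ints(channels):
    return [int(x) for x in channels[0]]


def total(channels):
    return [sum(x for channel in channels for x in channel)]


vertices = {"read0": parse_ints, "read1": parse_ints, "sum": total}
edges = [("read0", "sum"), ("read1", "sum")]
inputs = {"read0": ["1", "2"], "read1": ["3"]}

results = run_dag(vertices, edges, inputs)
print(results["sum"])
```

The two reader vertices have no dependency on each other, so a distributed engine is free to run them in parallel on the machines that hold their input partitions.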
LINQ: Language Integrated
Query
•
Declarative extension to C# and VB.NET to
iterate over collections:
• In memory.
• Through data providers.
• Similar to SQL.
•
Very popular:
• Easy to use.
• Reduces the amount of code.
Example
• Before (SQL):
SELECT [t0].[ContactId], [t0].[FirstName], [t0].[LastName], [t0].[DateOfBirth],
  [t0].[Phone], [t0].[Email], [t0].[State]
FROM [Contact] AS [t0]
WHERE DATEADD(YEAR, @p0, [t0].[DateOfBirth]) > @p1
ORDER BY [t0].[DateOfBirth] DESC
• Now (LINQ):
var q = from c in db.Contact
        where c.DateOfBirth.AddYears(35) > DateTime.Now
        orderby c.DateOfBirth descending
        select c;
• But mainly, it simplifies queries against sets of objects, XML, DataSets:
string[] names = {"John", "Peter", "Joe", "Patrick", "Donald", "Eric"};
// Selects the names with fewer than five characters.
IEnumerable<string> namesWithFewerThanFiveCharacters =
    from name in names
    where name.Length < 5
    select name;
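For readers who do not know C#, the two LINQ queries can be approximated in Python over in-memory collections. This is only an analogy: the `Contact` class, the sample contacts and the fixed `today` value are assumptions made for the example, and the date test mirrors `AddYears(35)` only approximately (leap days are ignored).

```python
from dataclasses import dataclass
from datetime import date


@dataclass
class Contact:
    first_name: str
    date_of_birth: date


def younger_than(contacts, years, today):
    # Roughly: where c.DateOfBirth.AddYears(years) > now
    #          orderby c.DateOfBirth descending
    # Assumes `today` is not Feb 29, so replace() cannot fail.
    cutoff = today.replace(year=today.year - years)
    matches = [c for c in contacts if c.date_of_birth > cutoff]
    return sorted(matches, key=lambda c: c.date_of_birth, reverse=True)


names = ["John", "Peter", "Joe", "Patrick", "Donald", "Eric"]
# from name in names where name.Length < 5 select name
short_names = [name for name in names if len(name) < 5]

contacts = [
    Contact("Ann", date(1990, 5, 1)),
    Contact("Bob", date(1960, 1, 1)),
    Contact("Cid", date(1985, 3, 9)),
]
recent = younger_than(contacts, 35, today=date(2012, 1, 1))

print(short_names)
print([c.first_name for c in recent])
```

As in LINQ, the query states what to select and how to order it; the iteration strategy is left to the runtime.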
DryadLINQ: LINQ using
Clusters
•
LINQ declarative programming using clusters.
•
Automatic Parallelization:
• Exploiting multi-node and multi-core parallelism.
•
Integrated with VisualStudio and .NET
•
Static type-checking and automatic
serialization.
Development/Execution
Cycle
• DryadLINQ programs run on the user’s PC:
o Programmed and executed locally.
• When there are calls to a PartitionedTable, the query is built (code generation, execution plan, optimization) and the job is submitted to the HPC Server.
• HPC Server allocates resources for the job and schedules the Dryad Job Manager (DJM).
• The DJM schedules the tasks in the nodes of the DAG.
[Figure: a LINQ query over logs (where, select) goes through automatic plan generation and is then executed in a distributed fashion by Dryad.]
var logentries = from line in logs
                 where !line.StartsWith("#")
                 select new LogEntry(line);
DryadLINQ: LINQ+Dryad
The Same Query in
DryadLINQ
PartitionedTable<T>
• Fundamental data structure for DryadLINQ.
• Scalable partitioned container for .NET objects.
• Derives from IQueryable<T> and IEnumerable<T>.
• DryadLINQ operators consume and produce PartitionedTable<T>.
• DryadLINQ generates the code that serializes the application’s objects.
• The storage can be partitioned files, partitioned SQL database tables or the cluster’s file system.
\\HPCMETAHN01\XC\output\520a0fcf\Part.000000 00
Partitioned Files:
Container for
PartitionedTable<T>
0: table piece number
1855000: table piece size
HPCMETAHN01: table piece node
Choose non-commented lines from the log.
Choose the logentries created by user jvert.
Group the accesses from user jvert by page and count the occurrences.
Order accessed pages by frequency.
A Typical Query
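The four steps of the typical query can be sketched sequentially in Python. The log line format ("user page") and the sample lines are assumptions made for the example; in DryadLINQ each step would be a LINQ operator over a PartitionedTable, executed in parallel across partitions.

```python
from collections import Counter
from typing import NamedTuple


class LogEntry(NamedTuple):
    user: str
    page: str


def parse(line):
    # Hypothetical log format: "<user> <page>".
    user, page = line.split()
    return LogEntry(user, page)


logs = [
    "# comment line",
    "jvert /index.htm",
    "alice /index.htm",
    "jvert /docs.htm",
    "jvert /index.htm",
]

# 1. Choose non-commented lines from the log.
logentries = [parse(line) for line in logs if not line.startswith("#")]
# 2. Choose the log entries created by user jvert.
user = [entry for entry in logentries if entry.user == "jvert"]
# 3. Group the accesses by page and count the occurrences.
accesses = Counter(entry.page for entry in user)
# 4. Order accessed pages by frequency, most frequent first.
output = accesses.most_common()

print(output)
```

Steps 1 and 2 are independent per line and parallelize trivially; step 3 needs all entries for a page in one place, which is where a distributed plan introduces a repartitioning stage.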
[Figure: query plan stages: logs, logentries, user, accesses, htmAccesses, output.]
Parallel Execution of a Dryad DAG
Query Execution Plan
•
The query is separated from the execution
context:
• The necessary code is referenced by the query (data structures and auxiliary algorithms).
• References to local variables are eliminated by partial evaluation.
•
The serialization code and the code for the
nodes is generated automatically.
Example of a DryadLINQ
Execution Plan
[Figure annotations: list of files to be placed in the cluster; node definition.]
XML Representation:
from DryadLINQ to Dryad
MapReduce using DryadLINQ
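One common way to express MapReduce with query operators is a flat-map (SelectMany in LINQ), followed by a group-by on a key and a per-group reduction. The Python sketch below illustrates that composition on one machine; `map_reduce` and its parameter names are hypothetical and this is not DryadLINQ code.

```python
from itertools import groupby


def map_reduce(source, mapper, key_selector, reducer):
    # Flat-map: every input record may yield several intermediate items.
    mapped = [item for record in source for item in mapper(record)]
    # itertools.groupby only groups adjacent items, so sort by key first.
    mapped.sort(key=key_selector)
    # Reduce: one result per distinct key.
    return [
        reducer(key, list(group))
        for key, group in groupby(mapped, key=key_selector)
    ]


lines = ["a b a", "b c"]
word_counts = map_reduce(
    lines,
    mapper=lambda line: line.split(),
    key_selector=lambda word: word,
    reducer=lambda word, occurrences: (word, len(occurrences)),
)
print(word_counts)
```

Seen this way, MapReduce is one particular three-operator query, while a query language can also express plans with joins or several grouping stages.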
Dryad vs. Map-Reduce
Map-Reduce              Dryad
Executable              Execution layer
Map+sort+reduce         Work = DAG
No policies             Policies (plugins)
Program = map+reduce    Program = graph
Simple                  Complex (+ funcs.)
Mature (> 4 years)      New (< 2 years)
Widespread              Growing
Hadoop                  Proprietary (Microsoft)
Percolator: Incrementally Indexing the Web
Duplicate Elimination with
MapReduce
Indexing system is a chain of many MapReduces
Index Refresh with
MapReduce
• Should we index the new document?
o New doc could be a dup of any previously crawled doc
o Requires that we map over the entire repository
Indexing System Goals
• What do we want from an ideal indexing system?
• Large repository of documents:
• Upper bound on index size
• Higher-quality index: e.g. more links
• Small delay between crawl and index: "freshness"
• MapReduce indexing system: Days from crawl to index
Incremental Indexing
• Maintain a random-access repository in BigTable.
• Indexes allow avoiding a global scan.
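The idea of replacing a global scan with an index lookup can be sketched with an in-memory stand-in for the repository. The `Repository` class is hypothetical, CRC32 stands in for whatever checksum the real system uses, and the sketch simply keeps the first URL seen as canonical instead of choosing by PageRank.

```python
import zlib


class Repository:
    """Random-access document store with a checksum index."""

    def __init__(self):
        self.docs = {}         # url -> contents
        self.by_checksum = {}  # checksum -> canonical url

    def add(self, url, contents):
        """Store a crawled document; return True if it should be indexed."""
        checksum = zlib.crc32(contents.encode("utf-8"))
        self.docs[url] = contents
        # One index lookup replaces a scan over every stored document.
        canonical = self.by_checksum.setdefault(checksum, url)
        return canonical == url


repo = Repository()
first = repo.add("nytimes.com", "today's news")
duplicate = repo.add("nyt.com", "today's news")
other = repo.add("example.org", "something else")
print(first, duplicate, other)
```

The cost of deciding whether a new document is a duplicate no longer grows with the size of the repository, which is what makes per-document incremental processing feasible.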
Incremental Indexing on
Bigtable
URL          Checksum    PageRank  IsCanonical?
nyt.com      0xabcdef01  6         no
nytimes.com  0xabcdef01  9         yes

Checksum     Canonical
0xabcdef01   nytimes.com

What happens if we process both URLs simultaneously?
Percolator: Incremental
Infrastructure
Adds distributed transactions to Bigtable
Transaction t;
string contents = t.Get(row, "raw", "doc");
Hash h = Hash32(contents);
...
// Potential conflict with concurrent execution
t.Set(h, "canonical", "dup_table", row);
...
t.Commit();
BigTable Recap
• BigTable is a sorted (row, column, timestamp) store:
• Data is partitioned into row ranges called tablets.
• Tablets are spread across many machines.
Implementing Distributed
Transactions
•
Provide snapshot isolation semantics.
•
Multi-version protocol (mapped to BigTable).
•
Two-phase commit, coordinated by client.
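Snapshot isolation over a multi-version store can be illustrated with a single-process sketch. All class and method names below are hypothetical; the sketch deliberately omits the locks, the two-phase commit and the mapping to BigTable columns, and only shows the two visible rules: reads see the snapshot taken at start time, and a commit fails if another transaction has written the same key since that snapshot.

```python
import itertools


class ConflictError(Exception):
    pass


class Store:
    def __init__(self):
        self.versions = {}               # key -> list of (commit_ts, value)
        self._clock = itertools.count(1)

    def next_ts(self):
        return next(self._clock)


class Transaction:
    def __init__(self, store):
        self.store = store
        self.start_ts = store.next_ts()  # defines the snapshot
        self.writes = {}                 # buffered until commit

    def get(self, key):
        if key in self.writes:
            return self.writes[key]
        visible = [
            (ts, value)
            for ts, value in self.store.versions.get(key, [])
            if ts <= self.start_ts
        ]
        return max(visible, key=lambda tv: tv[0])[1] if visible else None

    def set(self, key, value):
        self.writes[key] = value

    def commit(self):
        # Write-write conflict: someone committed this key after our snapshot.
        for key in self.writes:
            committed = self.store.versions.get(key, [])
            if any(ts > self.start_ts for ts, _ in committed):
                raise ConflictError(key)
        commit_ts = self.store.next_ts()
        for key, value in self.writes.items():
            self.store.versions.setdefault(key, []).append((commit_ts, value))
        return commit_ts


store = Store()
t1, t2 = Transaction(store), Transaction(store)
t1.set("0xabcdef01", "nyt.com")
t2.set("0xabcdef01", "nytimes.com")
t1.commit()
try:
    t2.commit()
    t2_committed = True
except ConflictError:
    t2_committed = False
print(t2_committed, Transaction(store).get("0xabcdef01"))
```

This is the situation raised by the duplicate table: two documents with the same checksum processed at once. Exactly one of the two writers commits and the other must retry, so the table never ends up with two canonical URLs.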
Transaction Commit
Notifications: tracking work
• Users register "observers" on a column:
• Executed when any row in that column is written.
• Each observer runs in a new transaction.
• Run at most once per write.
Additional BigTable
Columns for Percolator
Implementing
Notifications
•
If “notify” column is set, observer must be
run.
•
Implemented using a randomized distributed
scan:
• Finds pending work, runs observers in a thread pool.
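The observer mechanism can be sketched with an in-memory table. The `Table` class is hypothetical, and the sketch is sequential: a real deployment scans the notify column from many machines in random order, runs observers in a thread pool and runs each one in its own transaction.

```python
import zlib


class Table:
    def __init__(self):
        self.rows = {}       # row -> {column: value}
        self.notify = set()  # (row, column) pairs with pending work
        self.observers = {}  # column -> callback(table, row)

    def register(self, column, observer):
        self.observers[column] = observer

    def write(self, row, column, value):
        self.rows.setdefault(row, {})[column] = value
        if column in self.observers:
            self.notify.add((row, column))  # set the "notify" flag

    def scan(self):
        """Run observers until no notification is pending."""
        ran = 0
        while self.notify:
            # Clearing the flag before running gives at most one run per write.
            row, column = self.notify.pop()
            self.observers[column](self, row)
            ran += 1
        return ran


def compute_checksum(table, row):
    raw = table.rows[row]["raw"]
    table.write(row, "checksum", zlib.crc32(raw.encode("utf-8")))


def mark_indexed(table, row):
    table.write(row, "indexed", True)


table = Table()
table.register("raw", compute_checksum)
table.register("checksum", mark_indexed)
table.write("nyt.com", "raw", "today's news")
observers_run = table.scan()
print(observers_run, table.rows["nyt.com"]["indexed"])
```

One write to the raw column triggers a chain of two observers, which is how a pipeline of MapReduce stages turns into a cascade of small per-document steps.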
Bus Clumping
• Randomized scanners tend to clump:
• Reduces effective parallelism
• Overloads Bigtable servers
• Solution:
• Try to obtain a lightweight scanner lock per row.
• If unsuccessful, jump to a random point in the table.
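The anti-clumping rule can be written as a single scanner step. This is a simplified single-process sketch: `scan_step` is a hypothetical helper, a set of row numbers stands in for the per-row scanner locks, and releasing a lock after the row is processed is left to the caller.

```python
import random


def scan_step(position, locked_rows, num_rows, rng):
    """Advance one scanner by one step.

    Returns (row_to_process, next_position). The row is None when the
    lock was already taken and the scanner jumped instead.
    """
    if position in locked_rows:
        # Another scanner is working here: jump to a random point in the
        # table instead of queuing up behind it.
        return None, rng.randrange(num_rows)
    locked_rows.add(position)  # take the lightweight scanner lock
    return position, (position + 1) % num_rows


rng = random.Random(0)
locked_rows = {3}  # row 3 is being processed by another scanner

jumped_row, jumped_to = scan_step(3, locked_rows, 10, rng)
free_row, next_position = scan_step(4, locked_rows, 10, rng)
print(jumped_row, free_row, next_position)
```

Without the jump, a scanner that catches up with a slower one would follow it row by row, like buses bunching on a route; the random jump spreads the scanners across the table again.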
Running Percolator
• Each machine runs:
• Worker binary linked with observer code.
• Bigtable tablet server
Very Different Access
Patterns
• Percolator:
o Small, random disk I/O
o Many RPCs per phase, per document
• MapReduce:
o Streaming I/O
o Many documents per RPC, per phase
• Infrastructure is much better suited to the MR model. Even though MR does "extra" work, it does so very efficiently.
MR v. Percolator:
Performance
MR v. Percolator:
Experience
• Conversion of an MR-based pipeline to Percolator.
• Pros:
• Freshness: indexing delay dropped from days to minutes
• Scalability:
o More throughput: Just buy more CPUs
o Bigger repository: Only limited by disk space
• Utilization: immune to stragglers
• Cons:
• Need to reason about concurrency
• More expensive per document processed (~2x)
Running 1000 threads on
Linux
• Percolator uses a thread-per-request model:
• Pros:
• Application developers write "straight line" code
• Meaningful stack traces: easy debugging / profiling
• Easy scalability on many-core machines
• Cons:
• Kernel scalability: kernel locks were held while doing work on each of 1000 threads during process exit
Conclusion
• Percolator now builds the "Caffeine" web search index:
• 50% fresher results
• 3x larger repository
• Existence proof for distributed transactions at web scale.