Cornell Tech
Spring 2015
Lecture 3
Data Science in the Wild
1 Cornell Tech Spring 2015
Some slides are taken from
J. Leskovec, A. Rajaraman, J. Ullman:
Mining of Massive Datasets, http://www.mmds.org
Data Science and Big Data
Big Data: the data cannot be processed on a single machine
Required in many data science tasks
Tools and techniques based on distributed systems
Examples of Tools
Spring 2015 Cornell Tech
3
Computation
MapReduce (Hadoop)
Spark
Platforms / warehouse
Pig
Hive
Shark
Database systems
CouchDB
MongoDB
Analytics
Mahout
Motivation: Google Example
20+ billion web pages x 20KB = 400+ TB
1 computer reads 30-35 MB/sec from disk
~4 months to read the web
~1,000 hard drives to store the web
Takes even more to do something useful with the data!
Need a distributed system
Cornell Tech
4 Spring 2015
Cluster Architecture
Mem
Disk CPU
Mem
Disk CPU
…
Switch
Each rack contains 16-64 nodes
Mem
Disk CPU
Mem
Disk CPU
…
Switch Switch
1 Gbps between any pair of nodes in a rack
2-10 Gbps backbone between racks
In 2011 it was guestimated that Google had 1M machines, http://bit.ly/Shh0RO
5 Cornell Tech Spring 2015
Important Challenges
Spring 2015 Cornell Tech
7
How to utilize many servers and synchronize
their work
How to easily write distributed programs
Be able to cope with machine failure
Machine failure example:
- 10,000 servers
- Expected lifespan of a server is 3 years
- About 10 machines will crash every day
Important Challenges
Spring 2015 Cornell Tech
8
Utilize many servers and synchronize their work
Be able to cope with machine failure
Suppose there is a machine that synchronizes
the work of the other servers (master)
- How to handle a crash of such machine?
- Who will detect that the machine is not
responding?
- How to avoid having two different masters?
Distributed Data Processing
Spring 2015 Cornell Tech
9
Storage Infrastructure – File System
Google: GFS
Hadoop: HDFS
Programming model
MapReduce
Typical usage pattern
Huge files (100s of GB to TB)
Data is rarely updated in place
Reads and appends are common
Distributed File System
Chunk servers
File is split into contiguous chunks
Typically each chunk is 16-64MB
Each chunk replicated (usually 2x or 3x)
Try to keep replicas in different racks
Master node
a.k.a. Name Node in Hadoop’s HDFS
Stores metadata about where files are stored
Might be replicated
Client library for file access
Talks to master to find chunk servers
Connects directly to chunk servers to access data
Distributed File System
Reliable distributed file system
Data kept in “chunks” spread across machines
Each chunk replicated on different machines
Seamless recovery from disk or machine failure
C0 C1 C2 C5 Chunk server 1
D1 C5
Chunk server 3 C1
C3 C5 Chunk server 2
…
C2 D0
D0
C0 C5
Chunk server N C2 D0
Cornell Tech
11 Spring 2015
MapReduce
Distributed Computing
Spring 2015 Cornell Tech
12
MapReduce Model
Basic operations
Map: produce a list of (key, value) pairs from the input structured as a (key value) pair of a different type
(k1,v1) list (k2, v2)
Reduce: produce a list of values from an input that consists of a key and a list of values associated with that key
(k2, list(v2)) list(v2)
Note: inspired by map/reduce functions in Lisp and other functional programming languages.
Spring 2015 Cornell Tech
13
Data Flow
Example
map(String key, String value) : // key: document name
// value: document contents for each word w in value:
EmitIntermediate(w, “1”);
reduce(String key, Iterator values) : // key: a word
// values: a list of counts int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
Spring 2015 Cornell Tech
15
Example: Map Phase
When in the course of human events it
…
It was the best of times and the worst of times…
map
(in,1) (the,1) (of,1) (it,1) (it,1) (was,1) (the,1) (of,1) … (when,1), (course,1) (human,1) (events,1) (best,1) …
inputs tasks (M=3) partitions (intermediate files) (R=2)
This paper evaluates the
suitability of the … map (this,1) (paper,1) (evaluates,1) (suitability,1) … (the,1) (of,1) (the,1) …
Over the past five years,
the authors and many… map (over,1), (past,1) (five,1) (years,1) (authors,1) (many,1) … (the,1), (the,1) (and,1) …
Note: partition function places small words in one partition and large words in another.
Spring 2015 Cornell Tech
16
Example: Reduce Phase
reduce
(in,1) (the,1) (of,1) (it,1) (it,1) (was,1) (the,1) (of,1) …
(the,1) (of,1) (the,1) …
reduce task partition (intermediate files) (R=2)
(the,1), (the,1) (and,1) …
sort
(and, (1)) (in,(1)) (it, (1,1)) (the, (1,1,1,1,1,1)) (of, (1,1,1)) (was,(1))
(and,1) (in,1) (it, 2) (of, 3) (the,6) (was,1) user’s function
Note: only one of the two reduce tasks shown
run-time function
Spring 2015 Cornell Tech
17
More Examples
Count of URL Access Frequency
Reverse Web-Link Graph
Term-Vector per Host
Inverted Index
Distributed Sort
Sample
Example: Language Model
Statistical machine translation:
Need to count number of times every 5-word
sequence occurs in a large corpus of documents
Very easy with MapReduce:
Map:
Extract (5-word sequence, count) from document
Reduce:
Combine the counts
19 Cornell Tech Spring 2015
Distributed Execution Overview
User Program
Worker Worker Master
Worker
Worker
Worker
fork fork fork
assign
map assign
reduce
read
local write
remote read, sort
Output File 0
Output File 1 write
Split 0 Split 1 Split 2 Input Data
Spring 2015 Cornell Tech
20
Intermediate files on local disk
Operation
1.
Splits the input files into M pieces and assign the
pieces to Map processes
2.
The master assign the M map tasks and the R
reduce tasks to idle workers
3.
A map process reads the input split, parses the
key/value pairs and applies the Map function.
Results are buffered in memory
Spring 2015 Cornell Tech
21
Operation
4.
Periodically , write buffer to local disk, partitioned
into R regions (by the partition function). Let the
master know where the partitions are
5.
A reducer receives locations of its M chunks from
the master. It reads them, and sorts all the data it
read
6.
The reducer passes the key and list of values to the
Reduce function and appends the output to the
result
How?
After this, can wake up the client
Execution Environment
No reduce can begin until map is complete
Tasks scheduled based on location of data
If map worker fails any time before reduce finishes, task must be completely rerun
Master must communicate locations of intermediate files
Data store 1 map Data store n
(key 1, values...)
(key 2, values...)
(key 3, values...)
map
(key 1, values...)
(key 2, values...)
(key 3, values...) Input key*value
pairs
Input key*value pairs
== Barrier == : Aggregates intermediate values by output key
reduce reduce reduce
key 1, intermediate
values
key 2, intermediate
values
key 3, intermediate
values
final key 1 values
final key 2 values
final key 3 values
...
Note: figure and text from presentation by Jeff Dean.
Spring 2015 Cornell Tech
23
Worker Failure
When worker A is idle or inaccessible for a long time,
the master assigns the task to another machine
What happens if the two machines complete the task and
report this to the master?
Spring 2015 Cornell Tech
24
Master Failure
In principle, can be handled by writing checkpoints
In practice, the user should execute again the entire
MapReduce task
Spring 2015 Cornell Tech
25
How Many Hosts to Allocate?
The number of map processes, depends on the size of the
input and the size of a block
The number of reducers should be so that M*R will not
be too big for the memory of the master and not to
produce too many chunks of output
Sampling Using MapReduce
Spring 2015 Cornell Tech
27
Simple Random Sample
Spring 2015 Cornell Tech
28
Given a population P of size N, choose a sample of size k
A simple random sample of size k: any two subsets of
the population of size k have the same probability to be
selected
Different from Bernoulli Sampling or Poisson Sampling
for which the sample size is not a constant
Simple Random Sample
Common trick in DB: assign a random number to
each item and sort
Costly if N is very big (what is the cost?)
Is there a more efficient way to sample the data?
Spring 2015 Cornell Tech
29
Reservoir Sampling
“Reservoir sampling” described by [Knuth 69, 81]; enhancements [Vitter 85]
Fixed size k uniform sample from arbitrary size N stream in one pass
No need to know stream size in advance
Include first k items w.p. 1
Include item n > k with probability p(n) = k/n, n > k
Pick j uniformly from {1,2,…,n}
If j ≤ k, swap item n into location j in reservoir, discard replaced item
Simple proof shows the uniformity of the sampling method:
Let Sn = sample set after n arrivals
k=7 n
m (< n)
Previously sampled item: induction
New item: selection probability Prob[n Sn ] = pn := k/n
Distributed Dataset
Spring 2015 Cornell Tech
31
The population is distributed over several servers
A constraint s
kdefines which objects of the population
are relevant
There may be different constraints that refer to different
samples
The goal: for each constraint, find a simple random sample of
the objects that satisfy the constraint
Naïve Approach
Cornell Tech 32
map (null, t) [(s
k, t)] (if t satisfies s
k)
reduce (s
k, [t
1, …, t
N]) (SRS ([t
1, …, t
N], f
k))
- Apply map based on the constraints
- Apply a simple random sample in the reduce phase
Spring 2015
Split 1
Split 2
Split 3
x1
Q= x1
x2 Map phase
Combiner phase
MapReduce Evaluator
Cornell Tech 33
Using Algorithm R
Spring 2015
x1 Q= x1 x2
MapReduce
Reduce phase
Using Algorithm R
1 2 3 4
sample size = 2
2 3
A-priori probability:
Proper Sampling in the Reduce Phase
5 6
5 6
Samples may have been drawn from
subsets of different sizes
Spring 2015 35
MR-SQE
Cornell Tech 36
map (null, t) [(s
k, t)] (if t satisfies s
k)
combine (s
k, [t
1, …, t
N]) (SRS ([t
1, …, t
N], f
k), N)
reduce (s
k, [(Š
1, Ň
1),…, (Š
K, Ň
K)])
[unified-sampler ({(Š
1, Ň
1),…, (Š
K, Ň
K)}, f
k)]
- Apply map based on the stratum constraints
- Apply a combiner to sample the sets during the map process, to decrease the amount of transmitted data - In the reduce, unify the samples, while taking into
account the initial sizes of the sets and the required strata sizes
Spring 2015
HADOOP (MapReduce)
Spring 2015 Cornell Tech
37
MapReduce vs Hadoop
Unless you work at Google, or use Google App Engine, you
will use Hadoop MapReduce
Started by Yahoo
Developed by Apache
Google’s implementation (at least the one described) is
written in C++
Hadoop is written in Java
Major Components
User Components:
Mapper
Reducer
Combiner (Optional)
Partitioner (Optional) (Shuffle)
Writable(s) (Optional)
System Components:
Master
Input Splitter*
Output Committer*
* You can use your own if you really want!
Image source: http://www.ibm.com/developerworks/java/library/l-hadoop-3/index.html Spring 2015 Cornell Tech
39
Notes
Mappers and Reducers are typically single
threaded and deterministic
Determinism allows for restarting of failed jobs, or speculative execution
Need to handle more data? Just add more
Mappers/Reducers!
No need to handle multithreaded code
Since they’re all independent of each other, you can run (almost) arbitrary number of nodes
Spring 2015 Cornell Tech
40
Notes
Mappers/Reducers run on arbitrary
machines
A machine typically has multiple map and
reduce slots available to it, typically one per
processor core
Mappers/Reducers run entirely independent
of each other: In Hadoop, they run in
separate JVMs
Spring 2015 Cornell Tech
41
Input Splitter
Is responsible for splitting your input into multiple
chunks
These chunks are then used as input for your mappers
Splits on logical boundaries. The default is 64MB per
chunk
Typically, the user can just use one of the built in
splitters, unless required to read from a specially
formatted file
Code Example
Next, we present a code example of the word-count
application
This is not optimized. See in the following tutorial how
to write it is an optimized way:
http://hadoop.apache.org/docs/r1.2.1/mapred_tutori
al.html
Spring 2015 Cornell Tech
43
1. package org.myorg;
2.
3. import java.io.IOException;
4. import java.util.*;
5.
6. import org.apache.hadoop.fs.Path;
7. import org.apache.hadoop.conf.*;
8. import org.apache.hadoop.io.*;
9. import org.apache.hadoop.mapred.*;
10.
11.
import org.apache.hadoop.util.*;
12. public class WordCount {
13. Cornell Tech Spring 2015
44
implements Mapper<LongWritable, Text, Text, IntWritable> { 15. private final static IntWritable one = new IntWritable(1);
16. private Text word = new Text();
17.
18. public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 19. String line = value.toString();
20. StringTokenizer tokenizer = new StringTokenizer(line);
21. while (tokenizer.hasMoreTokens()) { 22. word.set(tokenizer.nextToken());
23. output.collect(word, one);
24. } 25. } 26. }
Define the map function
Spring 2015 Cornell Tech
45
Reporter for reporting progress
28. public static class Reduce extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable> {
29.
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
30. int sum = 0;
31. while (values.hasNext()) { 32. sum += values.next().get();
33. }
34. output.collect(key, new IntWritable(sum));
35. } 36. }
Define the reduce function
38. public static void main(String[] args) throws Exception { 39. JobConf conf = new JobConf(WordCount.class);
40. conf.setJobName("wordcount");
41.
42. conf.setOutputKeyClass(Text.class);
43. conf.setOutputValueClass(IntWritable.class);
44.
45. conf.setMapperClass(Map.class);
46. conf.setCombinerClass(Reduce.class);
47. conf.setReducerClass(Reduce.class);
48.
Set the mapper Set a combiner Set the reducer
Spring 2015 Cornell Tech
47
JobConf.setOutputKeyComparatorClass(Class) to set the partition of map keys to reducers
38. public static void main(String[] args) throws Exception { 39. JobConf conf = new JobConf(WordCount.class);
40. conf.setJobName("wordcount");
41.
42. conf.setOutputKeyClass(Text.class);
43. conf.setOutputValueClass(IntWritable.class);
44.
45. conf.setMapperClass(Map.class);
46. conf.setCombinerClass(Reduce.class);
47. conf.setReducerClass(Reduce.class);
48.
Set the mapper Set a combiner Set the reducer
Spring 2015 Cornell Tech
48
JobConf.setNumMapTasks(int) hint how to set the number of map tasks
JobConf.setNumReduceTasks(int) tells how to set the number of reduce tasks
Combiner
Essentially an intermediate reducer
Is optional
Reduces output from each mapper, reducing bandwidth
and sorting
Cannot change the type of its input
Input types must be the same as output types
Spring 2015 Cornell Tech
49
49. conf.setInputFormat(TextInputFormat.class);
50. conf.setOutputFormat(TextOutputFormat.class);
51.
52. FileInputFormat.setInputPaths(conf, new Path(args[0]));
53. FileOutputFormat.setOutputPath(conf, new Path(args[1]));
54.
55. JobClient.runJob(conf);
57. } 58. } 59.
InputFormat generates InputSplit
Output Committer
Is responsible for taking the reduce output, and
committing it to a file
Typically, this committer needs a corresponding input
splitter (so that another job can read the input)
Again, usually built in splitters are good enough, unless
you need to output a special kind of file
Spring 2015 Cornell Tech
51
Partitioner (Shuffler)
Decides which pairs are sent to which reducer
Default is simply:
Key.hashCode() % numOfReducers
User can override to:
Provide (more) uniform distribution of load between reducers
Some values might need to be sent to the same reducer
Ex. To compute the relative frequency of a pair of words <W1, W2> you would need to make sure all of word W1 are sent to the same reducer
Binning of results
Spring 2015 Cornell Tech
52
Master
Responsible for scheduling & managing jobs
Scheduled computation should be close to the data if possible
Bandwidth is expensive! (and slow)
This relies on a Distributed File System (GFS / HDFS)!
If a task fails to report progress (such as reading input, writing output, etc), crashes, the machine goes down, etc, it is assumed to be stuck, and is killed, and the step is re-launched (with the same input)
The Master is handled by the framework, no user code is necessary
Spring 2015 Cornell Tech
53
Master Cont.
HDFS can replicate data to be local if necessary for scheduling
Because our nodes are (or at least should be) deterministic
The Master can restart failed nodes (Nodes should have no side effects!)
If a node is the last step, and is completing slowly, the master can launch a second copy of that node
This can be due to hardware issues, network issues, etc.
First one to complete wins, then any other runs are killed
Backup Tasks
A slow running task (straggler) prolong overall execution
Stragglers often caused by circumstances local to the worker
on which the straggler task is running
Overload on worker machined due to scheduler
Frequent recoverable disk errors
Solution
Abort stragglers when map/reduce computation is near end (progress monitored by Master)
For each aborted straggler, schedule backup (replacement) task on another worker
Can significantly improve overall completion time
Spring 2015 Cornell Tech
55
Backup Tasks
(1) without backup tasks (2) with backup tasks (normal)
Spring 2015 Cornell Tech
56
Writables
Are types that can be serialized (deserialized) to a stream
Are required to be input/output classes, as the framework will serialize the data before writing them to disk
User can implement this interface, and use their own types for their input/output/intermediate values
There are default classes for basic values, like Strings, Integers, Longs, etc.
Can also handle data structures, such as arrays, maps, etc.
Your application needs at least six writables
2 for your input
2 for your intermediate values (Map <-> Reduce)
2 for your output
Spring 2015 Cornell Tech
57