MapReduce Framework for Distributed Computation
Summer School on Massive Data Management
Daniel McDermott
Eastern Washington University Cheney, WA, U.S.A
July 4, 2013
About Me
Born in Los Angeles, CA, USA
Currently live in Spokane, WA, USA
Graduate Student at Eastern Washington University
Systems Administrator for the CS department
You can contact me for help with any of the materials at:
I am always on the Freenode IRC network as username onefish irc://irc.freenode.net/#discoproject
Outline
1 What is MapReduce?
2 Distributed Systems Background
3 Functional Programming Roots
4 MapReduce Execution Framework
5 Inverted Index
6 Relational Algebra
7 Lab / Demonstration
8 MapReduce Algorithm Design
9 Local Aggregation
10 Pairs and Stripes
Defined
Data-Intensive Text Processing with MapReduce, 2010
“MapReduce is a programming model for expressing distributed computations on massive datasets and an execution framework for large-scale data processing on clusters of commodity servers.”
First described in “MapReduce: Simplified Data Processing on Large Clusters” by Jeffrey Dean and Sanjay Ghemawat, Google Inc., 2004.
Two Aspects
A Programming Model / Paradigm
Derived from functional programming
An algorithm design restriction!
An Execution Framework / Runtime
Automatic parallel/distributed execution on large cluster computers.
Scales horizontally, not vertically.
Assumes failures are common.
Moves computation to the data.
Hides system-level details from the programmer.
Scales seamlessly.
The restrictions of the Programming Model enable the features of the Execution Framework.
Overall Theme of MapReduce
Given a large dataset, apply some transformation to each element or record of the dataset (map), producing a temporary intermediate dataset.
Then iterate over the intermediate dataset performing an aggregation, summarization, or similar reduction (reduce).
A surprising number of problems in data mining and computer science can be phrased in this way.
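As a minimal single-machine sketch of this theme (the records and the per-record transformation are made up for illustration), a map step produces an intermediate dataset and a reduce step summarizes it:

```python
from functools import reduce

# Illustrative input records (not from the slides' datasets).
records = ["one fish", "two fish", "red fish", "blue fish"]

# map: transform each record into an intermediate value (here, its word count).
intermediate = [len(record.split()) for record in records]

# reduce: aggregate the intermediate dataset into one summary value.
total_words = reduce(lambda acc, n: acc + n, intermediate, 0)

print(intermediate, total_words)
```

Nothing here is distributed; the point is only the shape of the computation, which the rest of the material scales out.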
The “Big Data” Revolution
The Unreasonable Effectiveness of Data, 2009
Problems that involve interacting with humans, such as natural language understanding, have not proven to be solvable by concise, neat formulas like F = ma. Instead the best approach appears to be harnessing the power of data...
Sloan Digital Sky Survey produces 500TB of astronomical images each month [1]
LHC @ CERN: Estimates 15PB of data generated each year [2]
In the future, personal DNA sequencing will become routine
The amount of data in existence doubles roughly every two years [3]
[1] www.sciencenewsdaily.org/internet-news/202854792/
[2] home.web.cern.ch/about/computing
[3] Cisco, The Economist
Scope of problems
A word of warning
“To a man with a hammer, every problem looks like a nail.”
MapReduce is a data parallel paradigm focused on scalability and data bandwidth
Most solutions involve sequentially reading large amounts of static data from disks and moving it through a wide computation pipeline
MapReduce is a batch processing system
Moving data will dominate the cost of computation
MapReduce is not:
Low latency
A data retrieval method
A “supercomputing” method
Data is large, computation is relatively small
Some MapReduce Solutions
Text processing: wordcount, co-occurrence matrix, relative frequencies, distributed grep
Inverted Index construction
Graph Algorithms (SSSP, PageRank)
Relational Algebra
Clustering Algorithms
Matrix Multiplications
Summarization / Histogram Construction
Distributed Sort
MapReduce Implementations
Strictly MapReduce:
Hadoop (Apache Foundation, Java)
Disco (Nokia Research Center, Python)
Google’s (in-house, C++)
Systems which use MapReduce:
MongoDB (distributed document store)
Cassandra (distributed database system)
CouchDB (distributed document store)
Lucene (free search engine software)
Who Uses MapReduce?
Deployed widely at Google, Amazon, Facebook, Yahoo, Ebay, IBM, Nokia, Qualcomm, LinkedIn, CERN, and others.
Parallel Programming Review
Parallel: Multiple threads / processors acting on same problem
Single greatest challenge to parallel algorithm design: Shared State
    int x = 4;   # global / shared variables
    int y = 0;
    func foo() {
        x++;
        x = y;
    }
    func bar() {
        y++;
        x += 2;
    }
    new thread(foo).run()
    new thread(bar).run()
What are x and y???
Parallel = Unpredictable
Applies to more than just integers
Producer / consumer problems
Reporting status to a master process
Notifying other threads of state changes
All require some synchronization
Synchronization Primitives
Special shared variables which support atomic operations
Semaphores:
Imagine a set of train tracks that must cross a bridge
Semaphore is the flag on each side of the bridge
Mutex: unlock() and lock()
Condition Variables:
wait() and notify(); wait() blocks the thread until it receives a notify()
Solution?
Protect critical section...
    semaphore sem;
    func foo() {
        sem.lock()
        x++;
        x = y;
        sem.unlock()
    }
    func bar() {
        sem.lock()
        y++;
        x += 2;
        sem.unlock()
    }
Wait a second...
Fixed Solution
Force foo to run before bar.
    semaphore sem;
    bool done = false;
    conditionVar cv;
    func foo() {
        sem.lock()
        x++;
        x = y;
        done = true;
        sem.unlock()
        cv.notify()
    }
    func bar() {
        sem.lock()
        if (!done)
            cv.wait(sem)
        y++;
        x += 2;
        sem.unlock()
    }
We need to synchronize every time we access or update shared state
What if the threads are distributed?
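A hedged Python rendering of the "force foo to run before bar" idea, using the standard `threading` module. Two details differ from the pseudocode and are choices of this sketch: Python requires `notify()` to be called while the lock is held, and `wait()` sits in a `while` loop to guard against spurious wakeups.

```python
import threading

x, y = 4, 0                        # shared state
lock = threading.Lock()
cv = threading.Condition(lock)     # condition variable bound to the same lock
done = False

def foo():
    global x, y, done
    with cv:                       # acquires the underlying lock
        x += 1
        x = y
        done = True
        cv.notify()                # must be called while holding the lock

def bar():
    global x, y
    with cv:
        while not done:            # re-check the flag after every wakeup
            cv.wait()              # releases the lock while blocked
        y += 1
        x += 2

# Start bar first on purpose: the condition variable still forces foo's
# critical section to complete before bar's.
threads = [threading.Thread(target=bar), threading.Thread(target=foo)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(x, y)
```

With the ordering enforced, foo leaves x = 0 and bar then produces x = 2, y = 1 on every run, regardless of which thread the scheduler starts first.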
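Amdahl’s Law
$$S(n) = \frac{1}{B + \frac{1 - B}{n}}$$
where B is the strictly sequential fraction of the algorithm and n is the number of processors.
“The theoretical speedup of any parallel algorithm is bounded by its strictly sequential portion.”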
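To make the bound concrete, here is a small sketch that evaluates the speedup 1 / (B + (1 − B)/n), where B is the sequential fraction and n the number of processors. The value B = 0.1 is an arbitrary illustration, not a measurement.

```python
def amdahl_speedup(b: float, n: int) -> float:
    """Theoretical speedup with sequential fraction b on n processors."""
    return 1.0 / (b + (1.0 - b) / n)

# With 10% sequential work the speedup can never reach 1 / 0.1 = 10,
# no matter how many processors are added.
for n in (1, 10, 100, 1000):
    print(n, round(amdahl_speedup(0.1, n), 2))
```

The speedup flattens quickly: going from 100 to 1000 processors gains less than one additional unit of speedup in this example.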
Distributed Computing Bottlenecks
In the case of distributed computing, this sequential portion will almost always be dominated by the cost of communication across an interconnect
Even with libraries, distributed systems programming is burdensome
Even the fastest interconnects incur incredible delay compared to the speed of modern CPUs
Focus of distributed algorithm design is on reducing this communication
Parallel vs Distributed
Parallel: Special purpose interconnects focused on low latency, shared memory model, computationally intense problems such as physics simulations and dynamic systems modeling. Frequent, random-access synchronization involving only a few bytes at a time
Distributed: Inexpensive hardware, 1Gbit ethernet, data intensive problems with minimal shared state between compute elements, large streaming reads and writes
Scaling Models
vertical: “scale up”: buy better, faster, special purpose hardware → HPC
horizontal: “scale out”: buy more hardware of same type → MapReduce
Traditional HPC Datacenter
Must scale vertically
Parallel system focused on low latency
The MapReduce Datacenter
Scales horizontally with cheap, consumer-grade hardware
Distributed system focused on wide throughput
Distributed File Systems
MapReduce file systems are oriented towards large, sequential reads and writes: streaming data to and from the disk.
Fault tolerant via block replication
Logical
Generally not POSIX compatible
Examples:
Google’s BigTable (Bigtable: A Distributed Storage System for Structured Data, Dean, Burrows, et al., 2006)
HDFS - Hadoop Distributed File System
DDFS - Disco Distributed File System
Distributed File System Architecture
Large Block Size (typically 64MB)
With K replication factor, K − 1 nodes can fail
Name node serves metadata about all files on file system
Functional Programming Review
Applications of functions do not modify data, always create new data
Original data structures always exist unmodified
Data flow is implicit in program design
=⇒
Order of application across threads or functions does not matter
There are no side-effects and thus no shared-state between function applications or threads
Functional Programming Review
“Pure” functions
    func foo(mylist : int list) =
        sum(mylist) + prod(mylist) + len(mylist)
Order of application of sum(), prod(), etc. does not matter: they only produce new data
Each purely expresses a mathematical computation
Thus each function can be put in its own thread
No Side Effects
Functional updates do not modify structures
    func append(x, lst) =
        let lst = reverse lst in
        reverse (x :: lst)
This reverses a list (which creates a new list), prepends an element, then reverses it again
But it never modifies lst
Higher Order Functions
Functions can take other functions as their arguments
    func doDouble(fun, x) =
        fun(fun(x))
    func myFun(x) =
        x ∗ 3
    doDouble(myFun, 2)
returns (2 ∗ 3) ∗ 3 = 18
What about?:
    func myFun(x) = x + x
    doDouble(myFun, 5)
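The same higher-order function can be tried directly in Python. The names `triple` and `add_self` are stand-ins chosen here for the two versions of myFun.

```python
def do_double(fun, x):
    """Apply fun twice: fun(fun(x))."""
    return fun(fun(x))

def triple(x):
    return x * 3

def add_self(x):
    return x + x

print(do_double(triple, 2))     # (2 * 3) * 3
print(do_double(add_self, 5))   # (5 + 5) + (5 + 5)
```

Because `do_double` only receives a function and a value, it works unchanged with any one-argument function.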
Common Functional Programming Patterns
Functional programming often operates over lists...
Just as in procedural programming, functional programming has common patterns which are built into the languages
Map: Element-wise transform a list
Fold: Accumulate a list into a single value
Map
Map Function
map(func, list)
Input: list: A list to map over.
func: A function to apply to each element in the list, which returns a new element.
Output: A new list, representing func applied to every element of list
    newlist ← new empty list
    foreach element in list do
        newlist.append(func(element))
    return newlist
Fold or Accumulate
(Reduce) Fold Function
fold(acc, func, list)
Input: acc: An accumulator value.
func: A function which takes the accumulator value and a list element and returns a new accumulator value.
list: A list to read.
Output: The final value of acc.
    foreach element in list do
        acc = func(acc, element)
    return acc
Can acc be a list?
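A direct Python transcription of the map and fold definitions, as a sketch. `my_map` is named to avoid shadowing the built-in, and the last line shows one way the accumulator can itself be a list.

```python
def my_map(func, lst):
    """Return a new list with func applied to every element of lst."""
    newlist = []
    for element in lst:
        newlist.append(func(element))
    return newlist

def fold(acc, func, lst):
    """Thread an accumulator through lst, calling func(acc, element)."""
    for element in lst:
        acc = func(acc, element)
    return acc

squares = my_map(lambda v: v * v, [1, 2, 3])
total = fold(0, lambda acc, v: acc + v, squares)

# The accumulator may be a list: this fold builds a reversed copy.
reversed_copy = fold([], lambda acc, v: [v] + acc, [1, 2, 3])

print(squares, total, reversed_copy)
```

The reversing lambda creates a fresh list at every step, so the input list is never modified.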
Fold Exercise
Can we implement:
    func foo(mylist : int list) =
        sum(mylist) + prod(mylist) + len(mylist)
Using only folds?
Exercise Solved
    func foo(mylist : int list) =
        sum(mylist) + prod(mylist) + len(mylist)
    func sum(lst) =
        fold(0, lambda(a, x) => a + x, lst)
    func prod(lst) =
        fold(1, lambda(a, x) => a ∗ x, lst)
    func len(lst) =
        fold(0, lambda(a, x) => 1 + a, lst)
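For readers who want to run the exercise, a sketch using `functools.reduce` as the fold. The wrapper keeps the fold(acc, func, list) argument order, and the trailing underscores and `length` name only avoid clashing with Python built-ins.

```python
from functools import reduce

def fold(acc, func, lst):
    # functools.reduce takes (function, iterable, initializer).
    return reduce(func, lst, acc)

def sum_(lst):
    return fold(0, lambda a, x: a + x, lst)

def prod(lst):
    return fold(1, lambda a, x: a * x, lst)

def length(lst):
    return fold(0, lambda a, _x: a + 1, lst)

def foo(lst):
    return sum_(lst) + prod(lst) + length(lst)

print(foo([1, 2, 3, 4]))   # 10 + 24 + 4
```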
Map and Fold Together
fold(acc, f , map(g , list))
Length of longest word
Given a list of words, output the length of the longest string in the set
Given input “one fish two fish red fish blue fish”, should output: “4”
Wordcount
Given a string (or document), output the number of times each word appears in the string.
Given input “one fish two fish red fish blue fish”, should output:
{"one": 1, "two": 1, "red": 1, "blue": 1, "fish": 4}
Exercises Solved
    func lenLongest(lst) =
        fold(0, max, map(len, lst))
    func wordCount(lst) =
        fold(dict{}, combine, map(count, lst))
    count(str) =
        (str, 1)
    combine(dict{}, elem) =
        dict{elem[0]} += elem[1]
There is a simpler way to do wordCount...
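A runnable sketch of both solutions in Python, with `functools.reduce` standing in for fold. Copying the dictionary inside `combine` is a choice made here to keep the function free of side effects; it is not required for correctness.

```python
from functools import reduce

def len_longest(words):
    # fold(0, max, map(len, words))
    return reduce(max, map(len, words), 0)

def count(word):
    return (word, 1)

def combine(counts, elem):
    word, n = elem
    updated = dict(counts)                    # copy: never mutate the accumulator
    updated[word] = updated.get(word, 0) + n
    return updated

def word_count(words):
    # fold(dict{}, combine, map(count, words))
    return reduce(combine, map(count, words), {})

words = "one fish two fish red fish blue fish".split()
print(len_longest(words))
print(word_count(words))
```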
Parallelizing Map
In a purely functional setting, there are no side effects from elements of a list being computed by map
The order in which “func” is applied to the list elements does not affect the result, thus we can parallelize
This is the core property of MapReduce as a programming model
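As a small illustration of this property, the sketch below applies the same pure function sequentially and through a thread pool from the standard `concurrent.futures` module. The pool size of 4 is arbitrary, and threads are used only to keep the example self-contained.

```python
from concurrent.futures import ThreadPoolExecutor

def word_length(word):
    # Pure function: depends only on its argument, touches no shared state.
    return len(word)

words = "one fish two fish red fish blue fish".split()

sequential = list(map(word_length, words))

with ThreadPoolExecutor(max_workers=4) as pool:
    # Executor.map returns results in input order even though the calls
    # may run in any order across the worker threads.
    parallel = list(pool.map(word_length, words))

print(sequential == parallel)
```

Because `word_length` has no side effects, no locks or condition variables are needed.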
Redefining Map
Instead of a list, the input to map will be records from a data source, supplied as (key, value) pairs
(ex. filename, contents)
Mapping over the entire data source is accomplished by mapping over each separate record in parallel (a “two level” map)
Each map process will output one or more intermediate (key, value) pairs
User code is responsible for iteration and application
User Map Function
    method Map(docid a, doc d)
        forall the term t ∈ doc d do
            emit(term t, count 1)
Redefining Reduce
After the map phase, all intermediate values for a given key are aggregated into a list
Reduce summarizes or combines the intermediate values per key into one or more final values
In practice, there is usually one final value per key
Again, the user code is responsible for iteration and application
User Reduce Function
    method Reduce(term t, counts [c1, c2, ...])
        sum ← 0
        forall the count c ∈ counts [c1, c2, ...] do
            sum ← sum + c
        emit(term t, count sum)
Paradigm and Runtime Differences
How MapReduce differs from FP:
The runtime map and reduce are not the functional programming map and fold!
The user code is responsible for the iteration over datasets and application of computation
The programming model serves as a guideline for implementing the functions, a blueprint
Aside: Sometimes, we can cheat / break the programming model
Programming Model to Runtime
The programmer defines a map and a reduce function with the types:
map(K1, V1) → [(K2, V2)]
reduce(K2, [V2]) → [(K3, V3)]
The framework:
1 Splits the input into chunks or blocks
2 Assigns each block to a map task, assigns all tasks to worker machines (mappers)
3 Each worker applies the map function to each element of its assigned block, outputting keys and values
4 Aggregates these values by key (shuffle and sort)
5 Assigns (partitions) each key to a reduce task, assigns all tasks to worker machines (reducers)
6 Each worker applies the reduce function over the values for its keys
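The six steps can be imitated on one machine. The sketch below is a toy stand-in for a real runtime: `run_job`, its parameters, and the two-block split are inventions of this example. Everything runs sequentially in a single process, and only the data flow mirrors the framework.

```python
from collections import defaultdict

def run_job(map_fn, reduce_fn, records, num_blocks=2):
    # 1. Split the input into blocks.
    blocks = [records[i::num_blocks] for i in range(num_blocks)]
    # 2-3. Each "map task" applies map_fn to every record of its block.
    intermediate = []
    for block in blocks:
        for key, value in block:
            intermediate.extend(map_fn(key, value))
    # 4. Shuffle and sort: group intermediate values by key.
    groups = defaultdict(list)
    for k, v in intermediate:
        groups[k].append(v)
    # 5-6. Each "reduce task" applies reduce_fn to one key and its values.
    output = []
    for k in sorted(groups):
        output.extend(reduce_fn(k, groups[k]))
    return output

def wc_map(docid, doc):
    return [(term, 1) for term in doc.split()]

def wc_reduce(term, counts):
    return [(term, sum(counts))]

docs = [("d1", "one fish two fish"), ("d2", "red fish blue fish")]
result = run_job(wc_map, wc_reduce, docs)
print(result)
```

A real framework would run the map and reduce tasks on different machines and move the grouped values across the network between steps 4 and 6.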
Programming Model to Runtime
Wordcount example
WordCount class Job
    method Map(docid a, doc d)
        forall the term t ∈ doc d do
            emit(term t, count 1)
    method Reduce(term t, counts [c1, c2, ...])
        sum ← 0
        forall the count c ∈ counts [c1, c2, ...] do
            sum ← sum + c
        emit(term t, count sum)
Wordcount Data Flow
Refinements
Combiner Function
Like a mini-reducer running on the mapper side. Goal is to aggregate values to minimize data transferred across the network.
Runs on the mappers before data is shuffled across the network
Viewed by the framework as optional
Partition Function
By default: With r partitions, send the (key, val) pair to reduce task number hash(key) mod r.
Redefinable → may want certain keys to always appear together. For instance, all URLs for a particular site.
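A sketch of a default and a custom partition function. Both function names are hypothetical, and `zlib.crc32` is used as the hash because Python's built-in `hash()` for strings is randomized per process, which would make partition assignments unstable across workers.

```python
import zlib
from urllib.parse import urlparse

def default_partition(key: str, r: int) -> int:
    """hash(key) mod r, using a hash that is stable across processes."""
    return zlib.crc32(key.encode("utf-8")) % r

def host_partition(url: str, r: int) -> int:
    """Custom partitioner: hash only the host, so every URL of one
    site lands in the same reduce task."""
    host = urlparse(url).netloc
    return zlib.crc32(host.encode("utf-8")) % r

a = host_partition("http://example.com/index.html", 4)
b = host_partition("http://example.com/docs/page2.html", 4)
print(a == b)
```

With `default_partition`, the two URLs above would be hashed as whole strings and could be sent to different reducers.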
Improved Wordcount
WordCount class Job
    method Map(docid a, doc d)
        forall the term t ∈ doc d do
            emit(term t, count 1)
    method Combine(term t, counts [c1, c2, ...])
        partsum ← 0
        forall the count c ∈ counts [c1, c2, ...] do
            partsum ← partsum + c
        emit(term t, count partsum)
    method Reduce(term t, partsums [c1, c2, ...])
        sum ← 0
        forall the partsum c ∈ partsums [c1, c2, ...] do
            sum ← sum + c
        emit(term t, count sum)
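To see what the combiner buys, the sketch below counts how many (term, count) pairs one simulated mapper would hand to the shuffle with and without combining. The function names and the single-document input are illustrative only.

```python
from collections import Counter

def map_doc(doc):
    # One (term, 1) pair per term occurrence, as in the Map method.
    return [(term, 1) for term in doc.split()]

def combine(pairs):
    # Mini-reduce on the mapper side: one partial sum per distinct term.
    partial = Counter()
    for term, count in pairs:
        partial[term] += count
    return sorted(partial.items())

mapper_output = map_doc("one fish two fish red fish blue fish")
combined = combine(mapper_output)

print(len(mapper_output), "pairs before combining")
print(len(combined), "pairs after combining")
```

The savings depend on how often terms repeat within one mapper's input, so the reduction seen here should not be read as typical.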
More Complete Wordcount Example
Systems Terminology
Job: An API call to a MapReduce cluster, includes a Map and Reduce function and associated parameters. Each job will be divided into tasks by the runtime.
Task: A unit of work, run as a subprocess on the cluster. Tasks are divided up by:
Map Task: each file or block on the distributed FS
Reduce Task: each partition of the intermediate key space
Worker: A slot of computation, often a processor in the cluster, where tasks can be assigned.
Mapper: A worker assigned a Map Task
Reducer: A worker assigned a Reduce Task
MapReduce Systems Perspective
Locality Aware Scheduling
Data is replicated across the cluster on the same machines that perform computation.
Increased replication factor gives more flexibility to scheduler.
Key Feature: Move computation to data, do not fetch data to computation. Programs are small, data is large.
Fault Tolerance
Inexpensive, commodity, hardware
Re-Execution: machine or rack failure → run task again. (note we must re-run all completed tasks on that node as well, why?)
Bad Record Skipping: user code crashes on certain input → log error and skip
Speculative Execution: stragglers → use idle machines to replicate in-progress tasks
Inverted Index
Given a set of documents which contain terms, output a set of terms which “contain” document IDs.
The document IDs should be sorted within their respective terms, for faster indexing
Each document ID can have some payload associated with it (e.g. the term frequency per document)
Inverted Index Algorithm
InvertedIndex class Job
    method Map(docid n, doc d)
        H ← new AssociativeArray
        forall the term t ∈ doc d do
            H{t} ← H{t} + 1
        forall the term t ∈ H do
            emit(term t, posting (n, H{t}))
    method Reduce(term t, postings [(n1, f1), (n2, f2), ...])
        P ← new list
        forall the posting (n, f) ∈ postings [(n1, f1), (n2, f2), ...] do
            P.add((n, f))
        P.sort()
        emit(term t, postings P)
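A single-process Python sketch of the inverted index job. The three tiny documents are made up, and the grouping loop stands in for the framework's shuffle.

```python
from collections import Counter, defaultdict

def ii_map(docid, doc):
    # Term frequencies for this document, emitted as (term, (docid, tf)).
    tf = Counter(doc.split())
    return [(term, (docid, freq)) for term, freq in tf.items()]

def ii_reduce(term, postings):
    # Sort postings by document id (tuples compare by first element first).
    return (term, sorted(postings))

docs = {1: "one fish two fish", 2: "red fish blue fish", 3: "one red bird"}

grouped = defaultdict(list)              # stand-in for shuffle and sort
for docid, doc in docs.items():
    for term, posting in ii_map(docid, doc):
        grouped[term].append(posting)

index = dict(ii_reduce(term, postings) for term, postings in grouped.items())
print(index["fish"])
print(index["one"])
```

Each posting carries the term frequency as its payload, matching the (docid, frequency) pairs in the pseudocode.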
Inverted Index Execution
Relational Joins Review
All common relational algebra operations can be computed by MapReduce. See Ullman, 2013 for algorithms for selection, projection, set operations, etc.
Definition: Natural Join
Given two relations, R and S, which share a common descriptor in their schema, output corresponding tuples from both relations whose values agree on the common descriptor.
Example: Given relations R(a, b) and S(b, c), join R and S on b
R:
a  b
4  6
5  5
2  9
9  1
S:
b  c
3  4
5  6
9  7
1  8
Would output (5,5,6) (2,9,7) (9,1,8)
Natural Join Algorithms
Relational Algebra in MapReduce is critical to many higher level systems which use MapReduce as their foundation
Three types of MapReduce Join Algorithms
Reduce Side Join
Map Side Join
Memory-backed Join
Reduce-side Join
one-to-one
NaturalJoin class Job
    method Map(relation id, tuple (x, y))
        if id == R then
            emit(y, tuple (x, y))
        if id == S then
            emit(x, tuple (x, y))
    method Reduce(joinkey b, tuples [t1, ...])
        if size(tuples) == 2 then
            emit(b, merge(tuples [t1, t2]))
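A sketch of the one-to-one reduce-side join on the example relations R(a, b) and S(b, c). Tagging each tuple with its relation name is an addition of this sketch, so the reducer can tell which side a tuple came from. The grouping loop again stands in for the shuffle.

```python
from collections import defaultdict

R = [(4, 6), (5, 5), (2, 9), (9, 1)]   # R(a, b)
S = [(3, 4), (5, 6), (9, 7), (1, 8)]   # S(b, c)

def join_map(relation, tup):
    x, y = tup
    if relation == "R":
        return [(y, ("R", tup))]        # join key b is R's second field
    return [(x, ("S", tup))]            # join key b is S's first field

def join_reduce(b, tagged):
    r_side = [t for tag, t in tagged if tag == "R"]
    s_side = [t for tag, t in tagged if tag == "S"]
    if len(r_side) == 1 and len(s_side) == 1:   # one-to-one match only
        (a, _), (_, c) = r_side[0], s_side[0]
        return [(a, b, c)]
    return []

groups = defaultdict(list)
for name, relation in (("R", R), ("S", S)):
    for tup in relation:
        for key, value in join_map(name, tup):
            groups[key].append(value)

joined = [row for b in groups for row in join_reduce(b, groups[b])]
print(joined)
```

Join keys 6 and 3 appear in only one relation, so their groups produce no output.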
Cluster Setup
Handout
Six Machines, each:
4x 2.8GHz Intel Xeon
5GB Memory
4x 73GB 10,000rpm SCSI U320 in RAID 5 (175GB after OS + formatting)
Total:
30GB Memory
24 Processors
1TB Distributed Storage
1Gbit switched network
Cluster Setup
Disco Framework
MapReduce runtime in Erlang + Python
Main Theme of MapReduce Algorithms
Primary focuses of efficient MapReduce algorithm design:
phrasing the solution within the restrictions of the programming model
reducing the amount of intermediate data that may need to be fetched across the network
dealing with scalability concerns and set size vs I/O tradeoffs
What the programmer cannot control
Where a map task or reduce task runs (i.e. on which node)
When a map task or reduce task finishes
Which input key-value pairs are processed by a specific map task
Which intermediate key-value pairs are processed by a specific reducer
What the programmer can control
Construct complex data structures as keys and values to store and communicate partial results
Run initialization code at the beginning or end of each Map and Reduce Task
Preserve state in map and reduce functions across multiple input or intermediate keys
Ability to control the sort order of intermediate keys, thus the order the reducer will encounter the keys
The ability to control the partition of the key space, thus the set of keys that will appear at a particular reduce task
The Communication Cost Model
Dean, 2011
The communication cost of an algorithm is the sum of the communication cost of all the tasks implementing the algorithm
The cost of communication will vastly dominate the cost of CPU operations.
The algorithm being executed by each task is typically very simple, often linear in its input
Due to horizontal scalability, computation is cheap compared to communication
When measuring the communication cost, we only count the outputs of each task
WordCount Revisited
WordCount class Job
    method Map(docid a, doc d)
        forall the term t ∈ doc d do
            emit(term t, count 1)
    method Reduce(term t, counts [c1, c2, ...])
        sum ← 0
        forall the count c ∈ counts [c1, c2, ...] do
            sum ← sum + c
        emit(term t, count sum)
The communication cost of this implementation is O(n), with n terms in the entire collection
In-mapper combining
Local Aggregation
Leveraging the associativity and commutativity of the Reduce operation to combine values before Reducers fetch them across the network.
In MapReduce Frameworks, combiners are viewed by the runtime as optional optimizations. The correctness of the algorithm cannot depend on the combiner.
Force In-mapper combining
WordCount Improved
Tally counts for entire document
(Reduce is the same)
WordCount
    method Map(docid a, doc d)
        H ← new AssociativeArray
        forall the term t ∈ doc d do
            H{t} ← H{t} + 1
        forall the term t ∈ H do
            emit(term t, count H{t})
The communication cost is O(dσ), with d documents and a language of size σ
Wordcount Best
Tally counts across documents by preserving state across calls to map + in-mapper combining
WordCount
    method Initialize
        H ← new AssociativeArray
    method Map(docid a, doc d)
        forall the term t ∈ doc d do
            H{t} ← H{t} + 1
    method Finalize
        forall the term t ∈ H do
            emit(term t, count H{t})
The communication cost is O(kσ), with k workers and a language of size σ
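A sketch of the stateful mapper in Python. The class and method names mirror the pseudocode but are not the API of Disco or Hadoop, and the driver loop at the bottom plays the role of the runtime calling the hooks.

```python
from collections import Counter

class WordCountMapper:
    """In-mapper combining: state survives across calls to map()."""

    def initialize(self):
        self.counts = Counter()

    def map(self, docid, doc):
        # Nothing is emitted here; counts accumulate in memory.
        self.counts.update(doc.split())

    def finalize(self):
        # Emit one (term, count) pair per distinct term seen by this mapper.
        return sorted(self.counts.items())

mapper = WordCountMapper()
mapper.initialize()
for docid, doc in [("d1", "one fish two fish"), ("d2", "red fish blue fish")]:
    mapper.map(docid, doc)
emitted = mapper.finalize()
print(emitted)
```

The trade-off is memory: the associative array must hold every distinct term the mapper encounters before anything is emitted.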
Distributed Mean
Problem
Given a large dataset where input keys are strings and values are integers, we wish to compute the mean of all integers associated with the same key, rounded to the nearest integer
For example, a user log representing time spent viewing particular web pages on your site:
PageA 23
PageB 38
PageA 9
PageB 40
PageB 89
PageA 26
PageB 33
This user spends 19 seconds on PageA and 50 seconds on PageB, on average
Naive Implementation
DistributedMeans
    method Map(string t, int r)
        emit(string t, int r)
    method Reduce(string t, ints [r1, r2, ...])
        sum ← 0
        cnt ← 0
        forall the int r ∈ ints [r1, r2, ...] do
            sum ← sum + r
            cnt ← cnt + 1
        ravg ← sum / cnt
        emit(string t, int ravg)
Because Map is an identity, we will need to shuffle the entire dataset across the network. In the worst case, the communication cost will be O(n).
Correctness of Local Aggregation
Simple local aggregation is only correct when the operation that Reduce applies is both associative and commutative.
Notice that:
Mean(1,2,3,4,5) ≠ Mean(Mean(1,2), Mean(3,4,5))
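The inequality is easy to check numerically. The sketch below also shows the usual remedy of carrying (sum, count) pairs, which can be added in any grouping.

```python
from statistics import mean

values = [1, 2, 3, 4, 5]

direct = mean(values)
nested = mean([mean([1, 2]), mean([3, 4, 5])])   # mean of partial means

# Carrying (sum, count) pairs instead: addition is associative and
# commutative, so the partial results can be merged in any order.
partials = [(sum([1, 2]), 2), (sum([3, 4, 5]), 3)]
total, count = map(sum, zip(*partials))

print(direct, nested, total / count)
```

The nested mean weights the two groups equally even though they have different sizes, which is why it drifts away from the true mean.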
First Attempt
DistributedMeans
    method Map(string t, int r)
        emit(string t, int r)
    method Combine(string t, ints [r1, r2, ...])
        sum ← 0
        cnt ← 0
        forall the int r ∈ ints [r1, r2, ...] do
            sum ← sum + r
            cnt ← cnt + 1
        emit(string t, pair (sum, cnt))
    method Reduce(string t, pairs [(s1,c1), (s2,c2), ...])
        sum ← 0
        cnt ← 0
        forall the pair (s, c) ∈ pairs [(s1,c1), (s2,c2), ...] do
            sum ← sum + s
            cnt ← cnt + c
        ravg ← sum / cnt
        emit(string t, int ravg)
Incorrect: Combine emits pairs but Map emits ints. Combiners are optional, so Reduce may receive plain ints instead of pairs.
Correct Solution
DistributedMeans
    method Map(string t, int r)
        emit(string t, pair (r, 1))
    method Combine(string t, pairs [(s1,c1), (s2,c2), ...])
        sum ← 0
        cnt ← 0
        forall the pair (s, c) ∈ pairs [(s1,c1), (s2,c2), ...] do
            sum ← sum + s
            cnt ← cnt + c
        emit(string t, pair (sum, cnt))
    method Reduce(string t, pairs [(s1,c1), (s2,c2), ...])
        sum ← 0
        cnt ← 0
        forall the pair (s, c) ∈ pairs [(s1,c1), (s2,c2), ...] do
            sum ← sum + s
            cnt ← cnt + c
        ravg ← sum / cnt
        emit(string t, int ravg)
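A sketch of a pair-based distributed mean on the page-view log from the problem statement. Here Map emits (value, 1) pairs, so the combiner's input and output have the same type and the job gives the same answer whether or not the combiner runs. The `run` helper and its two-block split are inventions of this example.

```python
from collections import defaultdict

log = [("PageA", 23), ("PageB", 38), ("PageA", 9), ("PageB", 40),
       ("PageB", 89), ("PageA", 26), ("PageB", 33)]

def mean_map(page, seconds):
    return (page, (seconds, 1))

def mean_combine(page, pairs):
    return (page, (sum(s for s, _ in pairs), sum(c for _, c in pairs)))

def mean_reduce(page, pairs):
    total = sum(s for s, _ in pairs)
    count = sum(c for _, c in pairs)
    return (page, round(total / count))

def run(records, use_combiner):
    shuffled = defaultdict(list)
    for block in (records[:4], records[4:]):     # two simulated map tasks
        local = defaultdict(list)
        for page, seconds in block:
            key, pair = mean_map(page, seconds)
            local[key].append(pair)
        for key, pairs in local.items():
            if use_combiner:                     # the runtime may skip this
                pairs = [mean_combine(key, pairs)[1]]
            shuffled[key].extend(pairs)
    return dict(mean_reduce(k, v) for k, v in shuffled.items())

with_combiner = run(log, use_combiner=True)
without_combiner = run(log, use_combiner=False)
print(with_combiner, without_combiner)
```

Both runs agree with the averages stated for this log, which is the property an optional combiner must preserve.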
Effectiveness of Local Aggregation
Highly dependent on the size of the intermediate key space, the number of workers, reduce tasks, and the distribution of intermediate keys via the partition function.
Co-occurrence Matrix Construction
Definition
An n × n matrix where n is the number of unique words in the corpus. A cell m_ij contains the number of times word w_i occurs with w_j. Occurrence is defined by some context, such as same sentence, paragraph, or a sliding window of k words.
For example, with k = 1 and the string "one fish two fish"
        one  fish  two
one     0    1     0
fish    1    0     2
two     0    2     0
For large vocabularies, the size of this problem quickly grows out of control.
Pairs Solution
CooccurPairs
    method Map(docid a, doc d)
        forall the term w ∈ doc d do
            forall the term u ∈ Neighbors(w) do
                emit((w, u), count 1)
    method Reduce(pair p, counts [c1, c2, ...])
        s ← 0
        forall the count c ∈ counts [c1, c2, ...] do
            s ← s + c
        emit(pair p, count s)
Emit a count for each co-occurrence
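A sketch of the pairs approach on the string "one fish two fish" with a window of k = 1. The `neighbors` helper is a hypothetical implementation of Neighbors(w), and the reduce step is folded into one local function instead of a distributed shuffle.

```python
from collections import Counter

def neighbors(terms, i, k=1):
    """Terms within k positions of index i (hypothetical helper)."""
    return terms[max(0, i - k):i] + terms[i + 1:i + 1 + k]

def pairs_map(doc, k=1):
    terms = doc.split()
    return [((w, u), 1)
            for i, w in enumerate(terms)
            for u in neighbors(terms, i, k)]

def pairs_reduce(emitted):
    totals = Counter()
    for pair, count in emitted:
        totals[pair] += count
    return dict(totals)

emitted = pairs_map("one fish two fish")
cooccur = pairs_reduce(emitted)
print(len(emitted), cooccur)
```

Each non-zero matrix cell becomes one key, so the mapper emits one small record per co-occurrence.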
Analysis of Pairs
For a corpus of size n, at worst we will produce O(n²) communications.
However, the space required by each task is limited. If the vocabulary is very large, and the context of "neighbor" very wide, this solution will continue to scale.
Stripes Solution
CooccurStripes
    method Map(docid a, doc d)
        forall the term w ∈ doc d do
            H ← new AssociativeArray
            forall the term u ∈ Neighbors(w) do
                H{u} ← H{u} + 1
            emit(term w, stripe H)
    method Reduce(term w, stripes [H1, H2, ...])
        Hf ← new AssociativeArray
        forall the stripe H ∈ stripes [H1, H2, ...] do
            Sum(Hf, H)
        emit(term w, stripe Hf)
Emit a stripe for each word
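The stripes approach on the same string, as a sketch. Each emitted value is a whole row fragment (an associative array), and the reducer sums the fragments element-wise. The window slicing is an assumed implementation of Neighbors(w) with k = 1.

```python
from collections import Counter, defaultdict

def stripes_map(doc, k=1):
    terms = doc.split()
    out = []
    for i, w in enumerate(terms):
        # One stripe per term occurrence: counts of its neighbors.
        stripe = Counter(terms[max(0, i - k):i] + terms[i + 1:i + 1 + k])
        out.append((w, stripe))
    return out

def stripes_reduce(emitted):
    merged = defaultdict(Counter)
    for w, stripe in emitted:
        merged[w].update(stripe)     # Counter.update adds counts element-wise
    return {w: dict(s) for w, s in merged.items()}

emitted = stripes_map("one fish two fish")
rows = stripes_reduce(emitted)
print(len(emitted), rows)
```

Fewer, larger records cross the network than with pairs, at the cost of holding a full stripe in memory for each word.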
Analysis of Stripes
We will now produce O(kσ) potential communications in the worst case, where σ is our vocabulary size and k is the number of reduce tasks.
However there is a memory tradeoff. What if there is not enough memory to fit data into a stripe?
Further Resources
Runtimes / Frameworks:
Disco - MapReduce in Python + Erlang http://discoproject.org
Hadoop - MapReduce in Java http://hadoop.apache.org
Free Online Books:
Data-Intensive Text Processing with MapReduce, Jimmy Lin and Chris Dyer. http://lintool.github.io/MapReduceAlgorithms
Mining of Massive Datasets, Anand Rajaraman and Jeffrey Ullman. http://infolab.stanford.edu/~ullman/mmds.html
The Datacenter as a Computer: An Introduction to the Design of Warehouse-Scale Machines, Luiz Barroso and Urs Hölzle. http://www.morganclaypool.com/doi/abs/10.2200/S00193ED1V01Y200905CAC006
Further Resources
Papers:
Bigtable: A Distributed Storage System for Structured Data, Dean, Burrows, et al., 2006
MapReduce: Simplified Data Processing on Large Clusters, Dean & Ghemawat, 2004
Videos:
Google Developer Series on Cluster Computing and MapReduce http://youtu.be/yjPBkvYh-ss
Questions?