1
Introduction to Map/Reduce & Hadoop
V. CHRISTOPHIDES
Department of Computer Science University of Crete
2
Univ. of Crete Spring 2015
What is MapReduce?
MapReduce: programming model and associated implementation for batch processing of large data sets
MapReduce could be implemented on different architectures, but Google proposed it for clusters large clusters of commodity PCs
Functional programming meets distributed computing:
Automatic parallelization & distribution
A clean abstraction for programmers by factoring out many reliability concerns from application logic
Fault-tolerance
Status and monitoring tools
MapReduce implementations such as Hadoop differ in details, but main principles are the same
3
Overview
Clever abstraction that is a good fit for many real-world problems
Programmer focuses on algorithm itself
Runtime system takes care of all messy details
Partitioningof input data
Schedulingprogram execution
Handling machine failures
Managing inter-machine communication
Divide and Conquer
4
Univ. of Crete Spring 2015
MapReduce Implementations
Google has a proprietary implementation in C++
Bindings in Java, Python
Hadoop is an open-source implementation in Java
Development led by Yahoo, now an Apache project
Used in production at Yahoo, Facebook, Twitter, LinkedIn, Netflix but also A9.com, AOL, The New York Times, Last.fm, Baidu.com, Joost, Veoh, etc.
The de facto big data processing platform
Rapidly expanding software ecosystem
Lots of custom research implementations
For GPUs, cell processors, etc.
5
Hadoop: Storage & Compute on One Platform
Evolution from Apache Hadoop to the Enterprise Data Hub A. Awadallah Co-Founder & CTO of Cloudera SMDB 2014
6
Univ. of Crete Spring 2015
Expanding Data Requires A New Approach
Evolution from Apache Hadoop to the Enterprise Data Hub A. Awadallah Co-Founder & CTO of Cloudera SMDB 2014
7
Map Reduce Foundations
8
Univ. of Crete Spring 2015
Map and Reduce: The Origins
The programming idea of Map, and Reduceis 40+ year old
Present in all Functional Programming Languages e.g., APL, Lisp and ML
Alternate Map names: Apply-All
Higher Order Functions
take function definitions as arguments, or
return a function as output
Map and Reduce are higher-order functions
Map processes each record individually
Reduce processes (combines) set of all records in a batch
9
Map: A Higher Order Function
F(x: int) returns r: int
Let V be an array of integers W = map(F, V)
W[i] = F(V[i]) for all I i.e., apply F to every element of V
Examplesin Haskell
map (+1) [1,2,3,4,5]
== [2, 3, 4, 5, 6]
map (toLower) "abcDEFG12!@#“
== "abcdefg12!@#“
map (`mod` 3) [1..10]
== [1, 2, 0, 1, 2, 0, 1, 2, 0, 1]
10
Univ. of Crete Spring 2015
reduce: A Higher Order Function
reducealso known as fold, accumulate, compress or inject
Reduce/fold takes in a function and folds it in between the elements of a list
11
Fold-Left in Haskell
Definition
foldl f z [] = z
foldl f z (x:xs) = foldl f (f z x) xs
Examples
foldl (+) 0 [1..5] ==15
foldl (+) 10 [1..5] == 25
foldl (div) 7 [34,56,12,4,23] == 0
12
Univ. of Crete Spring 2015
Fold-Right in Haskell
Definition
foldr f z [] = z
foldr f z (x:xs) = f x (foldr f z xs)
Example
foldr (div) 7 [34,56,12,4,23] == 8
13
14
Univ. of Crete Spring 2015
Examples of
Map Reduce Computation
15
What can I do in MapReduce?
Three main functions:
Querying
Filtering (distributed grep, etc.)
Relational-based (join, selection, projection, etc.)
Summarizing
Computing Aggregates (word/record count, Min/Max/Average/Median/Standard deviation, etc.)
Data Organization (sort, indexing, etc.)
Analyzing
Iterative Message Passing (graph processing)
. . . large datasets in offline mode for boosting other on-line processes
16
Univ. of Crete Spring 2015
Grep Example
Search input files for a given pattern: e.g., Given a list of tweets (username, date, text) determine the tweets that contain a word
Map: emits a (filename, line) if pattern is matched
Reduce: Copies results to output Input
data
Split data Split data Split data Split data
grep grep grep grep
matches matches matches matches
cat All
matches
17
Word Count Example
Read text files and count how often words occur
The input is text files
The output is a text file
each line: word, tab, count
Map: Produce pairs of (word, count = 1) from files
Reduce: For each word, sum up the counts (i.e., fold) Input
data
Split data Split data Split data Split data
token token token token
count count count count
merge All
matches
18
Univ. of Crete Spring 2015
Inverted Index Example
(this was the original Google's Use Case)
Generate an inverted index of words from a given set of files
Map: parses a document and emits <word, docId> pairs
Reduce: takes all pairs for a given word, sorts the docId values, and emits a <word, list(docId)> pair
Input data
Split data Split data Split data Split data
parse parse parse parse
inverse inverse inverse inverse
cat Inverted list
19
Map Reduce
Input
data Result
M A P
R E D U C E Partitioning
Function
Map:
Accepts input key/value pair
Emits intermediate key/value pair
Reduce :
Accepts intermediate key/value* pair
Emits output key/value pair
20
Univ. of Crete Spring 2015
Real World Applications in MapReduce
http://www.dbms2.com/2008/08/26/known-applications-of-mapreduce/
Organizations MapReduce Applications
Google Wide-range applications, grep/ sorting, machine learning, clustering, report extraction, graph computation
Yahoo Data model training, Web map construction, Web log processing using Pig, and much, much more
Amazon Build product search indices
Facebook Web log processing via both MapReduce and Hive PowerSet(Microsoft) HBase for natural language search
Twitter Web log processing using Pig New York Times Large-scale image conversion
Others (>74) Details in http://wiki.apache.org/hadoop/PoweredBy (so far, the longest list of applications for MapReduce
21
MapReduce Principle
applied to BigData
22
Univ. of Crete Spring 2015
Adapt MapReduce for BigData
Always maps/reduces on list of key/value pairs
Records from the data source (lines out of files, rows of a database, etc) are fed into the map function as key*value pairs
map() produces one or more intermediate values along with an output key from the input
After the map phase is over, all the intermediate values for a given output key are combined together into a list
reduce() combines those intermediate values into one or more final values for that same output key
in practice, only one final value per key
23
Adapt MapReduce for BigData
Automatic Parallelization:
Depending on the size of RAW INPUT DATA instantiate multiple map() tasks
Depending upon the number of intermediate
<key, value>
partitions instantiate multiple reduce() tasks
Master program divvies up tasks based on data location
tries to have map() tasks on same machine as physical file data, or at least same rack
24
Univ. of Crete Spring 2015
Adapt MapReduce for BigData
Map/Reduce execute in parallelon a cluster
Fault tolerance is built in the framework
Specific systems/implementation aspects matters
How is data partitioned as input to map
How is data serialized between processes
Cloud specific improvements:
Handle elasticity
Take cluster topology (e.g., node proximity, node size) into account
Source: Google Developers
25
Execution on Clusters
Input files split (M splits)
Assign Master & Workers
Map tasks
Writing intermediate data to disk (R regions)
Intermediate data read & sort
Reduce tasks
Return
26
Univ. of Crete Spring 2015
Execution Overview
Master assigns map and reduce tasks to workers, taking data location into account
Mapper reads an assigned file split and writes intermediate key-value pairs to local disk
Mapper informs master about result locations, who in turn informs the reducers
Reducers pull data from appropriate mapper disk location
After map phase is completed, reducers sort their data by key
For each key, the Reduce function is executed and output is appended to final output file
When all reduce tasks are completed, master wakes up user program
27
MapReduce Data Flow in Hadoop
with no Reduce Tasks
28
Univ. of Crete Spring 2015
MapReduce Data Flow in Hadoop with
a Single Reduce Task
(K (K (K
(K1111, V, V, V, V1111)))) (K2(K(K(K222, V, V, V2, V222))))
(K2, List<V2>) (K2, List<V2>) (K2, List<V2>)
(K2, List<V2>) (K3, V3)(K3, V3)(K3, V3)(K3, V3)
29
MapReduce Data Flow in Hadoop
with Multiple Reduce Tasks
30
Univ. of Crete Spring 2015
MapReduce Programing Constructs in Hadoop
Programmers must specify:
map(k, v) → <k’, v’>*
reduce(k’, v’) → <k’, v’>*
All values with the same key are reduced together
Optionally, also:
partition(k’, number of partitions) → partition for k’
Often a simple hash of the key, e.g., hash(k’) mod n
Divides up key space for parallel reduce operations combine(k’, v’) → <k’, v’>*
Mini-reducers that run in memory after the map phase
Used as an optimization to reduce network traffic
1:many1:many1:many1:many
31
The Execution Framework Handles “Everything Else”
The execution framework handles “everything else”
Scheduling: assigns workers to map and reduce tasks
“Data distribution”: moves processes to data
Synchronization: gathers, sorts, and shuffles intermediate data
Errors and faults: detects worker failures and restarts
Limited control over data and execution flow
All algorithms must expressed in m, r, c, p
You don’t know:
Where mappers and reducers run
When a mapper or reducer begins or finishes
Which input a particular mapper is processing
Which intermediate key a particular reducer is processing
32
Univ. of Crete Spring 2015
Execution
Map Phase
Shuffle Phase
Reduce Phase
33
Automatic Parallel Execution in MapReduce (Google)
Master
Task • Usually many more map tasks than machines
• E.g.
• 200K map tasks
• 5K reduce tasks
• 2K machines
34
Univ. of Crete Spring 2015
Map/Reduce Execution in Clusters
Several map or reduce tasks can run on a single computer
Each intermediate file is divided into R partitions, by partitioning function
Each reduce task corresponds to one partition
35
Moving Data From Mappers to Reducers
Mappers output need to be separated for different reducers
Reducers need to collect their data from all mappers and group it by key
Keys at each reducer are processed in order!
Shuffle& sortphase=synchronization barrier between map & reduce phase
Collects the output from all map executions
Transforms the map output Into the reduce input
Divides the map output Into chunks
Often one of the most expensive parts of a MapReduce execution
36
Univ. of Crete Spring 2015
Spilled to a new disk file when almost full
Spill files on disk:
partitioned by reduce task, each partition sorted by key
Spill files merged into single output file
Merge happens in memory if data fits, otherwise also on disk
Reduce task starts copying data from map task as soon as it completes. Reduce cannot start working on the data until all mappers have finished and their data has arrived.
Shuffle and Sort in Map Reduce
37
Probably the most complex aspect of MapReduce!
Map side
Map outputs are buffered in memory in a circular buffer
When buffer reaches threshold, contents are “spilled” to disk
Spills merged in a single, partitioned file (sorted within each partition):
combiner runs here
Reduce side
First, map outputs are copied over to reducer machine
“Sort” is a multi-pass merge of map outputs (happens in memory and on disk): combiner runs here
Final merge pass goes directly into reducer
Shuffle and Sort in Hadoop
38
Univ. of Crete Spring 2015
Shuffle and Sort in Hadoop
Mapper
Reducer
other mappers
other reducers circular
buffer (in memory)
spills (on disk)
merged spills (on disk)
intermediate files (on disk)
Combiner
Combiner
39
Tools for Synchronization
Cleverly-constructed data structures
Bring partial results together
Sort order of intermediate keys
Control order in which reducers process keys
Partitioner
Control which reducer processes which keys
Preserving state in mappers and reducers
Capture dependencies across multiple keys and values
40
Univ. of Crete Spring 2015
Master Data Structures
Master keeps track of status of each map and reduce task and who is working on it
Idle, in-progress, or completed
Master stores location and size of output of each completed map task
Pushes information incrementally to workers with in-progress reduce tasks
41
Semantics with Failures
If map and reduce are deterministic, then output is identical to non- faulting sequential execution
For non-deterministic operators, different reduce tasks might see output of different map executions
Relies on atomic commit of map and reduce outputs
In-progress task writes output to private temp file
Mapper: on completion, send names of all temp files to master (master ignores if task already complete)
Reducer: on completion, atomically rename temp file to final output file (needs to be supported by distributed file system)
42
Univ. of Crete Spring 2015
Fault Recovery
Workers are commodity (low cost) machines so failures are frequent:
given n workers, probability of at least one "problem" is 1 –(1-p)n
Workers are pinged by master periodically
Non-responsive workers are marked as failed
All tasks in-progress or completed by failed worker become eligible for rescheduling (reset to idle state)
Can be assigned to other mapper
Completed tasks are re-executed since result is stored on mapper’s local disk
Reducers are notified about mapper failure, so that they can read the data from the replacement mapper
43
Fault Recovery
Failed reducers identified through ping as well
Reducer’s in-progress tasks are reset to idle state
Can be assigned to other reducer
No need to restart completed reduce tasks, because result is written to distributed file system
Current implementations abort MapReduce computation on master failure
Users re-submit aborted jobs when new master process is up
Alternative: master writes periodic checkpoints of its data structures so that it can be re-started from checkpointed state
44
Univ. of Crete Spring 2015
Machines Share Roles
So far, logical view of cluster
In reality
Each cluster machine stores data
And runs MapReduce workers
Lots of storage + compute cycles nearby
45
The MapReduce “Runtime”
Handles scheduling
Assigns workers to map and reduce tasks
Handles “data distribution”
Moves processes to data
Handles synchronization
Gathers, sorts, and shuffles intermediate data
Handles errors and faults
Detects worker failures and restarts
Everything happens on top of a distributed filesystem
46
Univ. of Crete Spring 2015
Implementing in MapReduce
Externally for the user
Write a map function, and a reduce function
Submit a job; wait for result
No need to know anything about the environment
Google: 4000 servers + 48000 disks, many failures
Internally for MapReduce system designer
Run map in parallel
Shuffle: combine map results to produce reduce input
Run reduce in parallel
Deal with failures
47
Component Overview
48
Univ. of Crete Spring 2015
Peta-Bytes Data Processing
49
Google’s MapReduceinspired Yahoo’s Hadoop
Now as an open source (Java) Project of Apache hadoop.apache.org
We will use release 2.6.0 (November 2014)
Distribute processing of petabytes data across thousands of commodity machines
In 2008 about 100k MapReduce jobs per day in Google
over 20 petabytes of data processed per day
each job occupies about 400 servers
Easy to use since run-time complexity hidden from the users
50
Univ. of Crete Spring 2015
Hadoop Key Components
MapReduce -distributes applications
Hadoop Distributed File System (HDFS) -distributes data
Store big files across machines
Store each file as a sequence of blocks
Blocks of a file are replicated for fault tolerance
51
‘MapReduce’ vs. Hadoop
MapReduce Hadoop
Org Google Yahoo/Apache
Impl C++ Java
Distributed File Sys GFS HDFS
Data Base Bigtable HBase
Distributed lock Mgr Chubby ZooKeeper
History
Dec 2004 – Google GFS paper published
July 2005 – Nutch uses MapReduce
Jan 2006 – Doug Cutting joins Yahoo!
Feb 2006 – Becomes Lucene subproject
Apr 2007 – Yahoo! on 1000-node cluster
Jan 2008 – An Apache Top Level Project
Feb 2008 – Yahoo! production search index
52
Univ. of Crete Spring 2015
Google’s Problem in 2003: Already Lots of Data
Example: 20+ billion web pages x 20KB = 400+
terabytes
One computer can read 30-35 MB/sec from disk
~four months to read the web
~1,000 hard drives just to store the web
Even more to do something with the data:
process crawled documents
process web request logs
build inverted indices
construct graph representations of web documents
53
The Need: Data & Programs Distribution!
A Distributed System:
Scalable
Fault-Tolerant
Easy to Program
Applicable to Many Problems
Re-discovered by Google with goals:
"Reliabilityhas to come from the software"
"How can we make it easy to writedistributed programs?"
54
Univ. of Crete Spring 2015
How do we get Data to the Workers?
What’s the problem here?
Compute Nodes
SAN NAS
55
Distributed File System
Don’t move data to workersY Move workers to the data!
Store data on the local disks for nodes in the cluster
Start up the workers on the node that has the data local!
Why?
Not enough RAM to hold all the data in memory
Disk access is slow, disk throughput is good
A distributed file system is the answer
GFS (Google File System)
DFS for Hadoop (= GFS clone)
56
Univ. of Crete Spring 2015
Distributed File System Principles
Single Namespace for entire cluster
Data Coherency
Write-once-read-manyaccess model
Client can only append to existing files
Files are broken up into blocks
Typically 128 MB block size
Each block replicated on multiple DataNodes
Intelligent Client
Client can find location of blocks
Client accesses data directly from DataNode
57
Hadoop Supported File Systems
HDFS: Hadoop's own file system
Amazon S3 file system
Targeted at clusters hosted on the Amazon Elastic Compute Cloud server-on-demand infrastructure
Not rack-aware
CloudStore, previously Kosmos Distributed File System
like HDFS, this is rack-aware
optimization which takes into account the geographic clustering of servers
network traffic between servers in different geographic clusters is minimized
FTP Filesystem
stored on remote FTP servers
Read-only HTTP and HTTPSfile systems
58
Univ. of Crete Spring 2015
Hadoop Distributed File System (HDFS)
Very Large Distributed File System
10K nodes, 100 million files, 10 PB
Assumes Commodity Hardware
Files are replicated to handle hardware failure
Detect failures and recovers from them
Optimized for Batch Processing
Data locations exposed so that computations can move to where data resides
Provides very high aggregate bandwidth
User Space, runs on top of the file systems of the underlying OS
59
Hadoop Distributed File System (HDFS)
Mastermanages the file system and access to files by clients
Runs on master node of the HDFS cluster
Directs DataNodes to perform their low-level I/O tasks
Slavemanages the storages attached to the nodes running on
Runs on each slave machine in the HDFS cluster
Does the low-level I/O work
Files stored in many blocks
Each block has a block Id
Block Id associated with several nodes hostname:port (depending on level of replication)
60
Univ. of Crete Spring 2015
Secondary NameNode Client
HDFS Architecture
NameNode
DataNodes
Cluster Membership
Cluster Membership
NameNode: Maps a file to a file-id and list of MapNodes DataNode : Maps a block-id to a physical location on disk SecondaryNameNode: Periodic merge of Transaction log
61
NameNode Metadata
Meta-data in Memory
The entire metadata is in main memory
No demand paging of meta-data
Types of Metadata
List of files
List of Blocks for each file
List of DataNodes for each block
File attributes, e.g creation time, replication factor
A Transaction Log stored in multiple directories (on a local/remote FS)
Records file creations, file deletions. etc
62
Univ. of Crete Spring 2015
DataNode
A Block Server
Stores data in the local file system (e.g. ext3)
Stores meta-data of a block (e.g. CRC)
Serves data and meta-data to Clients
Block Report
Periodically sends a report of all existing blocks to the NameNode
Facilitates Pipelining of Data
Forwards data to other specified DataNodes
63
Block Placement
Current Strategy
One replica on local node
Second replica on a remote rack
Third replica on same remote rack
Additional replicas are randomly placed
Clients read from nearest replica
Would like to make this policy pluggable
64
Univ. of Crete Spring 2015
Data Correctness
Use Checksums to validate data
Use CRC32
File Creation
Client computes checksum per 512 byte
DataNode stores the checksum
File access
Client retrieves the data and checksum from DataNode
If validation fails, Client tries other replicas
65
Primary vs Secondary NameNode
NameNode daemon
“Head” interface to HDFS cluster
Records all global metadata
A single point of failure
Secondary NameNode daemon
One per cluster to monitor status of HDFS: Not a failover NameNode!
Records snapshots of HDFS metadata from “real” NameNode to facilitate recovery from failures
Can merge update logs in flight
Can upload snapshot back to primary
Need to develop a real High Availability (HA) solution
66
Univ. of Crete Spring 2015
Data Pipelining
Client retrieves a list of DataNodes on which to place replicas of a block
Client writes block to the first DataNode
The first DataNode forwards the data to the next DataNode in the Pipeline
When all replicas are written, the Client moves on to write the next block in file
67
Rebalancer
Goal: % disk full on DataNodes should be similar
Usually run when new DataNodes are added
Cluster is online when Rebalancer is active
Rebalancer is throttled to avoid network congestion
Command line tool
68
Univ. of Crete Spring 2015
Configuring HDFS
dfs.name.dir
Dir in the namenode to store metadata
dfs.data.dir
Dir in data nodes to store the data blocks
Must be in a local disk partition
fs.default.name
URI of the namenode
conf/masters
ip address of the master
conf/slaves
ip address of the slaves
Should have password-less SSH access to all the nodes
conf/hdfs-site.xml
<property>
<name>dfs.name.dir</name>
<value>/tmp/hadoop-test/name</value>
</property>
<property>
<name>dfs.data.dir</name>
<value>/tmp/hadoop-test/data</value>
</property>
conf/core-site.xml
<property>
<name>fs.default.name</name>
<value>hdfs://149.79.89.113:9000/</va lue>
</property>
69
User Interface
Command for HDFS User
hadoop dfs -mkdir /foodir
hadoop dfs -cat /foodir/myfile.txt
hadoop dfs -rm /foodir myfile.txt
Command for HDFS Administrator
hadoop dfsadmin -report
hadoop dfsadmin -decommission datanodename
Web Interface
http://host:port/dfshealth.jsp
http://hadoop.apache.org/common/docs/current/hdfs_shell.html
70
Univ. of Crete Spring 2015
WordCount
A Simple Hadoop Example
http://wiki.apache.org/hadoop/WordCount
71
Word Count over a Given Set of Web Pages
How can we do word count in parallel?
see bob throw see 1
bob 1
throw 1
see 1
spot 1
run 1
bob 1
run 1
see 2
spot 1 throw 1 see spot run
72
Univ. of Crete Spring 2015
Word Counting with MapReduce
Documents Financial, IMF, Economics, Crisis Financial, IMF, Crisis
Map
Key Value
Financial 1
IMF 1
Economics 1
Crisis 1
Financial 1
IMF 1
Crisis 1
Key Value
Economics 1
Harry 1
Financial 1
Harry 1
Potter 1
Film 1
Crisis 1
Harry 1
Potter 1
`
`
`
`
`
`
`
Documents Economics, Harry Financial, Harry, Potter, Film Crisis, Harry, Potter
Map
Doc1
Doc2
Doc3
Doc4
Doc5
M1
M2
© Kyuseok Shim (VLDB 2012 TUTORIAL)
73
Word Counting with MapReduce
Reduc
Key Value
Financial 3
IMF 2
Economics 2
Crisis 3
Harry 3
Film 1
Potter 2
Key Value
Financial 1 Financial 1 Financial 1
IMF 1
IMF 1
Economics 1 Economics 1
Key Value
Crisis 1
Crisis 1
Crisis 1
Harry 1
Harry 1
Harry 1
Film 1
Potter 1
Potter 1
Before reduce functions are called, for each distinct key, the list of its values is generated
Key Value list
Financial 1, 1, 1
IMF 1, 1
Economics 1, 1
Crisis 1, 1, 1
Harry 1, 1, 1
Film 1
Potter 1, 1
``
Documents Financial, IMF, Economics, Crisis Financial, IMF, Crisis
Map
Documents Economics, Harry Financial, Harry, Potter, Film Crisis, Harry, Potter
Map Reduce
Doc1
Doc2
Doc3
Doc4
Doc5
© Kyuseok Shim (VLDB 2012 TUTORIAL)
`
74
Univ. of Crete Spring 2015
Basic Hadoop API
Mapper
void map(K1 key, V1 value, OutputCollector<K2, V2>
output, Reporter reporter)
void configure(JobConf job)
void close() throws IOException
Reducer/Combiner
void reduce(K2 key, Iterator<V2> values,
OutputCollector<K3,V3> output, Reporter reporter)
void configure(JobConf job)
void close() throws IOException
Partitioner
void getPartition(K2 key,V2 value,int numPartitions)
*Note: forthcoming API changes2
75
WordCount Mapper
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void mapmapmapmap(LongWritable key, Text value,
Context context) throws IOException, InterruptedException { String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
} } }
76
Univ. of Crete Spring 2015
WordCount Reducer
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>
public void reducereducereducereduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) { sum += val.get();
}
context.write(key, new IntWritable(sum));
} }
77
Job Configuration
Job object forms the job specification and gives control for running the job
Setmapper and reducer class to be used
Setoutput key and value classes for map and reduce functions
For reducer: setOutputKeyClass(), setOutputValueClass()
For mapper (omit if same as reducer): setMapOutputKeyClass(), setMapOutputValueClass()
Can set input types similarly (default is TextInputFormat)
Specify data input path using setInputPath()
Can be single file, directory (to use all files there), or file pattern
addInputPath() can be called multiple times to add multiple paths
Specify output path using setOutputPath()
Single output path, which is a directory for all output files
Method waitForCompletion() submits job and waits for it to finish
78
Univ. of Crete Spring 2015
Job Configuration
public int run(String[] args) throws Exception { Job job = new Job(getConf(), "WordCount");
job.setJarByClass(WordCount.class);
job.setOutputKeyClass job.setOutputKeyClassjob.setOutputKeyClass
job.setOutputKeyClass(((Text.class(Text.classText.class); Text.class); ); );
job.setOutputValueClass job.setOutputValueClass job.setOutputValueClass
job.setOutputValueClass((((IntWritable.classIntWritable.classIntWritable.classIntWritable.class); ); ); );
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setCombinerClass job.setCombinerClass job.setCombinerClass
job.setCombinerClass((((Reduce.classReduce.classReduce.classReduce.class););););
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setNumReduceTasks job.setNumReduceTasks job.setNumReduceTasks job.setNumReduceTasks((((2222););););
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
return 0;
}
79
Invocation of WordCount
hadoop hadoop hadoop
hadoop dfsdfsdfsdfs ---mkdir-mkdirmkdir <mkdir <hdfs<<hdfshdfshdfs----dirdirdir>dir>>>
hadoop hadoop hadoop
hadoop dfsdfsdfs -dfs --copyFromLocal-copyFromLocalcopyFromLocal <copyFromLocal <local<<locallocal-local---dirdir> dirdir> > > <<<<hdfshdfs-hdfshdfs--dir-dirdir> dir> > >
hadoop hadoop hadoop hadoop
jar hadoop jar hadoop jar hadoop
jar hadoop---*-**-*--examples.jar -examples.jar examples.jar examples.jar WordCount
WordCount WordCount WordCount [
[ [
[----m <#maps>] m <#maps>] m <#maps>] m <#maps>]
[ [ [
[----r <#reducers>] r <#reducers>] r <#reducers>] r <#reducers>]
<in
<in
<in
<in----dirdirdirdir> > > >
<out
<out
<out
<out----dirdirdirdir> > > >
80
Univ. of Crete Spring 2015
Hadoop Job Tuning
Choose appropriate number of mappers and reducers
Define combinerswhenever possible
Consider Map output compression
Optimize the expensive shuffle phase(between mappers and reducers) by setting its tuning parameters
Profiling distributed MapReduce jobs is challenging
81
Job Configuration Parameters
190+ parameters in Hadoop
Do their settings impact performance?
What are ways to set these parameters?
Defaults -- are they good enough?
Best practices -- the best setting can depend on data, job, and cluster properties
Automatic setting (Not yet in Hadoop)
82
Univ. of Crete Spring 2015
MapReduce Development Steps
Write Map and Reduce functions
Create unit tests
Write driver program to run a job
Can run from IDE with small data subset for testing
If test fails, use IDE for debugging
Update unit tests and Map/Reduce if necessary
Once program works on small test set, run it on full data set
If there are problems, update tests and code accordingly
Fine-tune code, do some profiling
83
Mechanics of Programming
Hadoop Jobs
84
Univ. of Crete Spring 2015
Anatomy of a Hadoop Job in MR v1.0
MapReduce program in Hadoop = Hadoop job
Jobs are divided into mapand reduce tasks
An instance of a running task is called a task attempt
Multiple jobs can be composed into a workflow
85
Launching a Hadoop Job
Source: http://www.jaso.co.kr/265
Client (i.e., driver program) creates a job, configures it, and submits it to job tracker
JobClient computes input splits (on client end)
Job data (jar, configuration XML) are sent to JobTracker
JobTracker puts job data in shared location, enqueues tasks
TaskTrackers poll JobTracker for tasks
TaskTrackers launch a task in a separate Java instance
86
Univ. of Crete Spring 2015
Job Launch: JobClient, JobTracker and TaskTracker
Client submits MapReduce job through
JobClient.runJob() // blocks
JobClient.submitJob() // does not block
waitForCompletion() submits job and polls JobTracker about progress every sec, outputs to console if changed
JobClient:
Determines proper division of input into InputSplits
Sends job data to master JobTracker server
JobTracker:
Inserts jar and JobConf (serialized to XML) in shared location
Posts a JobInProgress to its run queue
TaskTrackers running on slaves periodically query JobTracker for work
Retrieve job-specific jar and config
Launch task in separate instance of Java
main() is provided by Hadoop
87
Job Launch: Task and TaskRunner
TaskTracker.Child.main():
Sets up the child TaskInProgress attempt
Reads XML configuration
Connects back to necessary MapReduce components via RPC
Uses TaskRunner to launch user process
TaskRunner, MapTaskRunner, MapRunner work in a daisy-chainto launch Mapper
Task knowsahead of time which InputSplits it should be mapping
Calls Mapper once for each record retrieved from the InputSplit
Running the Reducer is much the same
88
Univ. of Crete Spring 2015
Creating the Mapper
One instance of your Mapper is initialized by the MapTaskRunner for a TaskInProgress
Exists in separate process from all other instances of Mapper –no data sharing!
public void map (WritableComparableWritableComparableWritableComparableWritableComparable key, Writable
Writable Writable
Writable value, Context
Context Context
Context context)
89
Data Types in Hadoop
Writable Defines a de/serialization protocol Every data type in Hadoop is a Writable
WritableComparable Defines a sort order
All keys must be of this type (but not values)
IntWritable LongWritable Text
…
Concrete classes for different data types
SequenceFiles Binary encoded of a sequence of key/value pairs
90
Univ. of Crete Spring 2015
What is Writable?
Hadoop defines its own “box” classes for strings (Text), integers (IntWritable), etc.
All values are instances of Writable
All keys are instances of WritableComparable
Writing for cache coherency
while (more input exists) {
myIntermediate = new intermediate(input);
myIntermediate.process();
export outputs;
}
91 Intermediates Intermediates Intermediates Intermediates Intermediates
Getting Data to the Mapper
Data sets are specified by InputFormats
Defines input data (e.g., a directory)
Identifies partitions of the data that form an InputSplit
Factory for RecordReader objects to extract (k,v) records from the input source
InputSplit InputSplit InputSplit InputSplit InputSplit
RecordReader RecordReader RecordReader RecordReader RecordReader
Mapper Mapper Mapper Mapper Mapper
InputFormatInputFormatInputFormatInputFormat
Input File Input File
92
Univ. of Crete Spring 2015
FileInputFormat and Friends
TextInputFormat
Treats each ‘\n’-terminated line of a file as a value
KeyValueTextInputFormat
Maps ‘\n’- terminated text lines of “k SEP v”
SequenceFileInputFormat
Binary file of (k, v) pairs with some add’l metadata
SequenceFileAsTextInputFormat
Same, but maps (k.toString(), v.toString())
FileInputFormat will read all files out of a specified directory and send them to the mapper
Delegates filtering this file list to a method subclasses may override e.g., create your own “xyzFileInputFormat” to read *.xyz from
directory list
These classes make use of the readFields() method of the specific Writable classes used by your MapReduce pass
93
A simple CustomWritable
public class MyWritable implements Writable implements Writable implements Writable { implements Writable
private int counter; private long timestamp;
public void write(DataOutput out) throws IOException { out.writeInt(counter);
out.writeLong(timestamp);
}
public void readFields(DataInput in) throws IOException { counter = in.readInt();
timestamp = in.readLong();
}
public static MyWritable read(DataInput in) throws IOException {
MyWritable w = new MyWritable();
w.readFields(in);
return w;
} }
94
Univ. of Crete Spring 2015
Record Readers
Each InputFormat provides its own RecordReader implementation
Provides (unused?) capability multiplexing
LineRecordReader
Reads a line from a text file
KeyValueRecordReader
Used by KeyValueTextInputFormat
95
Input Split Size
FileInputFormat will divide large files into chunks
Exact size controlled by mapred.min.split.size
RecordReaders receive file, offset, and length of chunk
Custom InputFormat implementations may override split size
e.g., “NeverChunkFile”
96
Univ. of Crete Spring 2015
WritableComparator
Compares WritableComparable data
Will call WritableComparable.compare()
Can provide fast path for serialized data
JobConf.setOutputValueGroupingComparator()
97
Sending Data To Reducers: The Context Class
Both Mapper and Reducer define an inner class called Context which implements the JobContext interface
Job also implements JobContext
when you create a new Job, you also set the context for the Mapper and Reducer
Some methods of Context:
write: generate an output key/value pair
progress and setStatus: report progress or set the status of the task
getCounter: get access (read/write) to the value of a Counter
getConfiguration: return the configuration for the job
getCacheFiles: get cache files set in the Configuration
98
Univ. of Crete Spring 2015
Partitioning
Which reducer will receive the intermediate output keys and values?
(key, value) pairs with the same key end up at the same partition
The mappers partition data independently
they never exchange information with one another
Hadoop uses an interface called Partitioner to determine which partition a (key, value) pair will go to
A single partition refers to all (key, value) pairs which will be sent to a single reduce task: #partitions = #reduce tasks
each Reducer can process multiple reduce tasks
The Partitioner determines the load balancing of the reducers
JobConf sets Partitioner implementation
99
The Partitioner interface
The Partitioner interface defines the getPartition() method
Input: a key, a value and the number of partitions
Output: a partition id for the given key, value pair
The default Partitioner is the HashPartitioner:
int getPartition(K key, V value, int numPartitions) { return key.hashCode() % numPartitions;
}
Key key.hashCode() partitionId (0-2)
hello 12847 1
world 23874 0
map 82375 1
reduce 12839 2
numPartitions = 3
100
Univ. of Crete Spring 2015
Partition and Shuffle
Mapper Mapper Mapper Mapper Mapper
Partitioner Partitioner Partitioner Partitioner Partitioner Intermediates Intermediates Intermediates Intermediates Intermediates
Reducer Reducer Reduce
Intermediates Intermediates Intermediates (combiners omitted here)
101
Reduction
public void reduce(WritableComparableWritableComparableWritableComparable key, WritableComparable Iterator
IteratorIterator
Iterator values, Context
ContextContext
Context context)
Keys & values sent to one partition all go to the same reduce task
Calls are sorted by key:
“earlier” keys are reduced and output before “later” keys
102
Univ. of Crete Spring 2015
Finally: Writing The Output
Reducer Reducer Reduce
Output File RecordWriter
OutputFormatOutputFormatOutputFormatOutputFormat
Output File RecordWriter
Output File RecordWriter
103
OutputFormat
The OutputFormat and RecordWriter interfaces dictate how to write the results of a job back to the underlying permanent storage
The default format (TextOutputFormat) will write (key, value) pairs as strings to individual lines of an output file
Writes “key val\n” strings using the toString() methods of the keys and values
The SequenceFileOutputFormat will keep the data in binary, so it can be later read quickly by the SequenceFileInputFormat
Uses a binary format to pack (k, v) pairs
NullOutputFormat
Discards output
These classes make use of the write() method of the specific Writable classes used by your MapReduce pass
104
Univ. of Crete Spring 2015
Sending Data to the Client
Reporter object sent to Mapper allows simple asynchronous feedback
incrCounter(Enum key, long amount)
setStatus(String msg)
Allows self-identificationof input
InputSplit getInputSplit()
105
MapReduce Coding Summary
Decompose problem into appropriate workflow of MapReduce jobs
For each job, implement the following
Job configuration
Map function
Reduce function
Combiner function (optional)
Partition function (optional)
Might have to create custom data types as well
WritableComparable for keys
Writable for values
106
Univ. of Crete Spring 2015
Local (Standalone) Mode
Runs same MapReduce user program as cluster version, but does it sequentially
Does not use any of the Hadoop daemons
Works directly with local file system
No HDFS, hence no need to copy data to/from HDFS
Great for development, testing, initial debugging
107
Pseudo-Distributed Mode
Still runs on single machine, but now simulates a real Hadoop cluster
Simulates multiple nodes
Runs all daemons
Uses HDFS
Main purpose: more advanced testing and debugging
You can also set this up on your laptop
108
Univ. of Crete Spring 2015
Running a MapReduce Workflow
Linear chain of jobs
To run job2 after job1, create JobConf’s conf1 and conf2 in main function
Call JobClient.runJob(conf1); JobClient.runJob(conf2);
Catch exceptions to re-start failed jobs in pipeline
More complex workflows
Use JobControl from org.apache.hadoop.mapred.jobcontrol
109
Lifecycle of a MapReduce Job
How are the number of splits, number of map and reduce tasks, memory allocation to tasks, etc., determined?
Map Wave 1
Reduce Wave 1 Map
Wave 2
Reduce Wave 2 Input
Splits
Time
110
Univ. of Crete Spring 2015
HDFS Limitations
“Almost” GFS (Google FS)
No file update options (record append, etc); all files are write-once
Does not implement demand replication
Designed for sequential access
Random seeks devastate performance
111
Comparison of MapReduce and Other Approaches
MapReduce: The Programming Model and Practice", SIGMETRICS, Turorials 2009, Google.
112
Univ. of Crete Spring 2015
Comparison of MapReduce and other approaches
MapReduce: The Programming Model and Practice", SIGMETRICS, Turorials 2009, Google.
113
MapReduce: A Step Backwards?
Don’t need 1000 nodes to process petabytes:
Parallel DBs do it in fewer than 100 nodes
No support for schema:
Sharing across multiple MR programs is difficult
No indexing:
Wasteful access to unnecessary data
Non-declarative programming model:
Requires highly-skilled programmers
No support for JOINs:
Requires multiple MR phases for the analysis
Agrawal et al., VLDB 2010 Tutorial
114
Univ. of Crete Spring 2015
Map Reduce vs Parallel DBMS
Parallel DBMS MapReduce
Schema Support Not out of the box
Indexing Not out of the box
Programming Model Declarative (SQL)
Imperative (C/C++, Java, Y) Extensions through
Pig and Hive Optimizations
(Compression, Query Optimization)
Not out of the box
Flexibility Not out of the box
Fault Tolerance Coarse grained
techniques
[Pavlo et al., SIGMOD 2009, Stonebraker et al., CACM 2010, 2]