• No results found

Introduction to Map/Reduce & Hadoop

N/A
N/A
Protected

Academic year: 2021

Share "Introduction to Map/Reduce & Hadoop"

Copied!
66
0
0

Loading.... (view fulltext now)

Full text

(1)

1

Introduction to Map/Reduce & Hadoop

V. CHRISTOPHIDES

Department of Computer Science University of Crete

2

Univ. of Crete Spring 2015

What is MapReduce?

MapReduce: programming model and associated implementation for batch processing of large data sets

MapReduce could be implemented on different architectures, but Google proposed it for clusters large clusters of commodity PCs

Functional programming meets distributed computing:

Automatic parallelization & distribution

A clean abstraction for programmers by factoring out many reliability concerns from application logic

Fault-tolerance

Status and monitoring tools

MapReduce implementations such as Hadoop differ in details, but main principles are the same

(2)

3

Overview

Clever abstraction that is a good fit for many real-world problems

Programmer focuses on algorithm itself

Runtime system takes care of all messy details

Partitioningof input data

Schedulingprogram execution

Handling machine failures

Managing inter-machine communication

Divide and Conquer

4

Univ. of Crete Spring 2015

MapReduce Implementations

Google has a proprietary implementation in C++

Bindings in Java, Python

Hadoop is an open-source implementation in Java

Development led by Yahoo, now an Apache project

Used in production at Yahoo, Facebook, Twitter, LinkedIn, Netflix but also A9.com, AOL, The New York Times, Last.fm, Baidu.com, Joost, Veoh, etc.

The de facto big data processing platform

Rapidly expanding software ecosystem

Lots of custom research implementations

For GPUs, cell processors, etc.

(3)

5

Hadoop: Storage & Compute on One Platform

Evolution from Apache Hadoop to the Enterprise Data Hub A. Awadallah Co-Founder & CTO of Cloudera SMDB 2014

6

Univ. of Crete Spring 2015

Expanding Data Requires A New Approach

Evolution from Apache Hadoop to the Enterprise Data Hub A. Awadallah Co-Founder & CTO of Cloudera SMDB 2014

(4)

7

Map Reduce Foundations

8

Univ. of Crete Spring 2015

Map and Reduce: The Origins

The programming idea of Map, and Reduceis 40+ year old

Present in all Functional Programming Languages e.g., APL, Lisp and ML

Alternate Map names: Apply-All

Higher Order Functions

take function definitions as arguments, or

return a function as output

Map and Reduce are higher-order functions

Map processes each record individually

Reduce processes (combines) set of all records in a batch

(5)

9

Map: A Higher Order Function

F(x: int) returns r: int

Let V be an array of integers W = map(F, V)

W[i] = F(V[i]) for all I i.e., apply F to every element of V

Examplesin Haskell

map (+1) [1,2,3,4,5]

== [2, 3, 4, 5, 6]

map (toLower) "abcDEFG12!@#“

== "abcdefg12!@#“

map (`mod` 3) [1..10]

== [1, 2, 0, 1, 2, 0, 1, 2, 0, 1]

10

Univ. of Crete Spring 2015

reduce: A Higher Order Function

reducealso known as fold, accumulate, compress or inject

Reduce/fold takes in a function and folds it in between the elements of a list

(6)

11

Fold-Left in Haskell

Definition

foldl f z [] = z

foldl f z (x:xs) = foldl f (f z x) xs

Examples

foldl (+) 0 [1..5] ==15

foldl (+) 10 [1..5] == 25

foldl (div) 7 [34,56,12,4,23] == 0

12

Univ. of Crete Spring 2015

Fold-Right in Haskell

Definition

foldr f z [] = z

foldr f z (x:xs) = f x (foldr f z xs)

Example

foldr (div) 7 [34,56,12,4,23] == 8

(7)

13

14

Univ. of Crete Spring 2015

Examples of

Map Reduce Computation

(8)

15

What can I do in MapReduce?

Three main functions:

Querying

Filtering (distributed grep, etc.)

Relational-based (join, selection, projection, etc.)

Summarizing

Computing Aggregates (word/record count, Min/Max/Average/Median/Standard deviation, etc.)

Data Organization (sort, indexing, etc.)

Analyzing

 Iterative Message Passing (graph processing)

. . . large datasets in offline mode for boosting other on-line processes

16

Univ. of Crete Spring 2015

Grep Example

Search input files for a given pattern: e.g., Given a list of tweets (username, date, text) determine the tweets that contain a word

Map: emits a (filename, line) if pattern is matched

Reduce: Copies results to output Input

data

Split data Split data Split data Split data

grep grep grep grep

matches matches matches matches

cat All

matches

(9)

17

Word Count Example

Read text files and count how often words occur

The input is text files

The output is a text file

each line: word, tab, count

Map: Produce pairs of (word, count = 1) from files

Reduce: For each word, sum up the counts (i.e., fold) Input

data

Split data Split data Split data Split data

token token token token

count count count count

merge All

matches

18

Univ. of Crete Spring 2015

Inverted Index Example

(this was the original Google's Use Case)

Generate an inverted index of words from a given set of files

Map: parses a document and emits <word, docId> pairs

Reduce: takes all pairs for a given word, sorts the docId values, and emits a <word, list(docId)> pair

Input data

Split data Split data Split data Split data

parse parse parse parse

inverse inverse inverse inverse

cat Inverted list

(10)

19

Map Reduce

Input

data Result

M A P

R E D U C E Partitioning

Function

Map:

Accepts input key/value pair

Emits intermediate key/value pair

Reduce :

Accepts intermediate key/value* pair

Emits output key/value pair

20

Univ. of Crete Spring 2015

Real World Applications in MapReduce

http://www.dbms2.com/2008/08/26/known-applications-of-mapreduce/

Organizations MapReduce Applications

Google Wide-range applications, grep/ sorting, machine learning, clustering, report extraction, graph computation

Yahoo Data model training, Web map construction, Web log processing using Pig, and much, much more

Amazon Build product search indices

Facebook Web log processing via both MapReduce and Hive PowerSet(Microsoft) HBase for natural language search

Twitter Web log processing using Pig New York Times Large-scale image conversion

Others (>74) Details in http://wiki.apache.org/hadoop/PoweredBy (so far, the longest list of applications for MapReduce

(11)

21

MapReduce Principle

applied to BigData

22

Univ. of Crete Spring 2015

Adapt MapReduce for BigData

Always maps/reduces on list of key/value pairs

Records from the data source (lines out of files, rows of a database, etc) are fed into the map function as key*value pairs

map() produces one or more intermediate values along with an output key from the input

After the map phase is over, all the intermediate values for a given output key are combined together into a list

reduce() combines those intermediate values into one or more final values for that same output key

in practice, only one final value per key

(12)

23

Adapt MapReduce for BigData

Automatic Parallelization:

Depending on the size of RAW INPUT DATA  instantiate multiple map() tasks

Depending upon the number of intermediate

<key, value>

partitions  instantiate multiple reduce() tasks

Master program divvies up tasks based on data location

tries to have map() tasks on same machine as physical file data, or at least same rack

24

Univ. of Crete Spring 2015

Adapt MapReduce for BigData

Map/Reduce execute in parallelon a cluster

Fault tolerance is built in the framework

Specific systems/implementation aspects matters

How is data partitioned as input to map

How is data serialized between processes

Cloud specific improvements:

Handle elasticity

Take cluster topology (e.g., node proximity, node size) into account

Source: Google Developers

(13)

25

Execution on Clusters

Input files split (M splits)

Assign Master & Workers

Map tasks

Writing intermediate data to disk (R regions)

Intermediate data read & sort

Reduce tasks

Return

26

Univ. of Crete Spring 2015

Execution Overview

Master assigns map and reduce tasks to workers, taking data location into account

Mapper reads an assigned file split and writes intermediate key-value pairs to local disk

Mapper informs master about result locations, who in turn informs the reducers

Reducers pull data from appropriate mapper disk location

After map phase is completed, reducers sort their data by key

For each key, the Reduce function is executed and output is appended to final output file

When all reduce tasks are completed, master wakes up user program

(14)

27

MapReduce Data Flow in Hadoop

with no Reduce Tasks

28

Univ. of Crete Spring 2015

MapReduce Data Flow in Hadoop with

a Single Reduce Task

(K (K (K

(K1111, V, V, V, V1111)))) (K2(K(K(K222, V, V, V2, V222))))

(K2, List<V2>) (K2, List<V2>) (K2, List<V2>)

(K2, List<V2>) (K3, V3)(K3, V3)(K3, V3)(K3, V3)

(15)

29

MapReduce Data Flow in Hadoop

with Multiple Reduce Tasks

30

Univ. of Crete Spring 2015

MapReduce Programing Constructs in Hadoop

Programmers must specify:

map(k, v) → <k’, v’>*

reduce(k’, v’) → <k’, v’>*

All values with the same key are reduced together

Optionally, also:

partition(k’, number of partitions) → partition for k’

Often a simple hash of the key, e.g., hash(k’) mod n

Divides up key space for parallel reduce operations combine(k’, v’) → <k’, v’>*

Mini-reducers that run in memory after the map phase

Used as an optimization to reduce network traffic

1:many1:many1:many1:many

(16)

31

The Execution Framework Handles “Everything Else”

The execution framework handles “everything else”

Scheduling: assigns workers to map and reduce tasks

“Data distribution”: moves processes to data

Synchronization: gathers, sorts, and shuffles intermediate data

Errors and faults: detects worker failures and restarts

Limited control over data and execution flow

All algorithms must expressed in m, r, c, p

You don’t know:

Where mappers and reducers run

When a mapper or reducer begins or finishes

Which input a particular mapper is processing

Which intermediate key a particular reducer is processing

32

Univ. of Crete Spring 2015

Execution

Map Phase

Shuffle Phase

Reduce Phase

(17)

33

Automatic Parallel Execution in MapReduce (Google)

Master

Task • Usually many more map tasks than machines

• E.g.

• 200K map tasks

• 5K reduce tasks

• 2K machines

34

Univ. of Crete Spring 2015

Map/Reduce Execution in Clusters

Several map or reduce tasks can run on a single computer

Each intermediate file is divided into R partitions, by partitioning function

Each reduce task corresponds to one partition

(18)

35

Moving Data From Mappers to Reducers

Mappers output need to be separated for different reducers

Reducers need to collect their data from all mappers and group it by key

Keys at each reducer are processed in order!

Shuffle& sortphase=synchronization barrier between map & reduce phase

Collects the output from all map executions

Transforms the map output Into the reduce input

Divides the map output Into chunks

Often one of the most expensive parts of a MapReduce execution

36

Univ. of Crete Spring 2015

Spilled to a new disk file when almost full

Spill files on disk:

partitioned by reduce task, each partition sorted by key

Spill files merged into single output file

Merge happens in memory if data fits, otherwise also on disk

Reduce task starts copying data from map task as soon as it completes. Reduce cannot start working on the data until all mappers have finished and their data has arrived.

Shuffle and Sort in Map Reduce

(19)

37

 Probably the most complex aspect of MapReduce!

 Map side

Map outputs are buffered in memory in a circular buffer

When buffer reaches threshold, contents are “spilled” to disk

Spills merged in a single, partitioned file (sorted within each partition):

combiner runs here

 Reduce side

First, map outputs are copied over to reducer machine

“Sort” is a multi-pass merge of map outputs (happens in memory and on disk): combiner runs here

Final merge pass goes directly into reducer

Shuffle and Sort in Hadoop

38

Univ. of Crete Spring 2015

Shuffle and Sort in Hadoop

Mapper

Reducer

other mappers

other reducers circular

buffer (in memory)

spills (on disk)

merged spills (on disk)

intermediate files (on disk)

Combiner

Combiner

(20)

39

Tools for Synchronization

Cleverly-constructed data structures

Bring partial results together

Sort order of intermediate keys

Control order in which reducers process keys

Partitioner

Control which reducer processes which keys

Preserving state in mappers and reducers

Capture dependencies across multiple keys and values

40

Univ. of Crete Spring 2015

Master Data Structures

Master keeps track of status of each map and reduce task and who is working on it

Idle, in-progress, or completed

Master stores location and size of output of each completed map task

Pushes information incrementally to workers with in-progress reduce tasks

(21)

41

Semantics with Failures

If map and reduce are deterministic, then output is identical to non- faulting sequential execution

For non-deterministic operators, different reduce tasks might see output of different map executions

Relies on atomic commit of map and reduce outputs

In-progress task writes output to private temp file

Mapper: on completion, send names of all temp files to master (master ignores if task already complete)

Reducer: on completion, atomically rename temp file to final output file (needs to be supported by distributed file system)

42

Univ. of Crete Spring 2015

Fault Recovery

Workers are commodity (low cost) machines so failures are frequent:

given n workers, probability of at least one "problem" is 1 –(1-p)n

Workers are pinged by master periodically

Non-responsive workers are marked as failed

All tasks in-progress or completed by failed worker become eligible for rescheduling (reset to idle state)

Can be assigned to other mapper

Completed tasks are re-executed since result is stored on mapper’s local disk

Reducers are notified about mapper failure, so that they can read the data from the replacement mapper

(22)

43

Fault Recovery

Failed reducers identified through ping as well

Reducer’s in-progress tasks are reset to idle state

Can be assigned to other reducer

No need to restart completed reduce tasks, because result is written to distributed file system

Current implementations abort MapReduce computation on master failure

Users re-submit aborted jobs when new master process is up

Alternative: master writes periodic checkpoints of its data structures so that it can be re-started from checkpointed state

44

Univ. of Crete Spring 2015

Machines Share Roles

So far, logical view of cluster

In reality

Each cluster machine stores data

And runs MapReduce workers

Lots of storage + compute cycles nearby

(23)

45

The MapReduce “Runtime”

Handles scheduling

Assigns workers to map and reduce tasks

Handles “data distribution”

Moves processes to data

Handles synchronization

Gathers, sorts, and shuffles intermediate data

Handles errors and faults

Detects worker failures and restarts

Everything happens on top of a distributed filesystem

46

Univ. of Crete Spring 2015

Implementing in MapReduce

Externally for the user

Write a map function, and a reduce function

Submit a job; wait for result

No need to know anything about the environment

Google: 4000 servers + 48000 disks, many failures

Internally for MapReduce system designer

Run map in parallel

Shuffle: combine map results to produce reduce input

Run reduce in parallel

Deal with failures

(24)

47

Component Overview

48

Univ. of Crete Spring 2015

Peta-Bytes Data Processing

(25)

49

Google’s MapReduceinspired Yahoo’s Hadoop

Now as an open source (Java) Project of Apache hadoop.apache.org

We will use release 2.6.0 (November 2014)

Distribute processing of petabytes data across thousands of commodity machines

In 2008 about 100k MapReduce jobs per day in Google

 over 20 petabytes of data processed per day

 each job occupies about 400 servers

Easy to use since run-time complexity hidden from the users

50

Univ. of Crete Spring 2015

Hadoop Key Components

MapReduce -distributes applications

Hadoop Distributed File System (HDFS) -distributes data

Store big files across machines

Store each file as a sequence of blocks

Blocks of a file are replicated for fault tolerance

(26)

51

‘MapReduce’ vs. Hadoop

MapReduce Hadoop

Org Google Yahoo/Apache

Impl C++ Java

Distributed File Sys GFS HDFS

Data Base Bigtable HBase

Distributed lock Mgr Chubby ZooKeeper

History

Dec 2004 – Google GFS paper published

July 2005 – Nutch uses MapReduce

Jan 2006 – Doug Cutting joins Yahoo!

Feb 2006 – Becomes Lucene subproject

Apr 2007 – Yahoo! on 1000-node cluster

Jan 2008 – An Apache Top Level Project

Feb 2008 – Yahoo! production search index

52

Univ. of Crete Spring 2015

Google’s Problem in 2003: Already Lots of Data

Example: 20+ billion web pages x 20KB = 400+

terabytes

One computer can read 30-35 MB/sec from disk

~four months to read the web

~1,000 hard drives just to store the web

Even more to do something with the data:

process crawled documents

process web request logs

build inverted indices

construct graph representations of web documents

(27)

53

The Need: Data & Programs Distribution!

A Distributed System:

Scalable

Fault-Tolerant

Easy to Program

Applicable to Many Problems

Re-discovered by Google with goals:

"Reliabilityhas to come from the software"

"How can we make it easy to writedistributed programs?"

54

Univ. of Crete Spring 2015

How do we get Data to the Workers?

What’s the problem here?

Compute Nodes

SAN NAS

(28)

55

Distributed File System

Don’t move data to workersY Move workers to the data!

Store data on the local disks for nodes in the cluster

Start up the workers on the node that has the data local!

Why?

Not enough RAM to hold all the data in memory

Disk access is slow, disk throughput is good

A distributed file system is the answer

GFS (Google File System)

DFS for Hadoop (= GFS clone)

56

Univ. of Crete Spring 2015

Distributed File System Principles

Single Namespace for entire cluster

Data Coherency

Write-once-read-manyaccess model

Client can only append to existing files

Files are broken up into blocks

Typically 128 MB block size

Each block replicated on multiple DataNodes

Intelligent Client

Client can find location of blocks

Client accesses data directly from DataNode

(29)

57

Hadoop Supported File Systems

HDFS: Hadoop's own file system

Amazon S3 file system

Targeted at clusters hosted on the Amazon Elastic Compute Cloud server-on-demand infrastructure

Not rack-aware

CloudStore, previously Kosmos Distributed File System

like HDFS, this is rack-aware

optimization which takes into account the geographic clustering of servers

network traffic between servers in different geographic clusters is minimized

FTP Filesystem

stored on remote FTP servers

Read-only HTTP and HTTPSfile systems

58

Univ. of Crete Spring 2015

Hadoop Distributed File System (HDFS)

Very Large Distributed File System

10K nodes, 100 million files, 10 PB

Assumes Commodity Hardware

Files are replicated to handle hardware failure

Detect failures and recovers from them

Optimized for Batch Processing

Data locations exposed so that computations can move to where data resides

Provides very high aggregate bandwidth

User Space, runs on top of the file systems of the underlying OS

(30)

59

Hadoop Distributed File System (HDFS)

Mastermanages the file system and access to files by clients

Runs on master node of the HDFS cluster

Directs DataNodes to perform their low-level I/O tasks

Slavemanages the storages attached to the nodes running on

Runs on each slave machine in the HDFS cluster

Does the low-level I/O work

Files stored in many blocks

Each block has a block Id

Block Id associated with several nodes hostname:port (depending on level of replication)

60

Univ. of Crete Spring 2015

Secondary NameNode Client

HDFS Architecture

NameNode

DataNodes

Cluster Membership

Cluster Membership

NameNode: Maps a file to a file-id and list of MapNodes DataNode : Maps a block-id to a physical location on disk SecondaryNameNode: Periodic merge of Transaction log

(31)

61

NameNode Metadata

Meta-data in Memory

The entire metadata is in main memory

No demand paging of meta-data

Types of Metadata

List of files

List of Blocks for each file

List of DataNodes for each block

File attributes, e.g creation time, replication factor

A Transaction Log stored in multiple directories (on a local/remote FS)

Records file creations, file deletions. etc

62

Univ. of Crete Spring 2015

DataNode

A Block Server

Stores data in the local file system (e.g. ext3)

Stores meta-data of a block (e.g. CRC)

Serves data and meta-data to Clients

Block Report

Periodically sends a report of all existing blocks to the NameNode

Facilitates Pipelining of Data

Forwards data to other specified DataNodes

(32)

63

Block Placement

Current Strategy

One replica on local node

Second replica on a remote rack

Third replica on same remote rack

Additional replicas are randomly placed

Clients read from nearest replica

Would like to make this policy pluggable

64

Univ. of Crete Spring 2015

Data Correctness

Use Checksums to validate data

Use CRC32

File Creation

Client computes checksum per 512 byte

DataNode stores the checksum

File access

Client retrieves the data and checksum from DataNode

If validation fails, Client tries other replicas

(33)

65

Primary vs Secondary NameNode

NameNode daemon

“Head” interface to HDFS cluster

Records all global metadata

A single point of failure

Secondary NameNode daemon

One per cluster to monitor status of HDFS: Not a failover NameNode!

Records snapshots of HDFS metadata from “real” NameNode to facilitate recovery from failures

Can merge update logs in flight

Can upload snapshot back to primary

Need to develop a real High Availability (HA) solution

66

Univ. of Crete Spring 2015

Data Pipelining

Client retrieves a list of DataNodes on which to place replicas of a block

Client writes block to the first DataNode

The first DataNode forwards the data to the next DataNode in the Pipeline

When all replicas are written, the Client moves on to write the next block in file

(34)

67

Rebalancer

Goal: % disk full on DataNodes should be similar

Usually run when new DataNodes are added

Cluster is online when Rebalancer is active

Rebalancer is throttled to avoid network congestion

Command line tool

68

Univ. of Crete Spring 2015

Configuring HDFS

dfs.name.dir

Dir in the namenode to store metadata

dfs.data.dir

Dir in data nodes to store the data blocks

Must be in a local disk partition

fs.default.name

URI of the namenode

conf/masters

ip address of the master

conf/slaves

ip address of the slaves

Should have password-less SSH access to all the nodes

conf/hdfs-site.xml

<property>

<name>dfs.name.dir</name>

<value>/tmp/hadoop-test/name</value>

</property>

<property>

<name>dfs.data.dir</name>

<value>/tmp/hadoop-test/data</value>

</property>

conf/core-site.xml

<property>

<name>fs.default.name</name>

<value>hdfs://149.79.89.113:9000/</va lue>

</property>

(35)

69

User Interface

Command for HDFS User

hadoop dfs -mkdir /foodir

hadoop dfs -cat /foodir/myfile.txt

hadoop dfs -rm /foodir myfile.txt

Command for HDFS Administrator

 hadoop dfsadmin -report

 hadoop dfsadmin -decommission datanodename

Web Interface

http://host:port/dfshealth.jsp

http://hadoop.apache.org/common/docs/current/hdfs_shell.html

70

Univ. of Crete Spring 2015

WordCount

A Simple Hadoop Example

http://wiki.apache.org/hadoop/WordCount

(36)

71

Word Count over a Given Set of Web Pages

How can we do word count in parallel?

see bob throw see 1

bob 1

throw 1

see 1

spot 1

run 1

bob 1

run 1

see 2

spot 1 throw 1 see spot run

72

Univ. of Crete Spring 2015

Word Counting with MapReduce

Documents Financial, IMF, Economics, Crisis Financial, IMF, Crisis

Map

Key Value

Financial 1

IMF 1

Economics 1

Crisis 1

Financial 1

IMF 1

Crisis 1

Key Value

Economics 1

Harry 1

Financial 1

Harry 1

Potter 1

Film 1

Crisis 1

Harry 1

Potter 1

`

`

`

`

`

`

`

Documents Economics, Harry Financial, Harry, Potter, Film Crisis, Harry, Potter

Map

Doc1

Doc2

Doc3

Doc4

Doc5

M1

M2

© Kyuseok Shim (VLDB 2012 TUTORIAL)

(37)

73

Word Counting with MapReduce

Reduc

Key Value

Financial 3

IMF 2

Economics 2

Crisis 3

Harry 3

Film 1

Potter 2

Key Value

Financial 1 Financial 1 Financial 1

IMF 1

IMF 1

Economics 1 Economics 1

Key Value

Crisis 1

Crisis 1

Crisis 1

Harry 1

Harry 1

Harry 1

Film 1

Potter 1

Potter 1

Before reduce functions are called, for each distinct key, the list of its values is generated

Key Value list

Financial 1, 1, 1

IMF 1, 1

Economics 1, 1

Crisis 1, 1, 1

Harry 1, 1, 1

Film 1

Potter 1, 1

``

Documents Financial, IMF, Economics, Crisis Financial, IMF, Crisis

Map

Documents Economics, Harry Financial, Harry, Potter, Film Crisis, Harry, Potter

Map Reduce

Doc1

Doc2

Doc3

Doc4

Doc5

© Kyuseok Shim (VLDB 2012 TUTORIAL)

`

74

Univ. of Crete Spring 2015

Basic Hadoop API

 Mapper

void map(K1 key, V1 value, OutputCollector<K2, V2>

output, Reporter reporter)

void configure(JobConf job)

void close() throws IOException

 Reducer/Combiner

void reduce(K2 key, Iterator<V2> values,

OutputCollector<K3,V3> output, Reporter reporter)

void configure(JobConf job)

void close() throws IOException

 Partitioner

void getPartition(K2 key,V2 value,int numPartitions)

*Note: forthcoming API changes2

(38)

75

WordCount Mapper

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

public void mapmapmapmap(LongWritable key, Text value,

Context context) throws IOException, InterruptedException { String line = value.toString();

StringTokenizer tokenizer = new StringTokenizer(line);

while (tokenizer.hasMoreTokens()) {

word.set(tokenizer.nextToken());

context.write(word, one);

} } }

76

Univ. of Crete Spring 2015

WordCount Reducer

public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>

public void reducereducereducereduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

int sum = 0;

for (IntWritable val : values) { sum += val.get();

}

context.write(key, new IntWritable(sum));

} }

(39)

77

Job Configuration

Job object forms the job specification and gives control for running the job

Setmapper and reducer class to be used

Setoutput key and value classes for map and reduce functions

For reducer: setOutputKeyClass(), setOutputValueClass()

For mapper (omit if same as reducer): setMapOutputKeyClass(), setMapOutputValueClass()

Can set input types similarly (default is TextInputFormat)

Specify data input path using setInputPath()

Can be single file, directory (to use all files there), or file pattern

addInputPath() can be called multiple times to add multiple paths

Specify output path using setOutputPath()

Single output path, which is a directory for all output files

Method waitForCompletion() submits job and waits for it to finish

78

Univ. of Crete Spring 2015

Job Configuration

public int run(String[] args) throws Exception { Job job = new Job(getConf(), "WordCount");

job.setJarByClass(WordCount.class);

job.setOutputKeyClass job.setOutputKeyClassjob.setOutputKeyClass

job.setOutputKeyClass(((Text.class(Text.classText.class); Text.class); ); );

job.setOutputValueClass job.setOutputValueClass job.setOutputValueClass

job.setOutputValueClass((((IntWritable.classIntWritable.classIntWritable.classIntWritable.class); ); ); );

job.setMapperClass(Map.class);

job.setReducerClass(Reduce.class);

job.setCombinerClass job.setCombinerClass job.setCombinerClass

job.setCombinerClass((((Reduce.classReduce.classReduce.classReduce.class););););

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);

job.setNumReduceTasks job.setNumReduceTasks job.setNumReduceTasks job.setNumReduceTasks((((2222););););

FileInputFormat.addInputPath(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.waitForCompletion(true);

return 0;

}

(40)

79

Invocation of WordCount

hadoop hadoop hadoop

hadoop dfsdfsdfsdfs ---mkdir-mkdirmkdir <mkdir <hdfs<<hdfshdfshdfs----dirdirdir>dir>>>

hadoop hadoop hadoop

hadoop dfsdfsdfs -dfs --copyFromLocal-copyFromLocalcopyFromLocal <copyFromLocal <local<<locallocal-local---dirdir> dirdir> > > <<<<hdfshdfs-hdfshdfs--dir-dirdir> dir> > >

hadoop hadoop hadoop hadoop

jar hadoop jar hadoop jar hadoop

jar hadoop---*-**-*--examples.jar -examples.jar examples.jar examples.jar WordCount

WordCount WordCount WordCount [

[ [

[----m <#maps>] m <#maps>] m <#maps>] m <#maps>]

[ [ [

[----r <#reducers>] r <#reducers>] r <#reducers>] r <#reducers>]

<in

<in

<in

<in----dirdirdirdir> > > >

<out

<out

<out

<out----dirdirdirdir> > > >

80

Univ. of Crete Spring 2015

Hadoop Job Tuning

Choose appropriate number of mappers and reducers

Define combinerswhenever possible

Consider Map output compression

Optimize the expensive shuffle phase(between mappers and reducers) by setting its tuning parameters

Profiling distributed MapReduce jobs is challenging

(41)

81

Job Configuration Parameters

190+ parameters in Hadoop

Do their settings impact performance?

What are ways to set these parameters?

Defaults -- are they good enough?

Best practices -- the best setting can depend on data, job, and cluster properties

Automatic setting (Not yet in Hadoop)

82

Univ. of Crete Spring 2015

MapReduce Development Steps

 Write Map and Reduce functions

Create unit tests

 Write driver program to run a job

Can run from IDE with small data subset for testing

If test fails, use IDE for debugging

Update unit tests and Map/Reduce if necessary

 Once program works on small test set, run it on full data set

If there are problems, update tests and code accordingly

 Fine-tune code, do some profiling

(42)

83

Mechanics of Programming

Hadoop Jobs

84

Univ. of Crete Spring 2015

Anatomy of a Hadoop Job in MR v1.0

 MapReduce program in Hadoop = Hadoop job

Jobs are divided into mapand reduce tasks

An instance of a running task is called a task attempt

Multiple jobs can be composed into a workflow

(43)

85

Launching a Hadoop Job

Source: http://www.jaso.co.kr/265

Client (i.e., driver program) creates a job, configures it, and submits it to job tracker

JobClient computes input splits (on client end)

Job data (jar, configuration XML) are sent to JobTracker

JobTracker puts job data in shared location, enqueues tasks

TaskTrackers poll JobTracker for tasks

TaskTrackers launch a task in a separate Java instance

86

Univ. of Crete Spring 2015

Job Launch: JobClient, JobTracker and TaskTracker

Client submits MapReduce job through

JobClient.runJob() // blocks

JobClient.submitJob() // does not block

waitForCompletion() submits job and polls JobTracker about progress every sec, outputs to console if changed

JobClient:

Determines proper division of input into InputSplits

Sends job data to master JobTracker server

JobTracker:

Inserts jar and JobConf (serialized to XML) in shared location

Posts a JobInProgress to its run queue

TaskTrackers running on slaves periodically query JobTracker for work

Retrieve job-specific jar and config

Launch task in separate instance of Java

main() is provided by Hadoop

(44)

87

Job Launch: Task and TaskRunner

TaskTracker.Child.main():

Sets up the child TaskInProgress attempt

Reads XML configuration

Connects back to necessary MapReduce components via RPC

Uses TaskRunner to launch user process

TaskRunner, MapTaskRunner, MapRunner work in a daisy-chainto launch Mapper

Task knowsahead of time which InputSplits it should be mapping

Calls Mapper once for each record retrieved from the InputSplit

Running the Reducer is much the same

88

Univ. of Crete Spring 2015

Creating the Mapper

One instance of your Mapper is initialized by the MapTaskRunner for a TaskInProgress

Exists in separate process from all other instances of Mapper –no data sharing!

public void map (WritableComparableWritableComparableWritableComparableWritableComparable key, Writable

Writable Writable

Writable value, Context

Context Context

Context context)

(45)

89

Data Types in Hadoop

Writable Defines a de/serialization protocol Every data type in Hadoop is a Writable

WritableComparable Defines a sort order

All keys must be of this type (but not values)

IntWritable LongWritable Text

Concrete classes for different data types

SequenceFiles Binary encoded of a sequence of key/value pairs

90

Univ. of Crete Spring 2015

What is Writable?

Hadoop defines its own “box” classes for strings (Text), integers (IntWritable), etc.

All values are instances of Writable

All keys are instances of WritableComparable

Writing for cache coherency

while (more input exists) {

myIntermediate = new intermediate(input);

myIntermediate.process();

export outputs;

}

(46)

91 Intermediates Intermediates Intermediates Intermediates Intermediates

Getting Data to the Mapper

Data sets are specified by InputFormats

Defines input data (e.g., a directory)

Identifies partitions of the data that form an InputSplit

Factory for RecordReader objects to extract (k,v) records from the input source

InputSplit InputSplit InputSplit InputSplit InputSplit

RecordReader RecordReader RecordReader RecordReader RecordReader

Mapper Mapper Mapper Mapper Mapper

InputFormatInputFormatInputFormatInputFormat

Input File Input File

92

Univ. of Crete Spring 2015

FileInputFormat and Friends

TextInputFormat

Treats each ‘\n’-terminated line of a file as a value

KeyValueTextInputFormat

Maps ‘\n’- terminated text lines of “k SEP v”

SequenceFileInputFormat

Binary file of (k, v) pairs with some add’l metadata

SequenceFileAsTextInputFormat

Same, but maps (k.toString(), v.toString())

FileInputFormat will read all files out of a specified directory and send them to the mapper

 Delegates filtering this file list to a method subclasses may override e.g., create your own “xyzFileInputFormat” to read *.xyz from

directory list

These classes make use of the readFields() method of the specific Writable classes used by your MapReduce pass

(47)

93

A simple CustomWritable

public class MyWritable implements Writable implements Writable implements Writable { implements Writable

private int counter; private long timestamp;

public void write(DataOutput out) throws IOException { out.writeInt(counter);

out.writeLong(timestamp);

}

public void readFields(DataInput in) throws IOException { counter = in.readInt();

timestamp = in.readLong();

}

public static MyWritable read(DataInput in) throws IOException {

MyWritable w = new MyWritable();

w.readFields(in);

return w;

} }

94

Univ. of Crete Spring 2015

Record Readers

Each InputFormat provides its own RecordReader implementation

Provides (unused?) capability multiplexing

LineRecordReader

Reads a line from a text file

KeyValueRecordReader

Used by KeyValueTextInputFormat

(48)

95

Input Split Size

FileInputFormat will divide large files into chunks

Exact size controlled by mapred.min.split.size

RecordReaders receive file, offset, and length of chunk

Custom InputFormat implementations may override split size

e.g., “NeverChunkFile”

96

Univ. of Crete Spring 2015

WritableComparator

Compares WritableComparable data

Will call WritableComparable.compare()

Can provide fast path for serialized data

JobConf.setOutputValueGroupingComparator()

(49)

97

Sending Data To Reducers: The Context Class

Both Mapper and Reducer define an inner class called Context which implements the JobContext interface

Job also implements JobContext

when you create a new Job, you also set the context for the Mapper and Reducer

Some methods of Context:

write: generate an output key/value pair

progress and setStatus: report progress or set the status of the task

getCounter: get access (read/write) to the value of a Counter

getConfiguration: return the configuration for the job

getCacheFiles: get cache files set in the Configuration

98

Univ. of Crete Spring 2015

Partitioning

Which reducer will receive the intermediate output keys and values?

(key, value) pairs with the same key end up at the same partition

The mappers partition data independently

they never exchange information with one another

Hadoop uses an interface called Partitioner to determine which partition a (key, value) pair will go to

A single partition refers to all (key, value) pairs which will be sent to a single reduce task: #partitions = #reduce tasks

each Reducer can process multiple reduce tasks

The Partitioner determines the load balancing of the reducers

JobConf sets Partitioner implementation

(50)

99

The Partitioner interface

The Partitioner interface defines the getPartition() method

Input: a key, a value and the number of partitions

Output: a partition id for the given key, value pair

The default Partitioner is the HashPartitioner:

int getPartition(K key, V value, int numPartitions) { return key.hashCode() % numPartitions;

}

Key key.hashCode() partitionId (0-2)

hello 12847 1

world 23874 0

map 82375 1

reduce 12839 2

numPartitions = 3

100

Univ. of Crete Spring 2015

Partition and Shuffle

Mapper Mapper Mapper Mapper Mapper

Partitioner Partitioner Partitioner Partitioner Partitioner Intermediates Intermediates Intermediates Intermediates Intermediates

Reducer Reducer Reduce

Intermediates Intermediates Intermediates (combiners omitted here)

(51)

101

Reduction

public void reduce(WritableComparableWritableComparableWritableComparable key, WritableComparable Iterator

IteratorIterator

Iterator values, Context

ContextContext

Context context)

Keys & values sent to one partition all go to the same reduce task

Calls are sorted by key:

“earlier” keys are reduced and output before “later” keys

102

Univ. of Crete Spring 2015

Finally: Writing The Output

Reducer Reducer Reduce

Output File RecordWriter

OutputFormatOutputFormatOutputFormatOutputFormat

Output File RecordWriter

Output File RecordWriter

(52)

103

OutputFormat

The OutputFormat and RecordWriter interfaces dictate how to write the results of a job back to the underlying permanent storage

The default format (TextOutputFormat) will write (key, value) pairs as strings to individual lines of an output file

Writes “key val\n” strings using the toString() methods of the keys and values

The SequenceFileOutputFormat will keep the data in binary, so it can be later read quickly by the SequenceFileInputFormat

Uses a binary format to pack (k, v) pairs

NullOutputFormat

Discards output

These classes make use of the write() method of the specific Writable classes used by your MapReduce pass

104

Univ. of Crete Spring 2015

Sending Data to the Client

Reporter object sent to Mapper allows simple asynchronous feedback

incrCounter(Enum key, long amount)

setStatus(String msg)

Allows self-identificationof input

InputSplit getInputSplit()

(53)

105

MapReduce Coding Summary

Decompose problem into appropriate workflow of MapReduce jobs

For each job, implement the following

Job configuration

Map function

Reduce function

Combiner function (optional)

Partition function (optional)

Might have to create custom data types as well

WritableComparable for keys

Writable for values

106

Univ. of Crete Spring 2015

Local (Standalone) Mode

Runs same MapReduce user program as cluster version, but does it sequentially

Does not use any of the Hadoop daemons

Works directly with local file system

No HDFS, hence no need to copy data to/from HDFS

Great for development, testing, initial debugging

(54)

107

Pseudo-Distributed Mode

Still runs on single machine, but now simulates a real Hadoop cluster

Simulates multiple nodes

Runs all daemons

Uses HDFS

Main purpose: more advanced testing and debugging

You can also set this up on your laptop

108

Univ. of Crete Spring 2015

Running a MapReduce Workflow

Linear chain of jobs

To run job2 after job1, create JobConf’s conf1 and conf2 in main function

Call JobClient.runJob(conf1); JobClient.runJob(conf2);

Catch exceptions to re-start failed jobs in pipeline

More complex workflows

Use JobControl from org.apache.hadoop.mapred.jobcontrol

(55)

109

Lifecycle of a MapReduce Job

How are the number of splits, number of map and reduce tasks, memory allocation to tasks, etc., determined?

Map Wave 1

Reduce Wave 1 Map

Wave 2

Reduce Wave 2 Input

Splits

Time

110

Univ. of Crete Spring 2015

HDFS Limitations

“Almost” GFS (Google FS)

No file update options (record append, etc); all files are write-once

Does not implement demand replication

Designed for sequential access

Random seeks devastate performance

(56)

111

Comparison of MapReduce and Other Approaches

MapReduce: The Programming Model and Practice", SIGMETRICS, Turorials 2009, Google.

112

Univ. of Crete Spring 2015

Comparison of MapReduce and other approaches

MapReduce: The Programming Model and Practice", SIGMETRICS, Turorials 2009, Google.

(57)

113

MapReduce: A Step Backwards?

Don’t need 1000 nodes to process petabytes:

Parallel DBs do it in fewer than 100 nodes

No support for schema:

Sharing across multiple MR programs is difficult

No indexing:

Wasteful access to unnecessary data

Non-declarative programming model:

Requires highly-skilled programmers

No support for JOINs:

Requires multiple MR phases for the analysis

Agrawal et al., VLDB 2010 Tutorial

114

Univ. of Crete Spring 2015

Map Reduce vs Parallel DBMS

Parallel DBMS MapReduce

Schema Support Not out of the box

Indexing Not out of the box

Programming Model Declarative (SQL)

Imperative (C/C++, Java, Y) Extensions through

Pig and Hive Optimizations

(Compression, Query Optimization)

Not out of the box

Flexibility Not out of the box

Fault Tolerance Coarse grained

techniques

[Pavlo et al., SIGMOD 2009, Stonebraker et al., CACM 2010, 2]

References

Related documents

Conclusion: Women in this study had inadequate knowledge and inappropriate practice related to mammography as a procedure for breast cancer investigation.. Pan African

The dynamic model of the grid consists of turbine governors (TG), automatic voltage regulators (AVR) as well as wind turbines, solar power units and energy storage units1.

Survey measures of preference parameters provide a means for accounting for otherwise unobserved heterogeneity.This paper presents measures of relative risk tolerance based on

character in the Game of Thrones series, Jon Snow’s entire role in the show serves as the source of his description, which begins in the first season when he is the bastard of

Hadoop MapReduce - the programming model (framework) of performance of distributed computing for the large amounts of data within the paradigm of map / reduce, which is a set of

•  A fast and general in-memory compute engine for

For several decades, many Inquisitors of the Ordo Hereticus with an interest in the Askellon Sector, and in particular its ancient sector capital, Juno, have been aware of an

Step 3: For each item pair compute its average difference from its difference list Step 4: Output the results into a final result file: D &lt;item1, item2, avg diff&gt;.. Mahout