(1)

Hadoop/MapReduce Workshop

guillimin@calculquebec.ca

August 14, 2015

Dan Mazur

(2)

Outline

● Hadoop introduction and motivation

● Python review

● HDFS - The Hadoop Filesystem

● MapReduce examples and exercises

– Wordcount

– Distributed grep

– Distributed sort

– Maximum

(3)

Exercise 0: Login and Setup

● Log in to Guillimin

– $ ssh -X class##@hadoop.hpc.mcgill.ca

– Use the account number and password from the slip you received

● Copy workshop files

– $ cp -R /software/workshop/hadoop .

● Load Hadoop Module

– $ module show hadoop

(4)

Exercise 1: First MapReduce job

● To make sure your environment is set up correctly

● Launch an example MapReduce job

– $ hadoop jar $HADOOP_EXAMPLES pi 100 100

– $HADOOP_EXAMPLES=/usr/hdp/2.2.0.0-2041/hadoop-mapreduce/hadoop-mapreduce-examples.jar

● Final output

– Job Finished in 16.983 seconds

– Estimated value of Pi is 3.14080000000000000000

(5)

Hadoop

(6)

Hadoop

● What is Hadoop?

– A collection of related software (“software framework”, “software stack”, “software ecosystem”)

– Data-intensive computing (“Big Data”)

– Scales with data size

– Fault-tolerant

– Parallel

– Analysis of unstructured, semi-structured data

– Cluster

(7)

MapReduce

● What is MapReduce?

– Parallel, distributed programming model

– Large-scale data processing

– Map()

● Filter, sort, embarrassingly parallel

● e.g. sort participants by first name

– Reduce()

● Summary operations, e.g. count the participants with each first name
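The same idea in a few lines of plain Python (an in-memory sketch, not Hadoop; the participant list is invented for the example):

from collections import defaultdict

# Map phase: emit one (key, value) pair per record
participants = ['Alice', 'Bob', 'Anna', 'Alice']
mapped = [(name, 1) for name in participants]

# Shuffle/sort: group the values by key
groups = defaultdict(list)
for key, value in mapped:
    groups[key].append(value)

# Reduce phase: summarize each group
for key in sorted(groups):
    print('%s\t%d' % (key, sum(groups[key])))    # e.g. Alice  2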

(8)

Hadoop Ecosystem

● Apache Hadoop core

– HDFS - Hadoop Distributed File System

– Yarn - Resource management, job scheduling

● Pig - high-level scripting for MapReduce programs

● Hive - Data warehouse with SQL-like queries

● HCatalog - Abstracts data storage filenames and formats

● HBase - Database

● Zookeeper - Maintains and synchronizes configuration

● Oozie - Workflow scheduling

(9)

Hadoop Motivation

● For “big data”, hard disk input/output (I/O) is a bottleneck

● We have seen huge technology improvements in both CPU speeds and storage capacity

● I/O performance has not improved as dramatically

● We will see that Hadoop solves this problem by parallelizing I/O operations across many disks

● The genius of Hadoop is in how easy this parallelization is from the programmer's perspective

(10)

Hadoop Motivation

● Big Data Challenges - The "V"-words

– Volume

● Amount of data (terabytes, petabytes)

● Want to distribute storage across many nodes and analyze it in place

– Variety

● Data comes in many formats, not always in a relational database

● Want to store data in original format

– Velocity

● Rate at which size of data grows, or speed with which it must be processed

● Want to expand storage and analysis capacity as data grows

(11)

How Does Hadoop Solve These Problems?

● Distributed file system (HDFS)

– Scalable with redundancy

● Parallel computations on data nodes

(12)

Hadoop vs. Competition

● Hadoop works well for loosely coupled (embarrassingly parallel) problems

[Figure: process coupling compared for Database, Hadoop, and MPI]

(13)

Hadoop vs. Competition

● Map or reduce tasks are automatically re-run if they fail

● Data is stored redundantly and reproduced automatically if a drive fails

[Figure: fault tolerance vs. parallelism for Database, MPI, and Hadoop]

(14)

Hadoop vs. Competition

● Hadoop makes certain problems very easy to solve

● Hardest parts of parallel programming are abstracted away

● Today we will write several practical codes that could scale to 1000s of nodes with just a few lines of code

[Figure: developer productivity compared for Database, MPI, and Hadoop]

(15)

Hadoop and the Competition

● Hadoop, MPI, and databases are all improving their weaknesses

– All are becoming fault-tolerant platforms for tightly-coupled, massively parallel problems

● Hadoop integrates easily into a workflow that includes MPI and/or databases

– Sqoop, HBase, etc. for working with databases

– Hadoop for post-MPI data analysis

– Hadoop for pre-MPI data processing

– Hadoop 2 introduced a scheduler, Yarn, that can schedule MPI, MapReduce, and other types of workloads

● New tools for tightly-coupled problems

– Apache Spark

(16)

Python

● Hadoop is implemented in Java

● Developers can work with any programming language

● For a workshop, it is important to have a common language, so the examples here use Python

(17)

Python - for loops

● Can loop over individual instances of 'iterable' objects (lists, tuples, dictionaries)

● Looped sections use an indented block

– Be consistent: use a tab or 4 spaces, not both

● Do not forget the colon

myList = ['one', 'two', 'three']

for item in myList:
    print(item)

(18)

Python standard input/output

#!/usr/bin/python
# Load modules for system and comma-separated value file functions
import sys
import csv

reader = csv.reader(sys.stdin, delimiter=',')

for line in reader:
    data0 = line[0]

(19)

Dictionaries

● Unordered set of key/value pairs

– keys are unique, can be used as an index

>>> dict = {'key':'value', 'apple':'the round fruit of a tree'}
>>> print dict['key']
value
>>> print dict['apple']
the round fruit of a tree

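Because keys are unique, a dictionary is a natural way to tally counts, which is exactly what the word-count exercises below need (a small sketch; the word list is invented):

counts = {}
for word in ['hello', 'world', 'hello']:
    # start at 0 for unseen words, then add 1
    counts[word] = counts.get(word, 0) + 1
print(counts)    # e.g. {'hello': 2, 'world': 1}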

(20)

Hadoop Distributed File System

(HDFS)

● Key Concepts:

– Data is read and written in minimum units (“blocks”)

– A master node (“namenode”) manages the filesystem tree and the metadata for each file

– Data is stored on a group of worker nodes (“datanodes”)

● The same blocks are replicated across multiple datanodes (default replication = 3)
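Once a file is in HDFS (see Exercise 2), its blocks and replicas can be inspected with hdfs fsck (a sketch; the path assumes the class account's HDFS home directory):

$ hdfs fsck /user/class01/titanic.txt -files -blocks -locations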

(21)

HDFS Blocks

[Figure: a 150 MB file, myFile.txt, is split into HDFS blocks (1: 64 MB, 2: 64 MB, 3: 22 MB) alongside a pool of datanodes]

(22)

HDFS Blocks

[Figure: the blocks of myFile.txt (1: 64 MB, 2: 64 MB, 3: 22 MB) are placed on different datanodes]

Data is distributed block-by-block to multiple nodes

(23)

HDFS Blocks

[Figure: each block of myFile.txt is stored on three different datanodes (data redundancy default = 3x)]

If we lose a node, data is available on 2 other nodes and the namenode arranges to create a 3rd copy on another node

(24)

Exercise 2: Using HDFS

● Put a file into HDFS

– $ hdfs dfs -put titanic.txt

● List files in HDFS

– $ hdfs dfs -ls

● Output the file contents

– $ hdfs dfs -cat titanic.txt

– $ hdfs dfs -tail titanic.txt

● Get help

– $ hdfs dfs -help
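A few other hdfs dfs subcommands that may come in handy during the exercises (the directory names are just examples):

$ hdfs dfs -mkdir mydata                  # create a directory in HDFS
$ hdfs dfs -get wordCount/part-00000 .    # copy a file back to the local filesystem
$ hdfs dfs -rm -r wordCount               # remove a directory, e.g. before re-running a job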

(25)

MapReduce

● Roman census approach:

– Want to count (and tax) all people in the Roman empire

– Better to go to where the people are (decentralized) than try to bring them to you (centralized)

– Bring back information from each village (map phase)

– Combine the reports from all villages at the capital (reduce phase)

(26)

Roman Census: Mapping

[Figure: mappers are sent out from the capital to each village and report counts back, e.g. 287 men, 293 women, 104 children, 854 sheep from one village]

(27)

Roman Census: Reducing

[Figure: reducers combine the per-village reports; e.g. the sheep counts 854, 34, 1032, 206, 91, and 545 are summed to 2762 sheep, and the counts of men, women, and children are totalled the same way]

(28)

MapReduce

[Figure: data flows into mappers, which emit key, value pairs; a sort and shuffle step groups them into key, all values lists for the reducers]

(29)

Mapper

● Takes specified data as input

● Works with a fraction of the data

● Works in parallel

● Outputs intermediate records

– key, value pairs

(30)

Reducer

● Takes a key or set of keys with all associated values as input

● Works with all data for that key

● Outputs the final results

(31)

MapReduce Word Counting

● Want to count the frequency of words in a document

● What are the key, value pairs for our mapper output?

– A) key=1, value='word'

– B) key='word', value=1

– C) key=[number in hdfs block], value='word'

– D) key='word', value=[number in hdfs block]

(32)

MapReduce Word Counting

● Want to count the frequency of words in a document

● What are the key, value pairs for our mapper output?

– A) key=1, value='word'

– B) key='word', value=1 (correct)

– C) key=[number in hdfs block], value='word'

– D) key='word', value=[number in hdfs block]

(33)

MapReduce Word Counting

● If our mapper input is "hello world, hello!", what will our reducer input look like?

– A) hello, world, hello

– B) hello 1, world 1, hello 1

– C) hello 1, hello 1, world 1

– D) hello 2, world 1

(34)

MapReduce Word Counting

● If our mapper input is "hello world, hello!", what will our reducer input look like?

– A) hello, world, hello

– B) hello 1, world 1, hello 1

– C) hello 1, hello 1, world 1 (correct)

– D) hello 2, world 1

Explanation: The reducer receives SORTED key, value pairs. The sorting groups identical keys together before they reach the reducer.

(35)

MapReduce Word Counting

● Want to count the frequency of words in a document

● What are the key, value pairs for our reducer output?

– A) key=1, value='word'

– B) key='word', value=1

– C) key=[count in document], value='word'

– D) key='word', value=[count in document]

(36)

MapReduce Word Counting

● Want to count the frequency of words in a document

● What are the key, value pairs for our reducer output?

– A) key=1, value='word'

– B) key='word', value=1

– C) key=[count in document], value='word'

– D) key='word', value=[count in document] (correct)

(37)

Hadoop Streaming

● Streaming lets developers use any programming language for mapping and reducing

● Use standard input and standard output

● The first tab character delimits between key and value

● Similar to Bash pipes

– $ cat file.txt | ./mapper | sort | ./reducer

● HADOOP_STREAM=/usr/hdp/2.2.0.0-2041/hadoop-mapreduce/hadoop-streaming.jar

● hadoop jar $HADOOP_STREAM -files <mapper script>,<reducer script> -input <input dir> -output <output dir> -mapper <mapper script> -reducer <reducer script>

(38)

mapper.py

#!/usr/bin/env python
# Scripts require a "hash bang" line

# Import the sys module for stdin
import sys

# Loop over standard input
for line in sys.stdin:
    # split the line into words
    words = line.split()
    # Loop over words
    for word in words:
        print word, '\t', 1
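A quick serial check of the mapper on its own (assuming the script is saved as mapper_wordcount.py, as in Exercise 3, and made executable):

$ chmod +x mapper_wordcount.py
$ echo "hello world hello" | ./mapper_wordcount.py

Each input word should come back as a key, value line. Note that line.split() leaves punctuation attached, so 'hello' and 'hello!' would count as different words.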

(39)

Reducer: Checking for key changes

● Often reducers will have to detect when the key changes in the sorted mapper output

prevKey = None

for line in inputReader:
    key = line[0]
    # If the current key is the same as previous key
    if key == prevKey:
        value = ...  # update for current line
    # Else we have started a new group of keys
    else:
        # if not first line of input
        if not prevKey == None:
            # Completed entire key, print value
            print prevKey, "\t", value
        value = ...  # set for first entry of new key
        # Set prevKey to the current key
        prevKey = key

# Output final key, value pair
if prevKey:
    print prevKey, "\t", value

(40)

reducer.py

#!/usr/bin/env python
import sys

prevWord = None
wordCount = 0
word = None

for line in sys.stdin:
    word, count = line.split('\t', 1)
    count = int(count)
    if word == prevWord:
        wordCount += count
    else:
        # a new word in the sorted input: print the completed count
        if prevWord:
            print prevWord, '\t', wordCount
        prevWord = word
        wordCount = count

# Output the final word, count pair
if prevWord:
    print prevWord, '\t', wordCount

(41)

Testing map and reduce scripts

● It is useful to test your scripts with a small amount of data in serial to check for syntax errors
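For example, the Bash pipeline from the streaming slide can be run on a small sample first (a sketch using the titanic.txt file from Exercise 2):

$ head -100 titanic.txt | ./mapper_wordcount.py | sort | ./reducer_wordcount.py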

(42)

Exercise 3: Word count

● Place the directory montgomery into HDFS

– $ hdfs dfs -put montgomery

● Submit a MapReduce job with your *tested* scripts to count the word frequencies in Lucy Maud Montgomery's books

$ hadoop jar $HADOOP_STREAM \
    -files mapper_wordcount.py,reducer_wordcount.py \
    -mapper mapper_wordcount.py \
    -reducer reducer_wordcount.py \
    -input montgomery -output wordCount

(43)

Exercise 3: Word count

● View the output directory

– $ hdfs dfs -ls wordCount

● View your results

– $ hdfs dfs -cat wordCount/part-00000

● View your (sorted) results
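For example, the counts can be sorted by frequency with a shell pipeline (a sketch; sort -k2 -nr orders numerically by the count field, descending):

$ hdfs dfs -cat wordCount/part-00000 | sort -k2 -nr | head -20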

(44)

Storage

A Hadoop cluster has 10 nodes with 300GB of storage per node with the default HDFS setup (replication factor 3, 64MB blocks). Alice wants to upload two 400GB files and run WordCount on them both. What will happen?

– A) The data upload fails at the first file

– B) The data upload fails at the second file

– C) MapReduce job fails

– D) None of the above. MapReduce is successful.

(45)

Storage

A Hadoop cluster has 10 nodes with 300GB of storage per node with the default HDFS setup (replication factor 3, 64MB blocks). Alice wants to upload two 400GB files and run WordCount on them both. What will happen?

– A) The data upload fails at the first file

– B) The data upload fails at the second file

– C) MapReduce job fails

– D) None of the above. MapReduce is successful. (correct: with 3X replication the cluster can store 1.0TB, and the two files total only 0.8TB)

(46)

Storage

A Hadoop cluster has 10 nodes with 300GB of storage per node with the default HDFS setup (replication factor 3, 64MB blocks). Alice wants to upload three 400GB files and run WordCount on them all. What will happen?

– A) The data upload fails at the first file

– B) The data upload fails at the second file

– C) The data upload fails at the third file

– D) MapReduce job fails

(47)

Storage

A Hadoop cluster has 10 nodes with 300GB of storage per node with the default HDFS setup (replication factor 3, 64MB blocks). Alice wants to upload three 400GB files and run WordCount on them all. What will happen?

– A) The data upload fails at the first file

– B) The data upload fails at the second file

– C) The data upload fails at the third file (correct)

– D) MapReduce job fails

– E) None of the above. MapReduce is successful.

Explanation: The small cluster can only store 1.0TB of data with 3X replication! Alice wants to upload 1.2TB.

(48)

Simplifying MapReduce Commands

● The native streaming commands are cumbersome

● TIP: Create simplifying aliases and functions

– e.g.

● Place these commands into ~/.bashrc so they are executed in each new bash session (each login)

mapreduce_stream(){
    hadoop jar $HADOOP_STREAM -files $1,$2 \
        -mapper $1 \
        -reducer $2 \
        -input $3 -output $4
}
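With the function defined, a word-count job can then be launched much more compactly (a sketch; the output directory name is arbitrary):

$ mapreduce_stream mapper_wordcount.py reducer_wordcount.py montgomery wordCount2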

(49)

Exercise 4: Hadoop Web UI

● Hadoop includes a web-based user interface

● Launch a firefox window

– $ firefox &

● Navigate to the Hadoop Job Monitor

– http://lm-2r01-n02:8088/cluster

● Navigate to the namenode and filesystem

– http://lm-2r01-n01:50070/dfshealth.jsp

● Navigate to the job history

(50)
(51)

Accessing Job Logs

● Through the web UI

● Through the command line

(52)

Example: Distributed Grep

Hadoop Command:

$ hadoop jar $HADOOP_STREAM \
    -D mapreduce.job.reduces=0 \
    -D mapred.reduce.tasks=0 \
    -input titanic.txt \
    -output grepout \
    -mapper "/bin/grep Williams"

● Note: We don't have to write any scripts!

● Note: There is no reducer phase

(53)

Household Power Consumption

● Dataset: household_power_consumption.txt

● From the UCI Machine Learning Repository

● 9 Columns, semicolon separated (see household_power_consumption.explain)

– 1. date: dd/mm/yyyy

– 2. time: hh:mm:ss

– 3. minute-averaged active power (kilowatts)

– 4. minute-averaged reactive power (kilowatts)

– 5. minute-averaged voltage (volts)

– 6. minute-averaged current (amps)

– 7. kitchen active energy (watt-hours)

– 8. laundry active energy (watt-hours)

– 9. water-heater and A/C active energy (watt-hours)

(54)

Problematic Input

● In the household_power_consumption data, missing values are specified by '?'

● Analysts must decide how to deal with unexpected values in unstructured data

● Today, we will ignore them

for line in sys.stdin:
    try:
        data = float(line.split(';')[2])
    except:
        # ignore lines with missing ('?') or otherwise malformed values
        continue

(55)

On which date was the maximum minute-averaged active power?

● What should the output of our mapper be?

A) power 1

B) date 1

C) date power

(56)

On which date was the maximum minute-averaged active power?

● What should the output of our mapper be?

A) power 1

B) date 1

C) date power (correct)

(57)

Working with .csv files in python

● We can use the csv module in python to parse .csv files more easily

import sys, csv

reader = csv.reader(sys.stdin, delimiter=';')

for line in reader:
    data = float(line[2])
    ...

(58)

Exercise 5: Compute the maximum

● Write a mapper and a reducer to compute the maximum value of the minute-averaged active power (3rd column), as well as the date on which this power occurred

(59)

Combiners

● To compute the max, you may have...

– Output a list of all values with a single common key

– Had a single reducer compute the maximum value in serial

● We would like to do some pre-reduction on the mapper nodes to balance the workload from the reducer to the mappers

● To find the maximum, we only need to send the maximum from each mapper through the network, not every value

(60)

Combiners (maximum)

[Figure: each mapper turns raw lines such as "20/12/2006;02:46:00;1.516;0.262;245.780;6.200;0.000;1.000;18.000" into date, power pairs (16/12/2006 4.216 ... 20/12/2006 1.504); the combiner then keeps only each mapper's maximum per date, e.g. 16/12/2006 5.388 and 20/12/2006 1.518]

(61)

Combiners

● To compute the maximum the reducer and the combiner can be the same script

– max() is associative and commutative

max([a,b]) = max([b,a])
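A minimal script that could serve as both the reducer and the combiner for the maximum (a sketch only, assuming mapper output lines of the form date<tab>power; it keeps a single running maximum over everything it reads and is not necessarily the Exercise 5 solution):

#!/usr/bin/env python
import sys

maxPower = None
maxDate = None

for line in sys.stdin:
    date, power = line.split('\t', 1)
    power = float(power)
    # keep only the largest power value seen so far
    if maxPower is None or power > maxPower:
        maxPower = power
        maxDate = date

if maxDate is not None:
    print('%s\t%s' % (maxDate, maxPower))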

(62)

Combiners

● To test your combiner scripts, insert the combiner into your serial test pipeline (see the sketch below)

● Hadoop sorts locally before the combiner and globally between the combiner and reducer

● Note that Hadoop does not guarantee how many times the combiner will be run
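A serial test that mimics this ordering (a sketch; the script names are placeholders for your Exercise 5 scripts):

$ cat household_power_consumption.txt | ./mapper_max.py | sort \
    | ./combiner_max.py | sort | ./reducer_max.py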

(63)

Combiners

● What is the benefit of reducing the number of key-value pairs sent to the reducer?

A) The amount of work done in the Map phase is reduced

B) The amount of work done in the Reduce phase is reduced

C) The amount of data sent through the network is reduced

D) More than one of the above

(64)

Combiners

● What is the benefit of reducing the number of key-value pairs sent to the reducer?

A) The amount of work done in the Map phase is reduced

B) The amount of work done in the Reduce phase is reduced

C) The amount of data sent through the network is reduced

D) More than one of the above (correct)

(65)

Histograms

[Figure: example histogram (Mwtoews, 2008)]

(66)

Exercise 6: Histogram

● Write a mapper that uses round(power) to 'bin' the minute-averaged active power readings (3rd column)

– Output for each reading: [power bin], 1

● Write a reducer that creates combined counts

– Input: [power bin], count

– Output: [power bin], count

– This script must also function as combiner

● Submit your tested scripts as a Hadoop job

– Use the reducer script as a combiner

– $ hadoop jar $HADOOP_STREAM ... -combiner reducer_hist.py ...

(67)
(68)

Mean and Standard Deviation

● We can't easily use combiners to compute the mean

– max(max(a,b), max(c,d,e)) = max(a,b,c,d,e)

– mean(mean(a,b), mean(c,d,e)) != mean(a,b,c,d,e)

● Reducer function can be used as a combiner if

– associative: (A*B)*C = A*(B*C)

– commutative: A*B = B*A

– e.g.: counting, addition, multiplication, ...

● Computing the mean and standard deviation means the reducer is stuck with a lot of math
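A quick check in plain Python shows the difference (numbers chosen arbitrarily):

def mean(values):
    return float(sum(values)) / len(values)

print(max(max(1, 2), max(3, 4, 5)))             # 5, same as max(1, 2, 3, 4, 5)
print(mean([mean([1, 2]), mean([3, 4, 5])]))    # 2.75
print(mean([1, 2, 3, 4, 5]))                    # 3.0, so the mean of means differs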

(69)

Power consumption by day of week

● Are there days of the week when more power is consumed, on average?

● Want to know the mean and standard deviation for each week day

● Simplification: Compute the average of the minute-averaged active power readings (3rd column)

(70)

Python datetime

● The datetime module is a powerful library for working with dates and times

● We can easily find the day of the week from the date

– from datetime import datetime

– weekday = datetime.strptime(date, '%d/%m/%Y').weekday()

(71)

Exercise 7: Mean and Standard Deviation

● Write mapper and reducer code to compute the mean and standard deviation for active power (3rd column) for each of the seven days of the week

● Test your scripts using serial Bash commands

● Submit your job to Hadoop

● Tip: Wikipedia - Algorithms for calculating variance

– Python code to compute mean and variance in a single pass

(72)

Speedup - Mean and St.Dev.

● ~2 million entries

● Serial Bash version

– $ cat household_power_consumption.txt | ./mapper.py | sort | ./reducer.py

– 75 seconds

● Hadoop version

– 2 mappers, 1 reducer: 50 seconds

● Speedup: 1.5X

– 2 mappers, 2 reducers: 48 seconds

● Speedup: 1.6X

● -D mapred.reduce.tasks=2

(73)

Choosing numbers of maps/reduces

● Mappers

– More mappers increases parallelism

– Too many mappers increases scheduling overhead

– Hadoop automatically sets the number of mappers according to the block size and input data size

● Reducers

– Too few reducers increases the computational load on each reducer

– Too many reducers increases shuffle and HDFS overhead

(74)

Iterative MapReducing

● Many tasks in scientific computing cannot be easily expressed as a single MapReduce job

● Often, we require iterating over data

● K-means clustering is an example

– We will see how it can be implemented in MapReduce

– We will not implement it, just see how it works with the MapReduce framework

(75)

K-means clustering

● Unsupervised machine learning

● Divide a data set into k different categories based on the features of that data set

● Computational hotspot: computing distances between each cluster centroid and each data point, O(n*k)

● E.g. Clothing manufacturer: based on customer's height and weight data, divide them into 3 or more size categories

● E.g. Categorize astronomical objects into stars, galaxies, quasars, etc. based on spectral data

● E.g. Categorize gene expression profiles to study function within

(76)

K-means clustering

Step 1: Randomly generate K locations (circles)

Step 2: Group data points by proximity to locations

(77)

MapReduce K-Means

[Figure: one iteration of k-means as a MapReduce job]

● Input: the data points and the current centroid locations

● Mapper

– Calculate centroid distances

– Assign data points to nearest centroid

– Output key: best centroid, value: data point

● Reducer

– Compute new centroids

– Output key: old centroid, value: new centroid

● Converged? If no, feed the new centroids back into another round of mappers; if yes, output the final centroid locations

(78)

Iterative MapReducing

● To make iterative jobs easier, the Hadoop ecosystem has tools for iterative workloads

– Twister - Iterative MapReduce framework

– HaLoop - Iterative MapReduce framework

– Mahout - Scalable implementations of machine learning algorithms on Hadoop (including k-means)

– Spark - Framework for in-memory distributed computing, in-memory data sharing between jobs

● Make use of high-level interfaces to MapReduce for more complex jobs

(79)
(80)

In the time remaining...

● Import your own data into HDFS for analysis

– Your quota is 300GB (after replication) by default

● Examine some data from Twitter

– /software/workshop/twitter_data

– 3.8 million tweets + metadata ~ 11 GB

● Continue to work with the workshop data sets

– titanic.txt

– household_power_consumption.txt

– usask_access_log

● Contact us to add your user account to the Hadoop test environment (class accounts deactivate later today)

(81)

Keep Learning...

● Contact us for access to our Hadoop test system

– guillimin@calculquebec.ca

● Download a Hadoop virtual machine

– http://hortonworks.com/products/hortonworks-sandbox

● View online training materials

– https://www.udacity.com/course/ud617

– http://cloudera.com/content/cloudera/en/training/library.html

● Calcul Quebec workshop on Apache Spark (French)

(82)

Bonus Exercise: Top 10 websites

● Produce a top 10 list of websites accessed on the University of Saskatchewan website

– usask_access_logs

● Be careful

– Some lines won't conform to your expectations

– How to handle?

● skip?

(83)

Top 10 List

● Mapper

– output: key 1

● Combiner

– returns top 10 for each mapper

– output: key count

● Reducer

– finds the global top 10
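One possible shape for the combiner/reducer logic (a sketch; it assumes mapper output lines of the form site<tab>count and keeps the largest totals with heapq):

#!/usr/bin/env python
import sys
import heapq

totals = {}
for line in sys.stdin:
    site, count = line.split('\t', 1)
    totals[site] = totals.get(site, 0) + int(count)

# emit the ten sites with the largest totals
for site, count in heapq.nlargest(10, totals.items(), key=lambda kv: kv[1]):
    print('%s\t%d' % (site, count))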
