Hadoop/MapReduce Workshop
guillimin@calculquebec.ca
August 14, 2015
Dan Mazur
Outline
● Hadoop introduction and motivation
● Python review
● HDFS - The Hadoop Filesystem
● MapReduce examples and exercises
– Wordcount
– Distributed grep
– Distributed sort
– Maximum
Exercise 0: Login and Setup
● Log-in to Guillimin
– $ ssh -X class##@hadoop.hpc.mcgill.ca
– Use the account number and password from the slip you received
● Copy workshop files
– $ cp -R /software/workshop/hadoop .
● Load Hadoop Module
– $ module show hadoop
Exercise 1: First MapReduce job
● To make sure your environment is set up correctly
● Launch an example MapReduce job
– $ hadoop jar $HADOOP_EXAMPLES pi 100 100
– $HADOOP_EXAMPLES=/usr/hdp/2.2.0.0-2041/hadoop-mapreduce/hadoop-mapreduce-examples.jar
● Final output
– Job Finished in 16.983 seconds
– Estimated value of Pi is 3.14080000000000000000
Hadoop
● What is Hadoop?
– A collection of related software (“software framework”, “software stack”, “software ecosystem”)
– Data-intensive computing (“Big Data”)
– Scales with data size
– Fault-tolerant
– Parallel
– Analysis of unstructured, semi-structured data
– Cluster
MapReduce
● What is MapReduce?
– Parallel, distributed programming model
– Large-scale data processing
– Map()
● Filter, sort, embarrassingly parallel
● e.g. sort participants by first name
– Reduce()
● Summary
Hadoop Ecosystem
● Apache Hadoop core
– HDFS - Hadoop Distributed File System
– Yarn - Resource management, job scheduling
● Pig - high-level scripting for MapReduce programs
● Hive - Data warehouse with SQL-like queries
● HCatalog - Abstracts data storage filenames and formats
● HBase - Database
● Zookeeper - Maintains and synchronizes configuration
● Oozie - Workflow scheduling
Hadoop Motivation
● For “big data”, hard disk input/output (I/O) is a bottleneck
● We have seen huge technology improvements in both CPU speeds and storage capacity
● I/O performance has not improved as dramatically
● We will see that Hadoop solves this problem by parallelizing I/O
operations across many disks
● The genius of Hadoop is in how easy this parallelization is from the
Hadoop Motivation
● Big Data Challenges - The “V”-words
– Volume
● Amount of data (terabytes, petabytes)
● Want to distribute storage across many nodes and analyze it in place
– Variety
● Data comes in many formats, not always in a relational database
● Want to store data in original format
– Velocity
● Rate at which size of data grows, or speed with which it must be processed
● Want to expand storage and analysis capacity as data grows
How Does Hadoop Solve These Problems?
● Distributed file system (HDFS)
– Scalable with redundancy
● Parallel computations on data nodes
Hadoop vs. Competition
● Hadoop works well for loosely coupled (embarrassingly) parallel problems
[Figure: Database, MPI, and Hadoop compared; axis label: process coupling]
Hadoop vs. Competition
● Map or reduce tasks are automatically re-run if they fail
● Data is stored redundantly and reproduced automatically if a drive fails
[Figure: Database, MPI, and Hadoop compared; axis labels: parallelism, fault tolerance]
Hadoop vs. Competition
● Hadoop makes certain problems very easy to solve
● Hardest parts of parallel programming are abstracted away
● Today we will write several practical codes that could scale to 1000s of nodes with just a few lines of code
[Figure: Database, MPI, and Hadoop compared; axis label: developer productivity]
Hadoop vs. Competition
● Hadoop, MPI, and databases are all improving their weaknesses
– All are becoming fault-tolerant platforms for tightly-coupled, massively parallel problems
● Hadoop integrates easily into a workflow that includes MPI and/or databases
– Sqoop, HBase, etc. for working with databases
– Hadoop for post MPI data analysis
– Hadoop for pre MPI data processing
– Hadoop 2 introduced a scheduler, Yarn, that can schedule MPI, MapReduce, and other types of workloads
● New tools for tightly-coupled problems
– Apache Spark
Python
● Hadoop is implemented in Java
● Developers can work with any programming
language
● For a workshop, it is important to have a common
Python - for loops
● Can loop over individual instances of 'iterable'
objects (lists, tuples, dictionaries)
● Looped sections use an indented block
– Be consistent: use a tab or 4 spaces, not both
● Do not forget the colon
myList = ['one', 'two', 'three']
for item in myList:
    print item
Python standard input/output
#!/usr/bin/python
# Load modules for system and comma separated value file functions
import sys
import csv
reader = csv.reader(sys.stdin, delimiter=',')
for line in reader:
    data0 = line[0]
Dictionaries
● Unordered set of key/value pairs
– keys are unique, can be used as an index
>>> dict = {'key':'value', 'apple':'the round fruit of a tree'}
>>> print dict['key']
value
>>> print dict['apple']
the round fruit of a tree
Hadoop Distributed File System (HDFS)
● Key Concepts:
– Data is read and written in minimum units (“blocks”)
– A master node (“namenode”) manages the filesystem tree and the metadata for each file
– Data is stored on a group of worker nodes (“datanodes”)
● The same blocks are replicated across multiple datanodes (default replication = 3)
HDFS Blocks
[Figure: a 150 MB file, myFile.txt, is split into three blocks (1: 64 MB, 2: 64 MB, 3: 22 MB), and copies of each block are placed on several of six datanodes]
● Data is distributed block-by-block to multiple nodes
● Data redundancy default = 3x
● If we lose a node, data is available on 2 other nodes and the namenode arranges to create a 3rd copy on another node
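The block arithmetic on this slide can be sketched in a few lines of Python. This is only an illustration: split_into_blocks and place_replicas are hypothetical helper names, not HDFS APIs, and the round-robin placement is an assumption (real HDFS placement is decided by the namenode and is more sophisticated).

```python
def split_into_blocks(file_size_mb, block_size_mb=64):
    """Return the sizes (in MB) of the blocks a file would be cut into."""
    full_blocks, remainder = divmod(file_size_mb, block_size_mb)
    sizes = [block_size_mb] * full_blocks
    if remainder:
        sizes.append(remainder)  # the last block only holds what is left
    return sizes


def place_replicas(num_blocks, num_datanodes, replication=3):
    """Assign each block to `replication` distinct datanodes.

    Round-robin placement is an assumption made for illustration only.
    """
    placement = {}
    for block in range(num_blocks):
        placement[block + 1] = [(block + r) % num_datanodes
                                for r in range(replication)]
    return placement


blocks = split_into_blocks(150)
placement = place_replicas(len(blocks), num_datanodes=6)
print(blocks)
for block_id, nodes in sorted(placement.items()):
    print("block %d -> datanodes %s" % (block_id, nodes))
```

With the slide's numbers, the 150 MB file yields two full 64 MB blocks and one 22 MB block, and each block lands on three different datanodes.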
Exercise 2: Using HDFS
● Put a file into HDFS
– $ hdfs dfs -put titanic.txt
● List files in HDFS
– $ hdfs dfs -ls
● Output the file contents
– $ hdfs dfs -cat titanic.txt
– $ hdfs dfs -tail titanic.txt
● Get help
– $ hdfs dfs -help
MapReduce
● Roman census approach:
– Want to count (and tax) all people in the Roman empire
– Better to go to where the people are (decentralized) than try to bring them to you (centralized)
– Bring back information from each village (map phase)
Roman Census: Mapping
[Figure: mappers go from the capital to each village and count what they find there, e.g. 287 men, 293 women, 104 children, 854 sheep]
Roman Census: Reducing
[Figure: each reducer combines one category from all villages, e.g. sheep counts of 854, 34, 1032, 206, 91, and 545 add up to 2762 sheep]
MapReduce
[Figure: data is read by mappers, which emit key, value pairs; after sort and shuffle, each key arrives together with all of its values]
Mapper
● Takes specified data as input
● Works with a fraction of the data
● Works in parallel
● Outputs intermediate records
– key, value pairs
Reducer
● Takes a key or set of keys with all associated values as input
● Works with all data for that key
● Outputs the final results
MapReduce Word Counting
● Want to count the frequency of words in a
document
● What are the key, value pairs for our mapper
output?
– A) key=1, value='word'
– B) key='word', value=1
– C) key=[number in hdfs block], value='word'
– D) key='word', value=[number in hdfs block]
● Answer: B) key='word', value=1
MapReduce Word Counting
● If our mapper input is “hello world, hello!”, what
will our reducer input look like?
– A) hello, world, hello
– B) hello 1, world 1, hello 1
– C) hello 1, hello 1, world 1
– D) hello 2, world 1
● Answer: C) hello 1, hello 1, world 1
● Explanation: The reducer receives SORTED key, value pairs. The sorting happens in the sort and shuffle step between the mappers and the reducers.
MapReduce Word Counting
● Want to count the frequency of words in a document
● What are the key, value pairs for our reducer output?
– A) key=1, value='word'
– B) key='word', value=1
– C) key=[count in document], value='word'
– D) key='word', value=[count in document]
● Answer: D) key='word', value=[count in document]
Hadoop Streaming
● Streaming lets developers use any programming language for
mapping and reducing
● Use standard input and standard output
● The first tab character delimits between key and value
● Similar to Bash pipes
– $ cat file.txt | ./mapper | sort | ./reducer
● HADOOP_STREAM=/usr/hdp/2.2.0.0-2041/hadoop-mapreduce/hadoop-streaming.jar
● hadoop jar $HADOOP_STREAM -files <mapper script>,<reducer script> -input <input dir> -output <output dir> -mapper <mapper script> -reducer <reducer script>
mapper.py
#!/usr/bin/env python
# Scripts require a "hash bang" line
# Import the sys module for stdin
import sys
# Loop over standard input
for line in sys.stdin:
    # split the line into words
    words = line.split()
    # Loop over words
    for word in words:
        print word, '\t', 1
Reducer: Checking for key changes
● Often reducers will have to detect when the key
changes in the sorted mapper output
prevKey = None
for line in inputReader:
    key = line[0]
    # If the current key is the same as previous key
    if key == prevKey:
        value = ... # update for current line
    # Else we have started a new group of keys
    else:
        # if not first line of input
        if not prevKey == None:
            # Completed entire key, print value
            print prevKey, "\t", value
        value = ... # set for first entry of new key
    # Set prevKey to the current key
    prevKey = key
# Output final key, value pair
if prevKey:
    print prevKey, "\t", value
reducer.py
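#!/usr/bin/env python
import sys
prevWord = None
wordCount = 0
word = None
for line in sys.stdin:
    word, count = line.split('\t', 1)
    count = int(count)
    if word == prevWord:
        wordCount += count
    else:
        if prevWord:
            # Completed the previous word, print its count
            print prevWord, '\t', wordCount
        # Start counting the new word
        prevWord = word
        wordCount = count
# Output the final word and its count
if prevWord:
    print prevWord, '\t', wordCount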
Testing map and reduce scripts
● It is useful to test your scripts with a small amount
of data in serial to check for syntax errors
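As a complement to testing with Bash pipes, the same map, sort, reduce sequence can be emulated inside a single Python process. The sketch below is only an illustration of the data flow; map_words, reduce_counts, and run_serial are hypothetical names, and it works on in-memory lists rather than standard input.

```python
from itertools import groupby


def map_words(lines):
    """Mapper: emit one (word, 1) pair per word."""
    for line in lines:
        for word in line.split():
            yield (word, 1)


def reduce_counts(sorted_pairs):
    """Reducer: sum the values of each run of identical keys.

    Like a streaming reducer, this relies on the input being sorted by key.
    """
    for word, group in groupby(sorted_pairs, key=lambda pair: pair[0]):
        yield (word, sum(count for _, count in group))


def run_serial(lines):
    """Emulate `cat file | ./mapper | sort | ./reducer` in one process."""
    return list(reduce_counts(sorted(map_words(lines))))


print(run_serial(["hello world", "hello"]))
```

Running the logic on a handful of lines first makes it easier to spot mistakes before submitting a job to the cluster.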
Exercise 3: Word count
● Place the directory montgomery into HDFS
– $ hdfs dfs -put montgomery
● Submit a MapReduce job with your *tested*
scripts to count the word frequencies in Lucy Maud Montgomery's books
– $ hadoop jar $HADOOP_STREAM \
    -files mapper_wordcount.py,reducer_wordcount.py \
    -mapper mapper_wordcount.py \
    -reducer reducer_wordcount.py \
    -input montgomery -output wordCount
Exercise 3: Word count
● View the output directory
– $ hdfs dfs -ls wordCount
● View your results
– $ hdfs dfs -cat wordCount/part-00000
● View your (sorted) results
Storage
A Hadoop cluster has 10 nodes with 300GB of storage per node with the default HDFS setup
(replication factor 3, 64MB blocks). Alice wants to upload two 400GB files and run WordCount on
them both. What will happen?
– A) The data upload fails at the first file
– B) The data upload fails at the second file
– C) MapReduce job fails
Storage
A Hadoop cluster has 10 nodes with 300GB of storage per node with the default HDFS setup (replication factor 3, 64MB blocks). Alice wants to upload three 400GB files and run WordCount on them all. What will happen?
– A) The data upload fails at the first file
– B) The data upload fails at the second file
– C) The data upload fails at the third file
– D) MapReduce job fails
– E) None of the above. MapReduce is successful.
Answer: C) The data upload fails at the third file
Explanation: The small cluster can only store 1.0TB of data with 3X replication! Alice wants to upload 1.2TB.
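The capacity arithmetic behind these two questions can be checked with a short Python sketch. It is a simplification under stated assumptions: usable_capacity_gb and first_failed_upload are hypothetical helpers, and the model ignores any space used by the operating system, temporary job output, or other users.

```python
def usable_capacity_gb(nodes, gb_per_node, replication=3):
    """Raw cluster storage divided by the replication factor."""
    return nodes * gb_per_node / float(replication)


def first_failed_upload(file_sizes_gb, capacity_gb):
    """Return the 1-based index of the first file that no longer fits,
    or None if every file fits."""
    used = 0
    for index, size in enumerate(file_sizes_gb, start=1):
        used += size
        if used > capacity_gb:
            return index
    return None


capacity = usable_capacity_gb(nodes=10, gb_per_node=300)
print(capacity)                                          # usable GB
print(first_failed_upload([400, 400], capacity))         # two files
print(first_failed_upload([400, 400, 400], capacity))    # three files
```

With 10 nodes of 300GB each and 3x replication, two 400GB files fit and the third one does not.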
Simplifying MapReduce Commands
● The native streaming commands are cumbersome
● TIP: Create simplifying aliases and functions
– e.g.
mapreduce_stream(){
    hadoop jar $HADOOP_STREAM -files $1,$2 \
        -mapper $1 \
        -reducer $2 \
        -input $3 -output $4
}
● Place these commands into ~/.bashrc so they are executed in each new bash session (each login)
Exercise 4: Hadoop Web UI
● Hadoop includes a web-based user interface
● Launch a firefox window
– $ firefox &
● Navigate to the Hadoop Job Monitor
– http://lm-2r01-n02:8088/cluster
● Navigate to the namenode and filesystem
– http://lm-2r01-n01:50070/dfshealth.jsp
● Navigate to the job history
Accessing Job Logs
● Through the web UI
● Through the command line
Example: Distributed Grep
Hadoop Command:
$ hadoop jar $HADOOP_STREAM \
    -D mapreduce.job.reduces=0 \
    -D mapred.reduce.tasks=0 \
    -input titanic.txt \
    -output grepout \
    -mapper "/bin/grep Williams"
● Note: We don't have to write any scripts!
● Note: There is no reducer phase
Household Power Consumption
● Dataset: household_power_consumption.txt
● From the UCI Machine Learning Repository
● 9 Columns, semicolon separated (see household_power_consumption.explain)
– 1. date: dd/mm/yyyy
– 2. time: hh:mm:ss
– 3. minute-averaged active power (kilowatts)
– 4. minute-averaged reactive power (kilowatts)
– 5. minute-averaged voltage (volts)
– 6. minute-averaged current (amps)
– 7. kitchen active energy (watt-hours)
– 8. laundry active energy (watt-hours)
– 9. water-heater and A/C active energy (watt-hours)
Problematic Input
● In the household_power_consumption data,
missing values are specified by '?'
● Analysts must decide how to deal with unexpected
values in unstructured data
● Today, we will ignore it
for line in sys.stdin:
    try:
        data = float(line.split(';')[2])
    except (ValueError, IndexError):
        # Missing ('?') or malformed value: ignore this line
        continue
On which date was the maximum
minute-averaged active power?
● What should the output of our mapper be?
A) power 1
B) date 1
C) date power
● Answer: C) date power
Working with .csv files in python
● We can use the csv module in python to parse .csv
files more easily
import sys, csv
reader = csv.reader(sys.stdin, delimiter=';')
for line in reader:
    data = float(line[2])
    ...
Exercise 5: Compute the maximum
● Write a mapper and a reducer to compute the
maximum value of the minute-averaged active
power (3rd column), as well as the date on which this power occurred
Combiners
● To compute the max, you may have...
– Output a list of all values with a single common key
– Had a single reducer compute the maximum value in serial
● We would like to do some pre-reduction on the mapper nodes to shift part of the workload from the reducer to the mappers
● To find the maximum, we only need to send the maximum from each mapper through the network, not every value
Combiners (maximum)
[Figure: Map turns input lines such as 20/12/2006;02:46:00;1.516;0.262;245.780;6.200;0.000;1.000;18.000 into date, power pairs (e.g. 16/12/2006 4.216, 16/12/2006 5.388, 20/12/2006 1.516, 20/12/2006 1.518); Combine keeps only the maximum from each mapper (16/12/2006 5.388 and 20/12/2006 1.518)]
Combiners
● To compute the maximum the reducer and the
combiner can be the same script
– max() is associative and commutative
max([a,b]) = max([b,a])
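A small Python sketch can illustrate why the same function works as both combiner and reducer for the maximum. The sample values are taken from the previous slide; max_by_power is a hypothetical name, and the two lists stand in for the output of two mappers.

```python
def max_by_power(pairs):
    """Reducer/combiner: return the (date, power) pair with the largest power."""
    return max(pairs, key=lambda pair: pair[1])


# Output of two mappers (values from the household power data set)
mapper_outputs = [
    [("16/12/2006", 4.216), ("16/12/2006", 5.360), ("16/12/2006", 5.388)],
    [("20/12/2006", 1.516), ("20/12/2006", 1.498), ("20/12/2006", 1.518)],
]

# With a combiner: each mapper sends only its own maximum over the network
combined = [max_by_power(output) for output in mapper_outputs]
with_combiner = max_by_power(combined)

# Without a combiner: the reducer sees every pair
without_combiner = max_by_power(
    [pair for output in mapper_outputs for pair in output])

print(combined)
print(with_combiner == without_combiner)
```

Both routes give the same result, but with the combiner only one pair per mapper has to travel to the reducer.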
Combiners
● To test your combiner scripts
● Hadoop sorts locally before the combiner and
globally between the combiner and reducer
● Note that Hadoop does not guarantee how many
times the combiner will be run
Combiners
● What is the benefit of reducing the number of
key-value pairs sent to the reducer?
A) The amount of work done in the Map phase is reduced
B) The amount of work done in the Reduce phase is reduced
C) The amount of data sent through the network is reduced
D) More than one of the above
● Answer: D) More than one of the above (B and C): the reducer has fewer pairs to process and fewer pairs are sent through the network
Histograms
[Figure: example histogram. Image credit: Mwtoews, 2008]
Exercise 6: Histogram
● Write a mapper that uses round(power) to 'bin' the minute-averaged active power
readings (3rd column)
– Output for each reading: [power bin], 1
● Write a reducer that creates combined counts
– Input: [power bin], count
– Output: [power bin], count
– This script must also function as combiner
● Submit your tested scripts as a Hadoop job
– Use the reducer script as a combiner
– $ hadoop jar $HADOOP_STREAM ... -combiner reducer_hist.py ...
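One possible shape for the binning and counting logic is sketched below in plain Python, outside of Hadoop. It is not the workshop solution: map_bins and reduce_bins are hypothetical names, the first three sample lines come from the data set, and the last line with '?' values is made up to show how missing readings are skipped.

```python
from collections import Counter


def map_bins(lines):
    """Mapper: emit (power bin, 1) for each valid reading in the 3rd column."""
    for line in lines:
        try:
            power = float(line.split(';')[2])
        except (ValueError, IndexError):
            continue  # missing ('?') or malformed value: skip the line
        yield (int(round(power)), 1)


def reduce_bins(pairs):
    """Reducer and combiner: add up the counts for each bin.

    Input and output have the same (bin, count) form, so the function
    can be applied to its own output.
    """
    counts = Counter()
    for power_bin, count in pairs:
        counts[power_bin] += count
    return sorted(counts.items())


sample = [
    "20/12/2006;02:46:00;1.516;0.262;245.780;6.200;0.000;1.000;18.000",
    "20/12/2006;02:47:00;1.498;0.258;246.060;6.200;0.000;2.000;19.000",
    "20/12/2006;02:48:00;1.518;0.264;246.240;6.200;0.000;1.000;18.000",
    "21/12/2006;00:00:00;?;?;?;?;?;?;",  # hypothetical line with missing values
]
print(reduce_bins(map_bins(sample)))
```

Because reduce_bins accepts its own output as input, running it once per mapper and again on the merged results gives the same totals as running it once on everything.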
Mean and Standard Deviation
● We can't easily use combiners to compute the mean
– max(max(a,b), max(c,d,e)) = max(a,b,c,d,e)
– mean(mean(a,b), mean(c,d,e)) != mean(a,b,c,d,e)
● Reducer function can be used as a combiner if
– associative: (A*B)*C = A*(B*C)
– commutative: A*B = B*A
– e.g.: counting, addition, multiplication, ...
● Computing the mean and standard deviation means the reducer is stuck with a lot of math
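The point about the mean can be made concrete with a short Python sketch. It shows that a mean of means differs from the true mean, and one common workaround: passing along (count, sum, sum of squares), which merge by addition. The names summarize, merge, and mean_and_std are hypothetical, and the sum-of-squares formula is chosen for brevity; it can lose precision on large data sets, where a more stable algorithm may be preferable.

```python
import math


def summarize(values):
    """Combiner-friendly summary: (count, sum, sum of squares)."""
    return (len(values), sum(values), sum(v * v for v in values))


def merge(a, b):
    """Merging two summaries is element-wise addition,
    which is associative and commutative."""
    return (a[0] + b[0], a[1] + b[1], a[2] + b[2])


def mean_and_std(summary):
    """Final reduce step: population mean and standard deviation."""
    n, total, total_sq = summary
    mean = total / float(n)
    variance = total_sq / float(n) - mean * mean
    return mean, math.sqrt(max(variance, 0.0))


part1, part2 = [1.0, 2.0], [3.0, 4.0, 5.0]
mean, std = mean_and_std(merge(summarize(part1), summarize(part2)))
mean_of_means = (sum(part1) / len(part1) + sum(part2) / len(part2)) / 2
print(mean, std, mean_of_means)
```

Here the true mean is 3.0 while the mean of the two partial means is 2.75, which is why the partial results need to carry counts and sums rather than means.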
Power consumption by day of week
● Are there days of the week when more power is
consumed, on average?
● Want to know the mean and standard deviation for
each week day
● Simplification: Compute average of
Python datetime
● The datetime module is a powerful library for
working with dates and times
● We can easily find the day of the week from the
date
– from datetime import datetime
– weekday = datetime.strptime(date,
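A minimal sketch of finding the weekday with the datetime module follows. It assumes the dd/mm/yyyy date format described for the household power data set; weekday_of is a hypothetical helper name, and the format string is an assumption rather than the one used on the original slide.

```python
from datetime import datetime


def weekday_of(date_string):
    """Return the day of the week (Monday=0 ... Sunday=6)
    for a date written as dd/mm/yyyy."""
    return datetime.strptime(date_string, "%d/%m/%Y").weekday()


print(weekday_of("16/12/2006"))
print(weekday_of("20/12/2006"))
```

The returned integer can serve directly as the mapper's output key, giving one group per day of the week.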
Exercise 7: Mean and Standard Deviation
● Write mapper and reducer code to compute the
mean and standard deviation for active power (3rd column) for each of the seven days of the week
● Test your scripts using serial Bash commands
● Submit your job to Hadoop
● Tip: Wikipedia - Algorithms for calculating
variance
– Python code to compute mean and variance in a single
Speedup - Mean and St.Dev.
● ~2 million entries
● Serial Bash version
– $ cat household_power_consumption.txt | ./mapper.py | sort | ./reducer.py
– 75 seconds
● Hadoop version
– 2 mappers, 1 reducer: 50 seconds
● Speedup: 1.5X
– 2 mappers, 2 reducers: 48 seconds
● Speedup: 1.6X
● -D mapred.reduce.tasks=2
Choosing numbers of maps/reduces
● Mappers
– More mappers increases parallelism
– Too many mappers increases scheduling overhead
– Hadoop automatically sets the number of mappers according to the block size and input data size
● Reducers
– Too few reducers increases the computational load on each reducer
– Too many reducers increases shuffle and HDFS overhead
Iterative MapReducing
● Many tasks in scientific computing cannot be easily
expressed as a single MapReduce job
● Often, we require iterating over data
● K-means clustering is an example
– We will see how it can be implemented in MapReduce
– We will not implement it, just see how it works with the MapReduce framework
K-means clustering
● Unsupervised machine learning
● Divide a data set into k different categories based on the features of
that data set
● Computational hotspot: computing distances between each cluster
centroid and each data point, O(n*k)
● E.g. Clothing manufacturer: based on customer's height and weight
data, divide them into 3 or more size categories
● E.g. Categorize astronomical objects into stars, galaxies, quasars, etc.
based on spectral data
● E.g. Categorize gene expression profiles to study function within
K-means clustering
Step 1: Randomly generate K locations (circles)
Step 2: Group data points by proximity to locations
MapReduce K-Means
[Figure: flowchart of iterative k-means. Inputs: data points and centroid locations. Mapper: calculate centroid distances and assign data points to the nearest centroid (key: best centroid, value: data point). Reducer: compute new centroids (key: old centroid, value: new centroid). Converged? No: run the mappers again with the new centroids. Yes: output the final centroid locations.]
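The flowchart can be sketched as a single-process Python loop, with the map and reduce steps marked in comments. This is an illustration of the data flow only, not a Hadoop job: the function names, the tolerance, and the sample points are all assumptions.

```python
def nearest_centroid(point, centroids):
    """Index of the centroid closest to `point` (squared Euclidean distance)."""
    distances = [sum((p - c) ** 2 for p, c in zip(point, centroid))
                 for centroid in centroids]
    return distances.index(min(distances))


def kmeans_iteration(points, centroids):
    # Map: key = best centroid (index), value = data point
    groups = {}
    for point in points:
        groups.setdefault(nearest_centroid(point, centroids), []).append(point)
    # Reduce: key = old centroid (index), value = new centroid (mean of its points)
    new_centroids = list(centroids)
    for index, members in groups.items():
        new_centroids[index] = tuple(sum(axis) / float(len(members))
                                     for axis in zip(*members))
    return new_centroids


def kmeans(points, centroids, tolerance=1e-9, max_iterations=100):
    """Repeat map + reduce until the centroids stop moving."""
    for _ in range(max_iterations):
        updated = kmeans_iteration(points, centroids)
        shift = max(sum((a - b) ** 2 for a, b in zip(old, new))
                    for old, new in zip(centroids, updated))
        centroids = updated
        if shift <= tolerance:  # converged
            break
    return centroids


points = [(0.0, 0.0), (0.0, 2.0), (10.0, 10.0), (10.0, 12.0)]
print(kmeans(points, [(1.0, 1.0), (9.0, 9.0)]))
```

In a real MapReduce setting each pass of the loop would be a separate job, which is the overhead that the iterative frameworks on the next slide try to reduce.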
Iterative MapReducing
● To make iterative jobs easier, the Hadoop ecosystem has tools for iterative workloads
– Twister - Iterative MapReduce framework
– HaLoop - Iterative MapReduce framework
– Mahout - Scalable implementations of machine learning algorithms on Hadoop (including k-means)
– Spark - Framework for in-memory distributed computing, in-memory data sharing between jobs
● Make use of high-level interfaces to MapReduce for more complex jobs
In the time remaining...
● Import your own data into HDFS for analysis
– Your quota is 300GB (after replication) by default
● Examine some data from Twitter
– /software/workshop/twitter_data
– 3.8 million tweets + metadata ~ 11 GB
● Continue to work with the workshop data sets
– titanic.txt
– household_power_consumption.txt
– usask_access_log
● Contact us to add your user account to the Hadoop test environment (class accounts deactivate later today)
Keep Learning...
● Contact us for access to our Hadoop test system
– guillimin@calculquebec.ca
● Download a Hadoop virtual machine
– http://hortonworks.com/products/hortonworks-sandbox
● View online training materials
– https://www.udacity.com/course/ud617
– http://cloudera.com/content/cloudera/en/training/library.html
● Calcul Quebec workshop on Apache Spark (French)
Bonus Exercise: Top 10 websites
● Produce a top 10 list of websites accessed on the
University of Saskatchewan website
– usask_access_logs
● Be careful
– Some lines won't conform to your expectations
– How to handle?
● skip?
Top 10 List
● Mapper
– output: key 1
● Combiner
– returns top 10 for each mapper
– output: key count
● Reducer
– finds the global top 10
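The mapper, combiner, reducer outline above can be sketched in plain Python as follows. The page names are hypothetical, as are the function names combine_top_n and reduce_top_n, and n is set to 2 so the sample stays small. One caveat worth keeping in mind: when each combiner keeps only its local top n, a page whose hits are spread thinly across many mappers could be dropped, so the result may be approximate.

```python
from collections import Counter


def combine_top_n(pairs, n=10):
    """Combiner: sum the counts per key, keep the n largest for this mapper."""
    counts = Counter()
    for key, count in pairs:
        counts[key] += count
    return counts.most_common(n)


def reduce_top_n(combiner_outputs, n=10):
    """Reducer: merge the per-mapper lists and find the global top n."""
    counts = Counter()
    for output in combiner_outputs:
        for key, count in output:
            counts[key] += count
    return counts.most_common(n)


# Mapper output (key 1) for two mappers; the paths are made up
mapper_a = [("/index.html", 1), ("/index.html", 1), ("/about.html", 1)]
mapper_b = [("/index.html", 1), ("/courses.html", 1), ("/courses.html", 1)]

top = reduce_top_n([combine_top_n(mapper_a, n=2),
                    combine_top_n(mapper_b, n=2)], n=2)
print(top)
```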