Introduction to RHadoop
Master’s Degree in Informatics Engineering
Master’s Programme in ICT Innovation: Data Science
(EIT ICT Labs Master School)
Contents
• Introduction to…
  • MapReduce
  • HDFS
  • Hadoop
MapReduce & DQ
• Divide and Conquer (DQ)
• General idea
  • Divide a problem into smaller sub-problems
  • Solve each sub-problem independently
DQ: pseudo-code
Function DQ(X: problem data)
  if small(X) then
    S = easy(X)
  else
    divide(X) => (X_1, ..., X_k)
    for i = 1 to k do
      S_i = DQ(X_i)
    S = combine(S_1, ..., S_k)
  return S
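The pseudo-code above can be sketched in R with merge sort as the concrete problem. The function names small, easy, divide and combine mirror the pseudo-code, but their bodies are assumptions chosen for this illustration (k = 2, threshold of one element); they are not part of the original slides.

```r
# Divide-and-conquer sketch: merge sort written to mirror DQ(X).
# The bodies of small/easy/divide/combine are illustrative assumptions.

small <- function(x) length(x) <= 1   # threshold for the trivial case
easy  <- function(x) x                # 0 or 1 elements are already sorted

# Split the problem into k = 2 halves
divide <- function(x) {
  mid <- length(x) %/% 2
  list(x[seq_len(mid)], x[(mid + 1):length(x)])
}

# Merge two sorted vectors into one sorted vector
combine <- function(a, b) {
  out <- vector(mode(a), length(a) + length(b))
  i <- 1; j <- 1; k <- 1
  while (i <= length(a) && j <= length(b)) {
    if (a[i] <= b[j]) {
      out[k] <- a[i]; i <- i + 1
    } else {
      out[k] <- b[j]; j <- j + 1
    }
    k <- k + 1
  }
  # Copy whatever is left in the vector that was not exhausted
  if (i <= length(a)) out[k:length(out)] <- a[i:length(a)]
  if (j <= length(b)) out[k:length(out)] <- b[j:length(b)]
  out
}

dq <- function(x) {
  if (small(x)) return(easy(x))
  parts  <- divide(x)
  solved <- lapply(parts, dq)          # each sub-problem is independent
  combine(solved[[1]], solved[[2]])
}

sorted <- dq(c(5, 2, 9, 1, 5, 6))
```

Because the calls inside lapply do not depend on each other, they are the part that a parallel system could distribute across workers.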
DQ: efficiency
• Efficiency of this approach
  • An appropriate threshold must be selected to apply easy(X)
  • Decomposition and combining functions must be efficient
DQ: Remarks
• It cannot be applied to every type of problem
• Sometimes it is not obvious how to divide a large problem into sub-problems
• If the division is uneven, the system will be unbalanced, which has an important impact on the overall performance of the algorithm
• The sub-problems must be significantly smaller than the original problem, so that a massively parallel supercomputer can be used and the communication overhead is compensated
MapReduce: general scheme
MapReduce: more detail
MapReduce: example
Hadoop Distributed File System (HDFS)
• Distributed file system that evolved from Google's implementation (GFS)
• Fault-tolerant: files are divided into chunks, which are distributed and replicated across the cluster
  • Normally, the replication factor is 3
• There is a master node (the NameNode) that stores the meta-data: which files exist, how many chunks each one is divided into, and where those chunks are stored
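As a rough illustration of what chunking and replication mean for storage, the arithmetic can be sketched in R. The block size and the file size below are assumptions picked for the example (128 MB is a common HDFS block size); only the replication factor of 3 comes from the slide.

```r
# Back-of-the-envelope HDFS storage arithmetic.
block.size.mb <- 128    # assumption: a common HDFS block size
replication   <- 3      # replication factor mentioned above
file.size.mb  <- 1000   # hypothetical file, for illustration only

# Number of chunks (blocks) the file is split into; the last one is partial
n.blocks <- ceiling(file.size.mb / block.size.mb)

# Number of block copies the cluster has to store and track
n.replicas <- n.blocks * replication

# Disk space consumed across the cluster (a partial block only uses its real size)
raw.storage.mb <- file.size.mb * replication
```

The NameNode meta-data grows with the number of blocks, while the disk space used on the DataNodes grows with the file size times the replication factor.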
Hadoop Distributed File System (HDFS)
• In HDFS, blocks should be read from beginning to end (this favors the MapReduce approach)
• Files in HDFS ARE NOT stored alongside the host system files
  • HDFS is normally an abstraction OVER an existing file system (ext3, ext4, etc.)
• Thus, there are specific commands to manipulate the HDFS file system
• To open a file stored in HDFS, the client must contact the NameNode to retrieve the location of each block of the file (at the DataNodes)
Hadoop Distributed File System (HDFS)
• Data locality: normally, when a job is launched, it runs on the same node that stores the data it must manipulate
• The meta-data stored in the NameNode is not automatically
HDFS from the command line
• Each user of HDFS has a personal directory
• No security directives are implemented, so users can write anywhere
• Access to HDFS through the hdfs command
  • hdfs dfs command
• Important commands
  • -copyFromLocal vs. -copyToLocal
  • -mkdir
  • -cp, -mv
Hadoop MRv1 vs YARN (MRv2)
• Hadoop MRv1
  • Resource management, task scheduling, and monitoring are done by a single process (bottleneck): Job Tracker
  • Each sub-problem is run by an independent process: Task Tracker
• Hadoop MRv2
  • Resource management and task scheduling/monitoring are split into different processes
    • Resource Manager (RM): overall resource management
    • Application Master (AM): per-job task scheduling and monitoring
Example: wordcount
• Input: a document made up of words
• Output: a set of (word, count(word)) pairs
• Two functions: map and reduce
• map(k1, v1):
    for each word w in v1
      emit(w, 1)
• reduce(k2, v2_list):
    int result = 0;
    for each v in v2_list
      result += v;
    emit(k2, result)
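To make the data flow of the pseudo-code concrete, the three phases (map, shuffle, reduce) can be simulated in a single R process without Hadoop. This is only a sketch: wc.map, wc.reduce and run.wordcount are hypothetical helper names, and the shuffle is done with base R's split() rather than by a cluster.

```r
# In-memory simulation of the wordcount MapReduce job (no Hadoop involved).

# Map: emit one (word, 1) pair per word found in an input line
wc.map <- function(line) {
  words <- unlist(strsplit(line, " +"))
  words <- words[words != ""]
  lapply(words, function(w) list(key = w, value = 1))
}

# Reduce: add up all the values collected for one key
wc.reduce <- function(key, values) {
  list(key = key, value = sum(values))
}

run.wordcount <- function(lines) {
  # Map phase: apply the mapper to every line and flatten the emitted pairs
  pairs  <- unlist(lapply(lines, wc.map), recursive = FALSE)
  keys   <- vapply(pairs, function(p) p$key, character(1))
  values <- vapply(pairs, function(p) p$value, numeric(1))
  # Shuffle phase: group the values by key
  grouped <- split(values, keys)
  # Reduce phase: one reducer call per distinct key
  reduced <- Map(wc.reduce, names(grouped), grouped)
  vapply(reduced, function(r) r$value, numeric(1))
}

counts <- run.wordcount(c("to be or not to be", "to do"))
print(counts)
```

The result is a named numeric vector with one entry per distinct word. In a real job the shuffle phase is what moves data between nodes, which is why a combiner that pre-sums counts on the map side can reduce network traffic.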
RHadoop
• Developed by Revolution Analytics (acquired by Microsoft)
• Three main components
  • rhdfs: R + HDFS
  • rmr2: R + MapReduce
  • rhbase: R + HBase
• Can be downloaded from: https://github.com/RevolutionAnalytics/RHadoop/wiki/Downloads
RHadoop: interacting with HDFS
# Load the rhdfs library
library(rhdfs)
# Start rhdfs
hdfs.init()
# Basic "ls"; the path is mandatory
hdfs.ls("/user/hadoop")
# Create a directory
work.dir <- "/user/hadoop/aux/"
hdfs.mkdir(work.dir)
# And delete it
hdfs.delete(work.dir)
# Create it again
hdfs.mkdir(work.dir)
RHadoop: wordcount example
• Library loading and initialization
# Load the RHadoop libraries
library('rhdfs')
library('rmr2')
# Initialize RHadoop
hdfs.init()
RHadoop: wordcount example
wordcount = function(input,
                     # The output can be an HDFS path, but if it is NULL
                     # a temporary file will be generated and wrapped in a
                     # big data object, like the ones generated by to.dfs
                     output = NULL,
                     pattern = " ") {
  # Define the wordcount map function
  wc.map = function(., lines) {
    keyval(unlist(strsplit(x = lines, split = pattern)), 1)
  }
  # Define the wordcount reduce function
  wc.reduce = function(word, counts) {
    keyval(word, sum(counts))
  }
RHadoop: wordcount example
  # Configure and launch the MapReduce job by calling the mapreduce function
  mapreduce(input = input,
            output = output,
            # You can specify your own input and output formats
            # and produce binary formats with the functions
            # make.input.format and make.output.format
            input.format = "text",
            map = wc.map,
            reduce = wc.reduce,
            # With combiner
            combine = TRUE)
}
RHadoop: wordcount example
# Run the MapReduce job, passing the HDFS input path as a parameter
out = wordcount('/user/hadoop/wordcount/quijote.txt')
# Retrieve the MapReduce output. Since output = NULL, the job writes to a
# temporary location (e.g. /tmp/file1b0817a5bcd0, different on every run),
# so pass the value returned by the job instead of a hard-coded path
results = from.dfs(out)
• El Quijote can be downloaded from:
RHadoop: airline example
• We will analyze the commercial data of an airline
• The input data file is a CSV
• We will need a custom input formatter to ease the task of processing the file
• Data can be downloaded from: http://stat-computing.org/dataexpo/2009/1987.csv.bz2
RHadoop: airline example
library('rmr2')
library('rhdfs')
hdfs.init()
# Put data in HDFS (the downloaded 1987.csv.bz2 must be decompressed first)
hdfs.data.root = '/user/hadoop/rhadoop/airline'
hdfs.data = file.path(hdfs.data.root, 'data')
hdfs.mkdir(hdfs.data)
hdfs.put("/home/hadoop/Downloads/1987.csv", hdfs.data)
RHadoop: airline example (input format)
#
# asa.csv.input.format() - read CSV data files and label field names
# for better code readability (especially in the mapper)
#
asa.csv.input.format = make.input.format(
  format = 'csv', mode = 'text', streaming.format = NULL, sep = ',',
  col.names = c('Year', 'Month', 'DayofMonth', 'DayOfWeek',
                'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime',
                'UniqueCarrier', 'FlightNum', 'TailNum',
                'ActualElapsedTime', 'CRSElapsedTime', 'AirTime',
                'ArrDelay', 'DepDelay', 'Origin', 'Dest', 'Distance',
                'TaxiIn', 'TaxiOut', 'Cancelled', 'CancellationCode',
                'Diverted', 'CarrierDelay', 'WeatherDelay',
                'NASDelay', 'SecurityDelay', 'LateAircraftDelay'),
  stringsAsFactors = FALSE)
RHadoop: airline example (mapper 1/2)
#
# the mapper gets keys and values from the input formatter
# in our case, the key is NULL and the value is a data.frame from read.table()
#
mapper.year.market.enroute_time = function(key, val.df) {
  # Remove header lines, cancellations, and diversions:
  val.df = subset(val.df, Year != 'Year' & Cancelled == 0 & Diverted == 0)
  # We don't care about the direction of travel, so construct a new 'market'
  # vector with airports ordered alphabetically (e.g., LAX to JFK becomes 'JFK-LAX')
  market = with(val.df, ifelse(Origin < Dest,
                               paste(Origin, Dest, sep = '-'),
                               paste(Dest, Origin, sep = '-')))
RHadoop: airline example (mapper 2/2)
  # key consists of year, market
  output.key = data.frame(year = as.numeric(val.df$Year), market = market,
                          stringsAsFactors = FALSE)
  # emit data.frame of gate-to-gate elapsed times (CRS and actual) + time in air
  output.val = val.df[, c('CRSElapsedTime', 'ActualElapsedTime', 'AirTime')]
  colnames(output.val) = c('scheduled', 'actual', 'inflight')
  # and finally, make sure they're numeric while we're at it
  output.val = transform(output.val,
                         scheduled = as.numeric(scheduled),
                         actual = as.numeric(actual),
                         inflight = as.numeric(inflight))
  return(keyval(output.key, output.val))
}
RHadoop: airline example (reducer)
#
# the reducer gets all the values for a given key
# the values (which may be multi-valued as here) come in the form of a data.frame
#
reducer.year.market.enroute_time = function(key, val.df) {
  output.key = key
  output.val = data.frame(flights = nrow(val.df),
                          scheduled = mean(val.df$scheduled, na.rm = TRUE),
                          actual = mean(val.df$actual, na.rm = TRUE),
                          inflight = mean(val.df$inflight, na.rm = TRUE))
  return(keyval(output.key, output.val))
}
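The logic of the mapper and reducer can be checked on a small data frame in plain R before submitting anything to Hadoop. The sketch below is only an illustration: the three flights are made-up values, and the grouping is done with base R's split() instead of the MapReduce shuffle.

```r
# Local dry run of the airline map/reduce logic on toy data (no Hadoop involved).
# The flight values below are invented for illustration only.
flights <- data.frame(
  Year = c(1987, 1987, 1987),
  Origin = c("LAX", "JFK", "SFO"),
  Dest = c("JFK", "LAX", "ORD"),
  CRSElapsedTime = c(300, 320, 240),
  ActualElapsedTime = c(310, 330, 250),
  AirTime = c(280, 290, 220),
  stringsAsFactors = FALSE
)

# "Map" step: build the direction-independent market key
flights$market <- with(flights, ifelse(Origin < Dest,
                                       paste(Origin, Dest, sep = "-"),
                                       paste(Dest, Origin, sep = "-")))

# "Shuffle" step: group the rows by (year, market)
groups <- split(flights, list(flights$Year, flights$market), drop = TRUE)

# "Reduce" step: one summary row per key, as in the reducer above
summaries <- do.call(rbind, lapply(groups, function(g) {
  data.frame(year = g$Year[1], market = g$market[1],
             flights = nrow(g),
             scheduled = mean(g$CRSElapsedTime, na.rm = TRUE),
             actual = mean(g$ActualElapsedTime, na.rm = TRUE),
             inflight = mean(g$AirTime, na.rm = TRUE),
             stringsAsFactors = FALSE)
}))

print(summaries)
```

Both LAX to JFK and JFK to LAX end up under the same 'JFK-LAX' key, so they are averaged together, which is the behavior the mapper comment describes.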
RHadoop: final configuration and execution
mr.year.market.enroute_time = function(input, output) {
  mapreduce(input = input,
            output = output,
            input.format = asa.csv.input.format,
            map = mapper.year.market.enroute_time,
            reduce = reducer.year.market.enroute_time,
            backend.parameters = list(
              hadoop = list(D = "mapred.reduce.tasks=2")),
            verbose = TRUE)
}
RHadoop: gathering results
# 'out' is the value returned by mr.year.market.enroute_time(input, output)
results = from.dfs(out)
results.df = as.data.frame(results, stringsAsFactors = FALSE)
colnames(results.df) = c('year', 'market', 'flights', 'scheduled', 'actual', 'inflight')
print(head(results.df))