Data Science Analytics & Research Centre
Big Data
• Big Data Overview
• Characteristics
• Applications & Use Case
HDFS
• Hadoop Distributed File System (HDFS) Overview
• HDFS Architecture
• Data replication
• Node types
• Jobtracker / Tasktracker
• HDFS Data Flows
• HDFS Limitations
Hadoop
• Hadoop Overview
• Inputs & Outputs
• Data Types
• What is MapReduce (MR)
• Example
• Functionalities of MR
• Speculative Execution
• Hadoop Streaming
Big Data Overview
[Figure: Data Footprint & Time Horizon / Technology Adoption Lifecycle. Time horizon: real time, near real time, hourly, daily, weekly, monthly, quarterly, yearly, 3 years, 5 years, 10 years. Level of detail: highly summarized, aggregated, detailed, events / facts, unstructured web / telemetry. Data volume: GB, TB, PB. Components from source to consumption: core ERP & legacy applications, data warehouse, analytic marts & cubes, predictive analytics, big data (Hadoop etc.), visualization & dashboards.]
Financial Services
• Detect fraud
• Model and manage risk
• Improve debt recovery rates
• Personalize banking/insurance products
Healthcare
• Optimal treatment pathways
• Remote patient monitoring
• Predictive modeling for new drugs
• Personalized medicine
Retail
• In-store behavior analysis
• Cross selling
• Optimize pricing, placement, design
• Optimize inventory and distribution
Web / Social / Mobile
• Location-based marketing
• Social segmentation
• Sentiment analysis
• Price comparison services
Manufacturing
• Design to value
• Crowd-sourcing
• “Digital factory” for lean manufacturing
• Improve service via product sensor data
Government
• Reduce fraud
• Segment populations, customize action
• Support open data initiatives
• Automate decision making
Hadoop Distributed File System (HDFS) Overview
HDFS is Hadoop's own implementation of a distributed file system.
It is coherent and provides all the facilities of a file system.
It implements ACLs and provides a subset of the usual UNIX commands for accessing or querying the filesystem.
Large block size. The default block size is 64 MB, and 128 MB is recommended, so that the time spent seeking is small compared with the time spent transferring data. Very large files are therefore ideal for storage.
Streaming data access. HDFS follows a write-once, read-many-times architecture. Because files are large, the time to read the whole file matters more than the time to seek to the first record.
Commodity hardware. HDFS is designed to run on commodity hardware, which may fail, and it is capable of handling such failures.
E.g.: 420MB file is split as:
128 MB 128 MB 128 MB 36 MB
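The split above is plain arithmetic: full blocks until less than one block remains. A minimal sketch, assuming sizes are given in whole MB; the class and method names are illustrative and not part of the Hadoop API.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative helper, not part of the Hadoop API.
final class BlockSplit {
    // Returns the sizes (in MB) of the blocks that a file of fileSizeMb occupies.
    static List<Long> split(long fileSizeMb, long blockSizeMb) {
        if (fileSizeMb < 0 || blockSizeMb <= 0) {
            throw new IllegalArgumentException("file size must be >= 0 and block size > 0");
        }
        List<Long> blocks = new ArrayList<>();
        long remaining = fileSizeMb;
        while (remaining > 0) {
            long size = Math.min(blockSizeMb, remaining); // last block may be shorter
            blocks.add(size);
            remaining -= size;
        }
        return blocks;
    }

    public static void main(String[] args) {
        System.out.println(split(420, 128)); // prints [128, 128, 128, 36]
    }
}
```

The last block holds only the remaining 36 MB; it is not padded to the full block size.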
[Figure: data replication. Complete File 1 is split into blocks B1, B2 and B3. Each block is stored three times across nodes n1 to n4 of Rack 1 (which also hosts the Namenode), Rack 2 and Rack 3.]
[Figure: HDFS Flow – Read]
[Figure: HDFS Flow – Write]
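How three replicas of a block might be spread over racks can be sketched as follows. This assumes the commonly documented default policy (first replica on the writer's node, second on a node in a different rack, third on another node in that same remote rack); the slides do not state the policy. The `Node` type and `choose` method are hypothetical, and the sketch picks the first matching node where HDFS would pick randomly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of rack-aware replica placement; not Hadoop code.
final class ReplicaPlacement {
    // A datanode identified by its rack and node name, e.g. ("Rack 1", "n2").
    static final class Node {
        final String rack;
        final String name;
        Node(String rack, String name) { this.rack = rack; this.name = name; }
        @Override public String toString() { return rack + "/" + name; }
    }

    // Chooses up to three nodes for one block written from the given node.
    static List<Node> choose(Node writer, List<Node> cluster) {
        List<Node> chosen = new ArrayList<>();
        chosen.add(writer); // replica 1: the writer's own node
        Node second = null;
        for (Node n : cluster) {
            if (!n.rack.equals(writer.rack)) { second = n; break; }
        }
        if (second == null) {
            return chosen; // single-rack cluster: this sketch stops here
        }
        chosen.add(second); // replica 2: a node in a different rack
        for (Node n : cluster) {
            if (n.rack.equals(second.rack) && n != second) {
                chosen.add(n); // replica 3: another node in the same remote rack
                break;
            }
        }
        return chosen;
    }
}
```

With this layout the loss of a single node or a single rack still leaves at least one copy of the block.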
Command / Usage / Syntax

cat
    Usage:  Copies source paths to stdout.
    Syntax: hadoop dfs -cat URI [URI …]
chgrp
    Usage:  Changes the group association of files. With -R, makes the change recursively through the directory structure.
    Syntax: hadoop dfs -chgrp [-R] GROUP URI [URI …]
chmod
    Usage:  Changes the permissions of files. With -R, makes the change recursively through the directory structure.
    Syntax: hadoop dfs -chmod [-R] <MODE[,MODE]... | OCTALMODE> URI [URI …]
chown
    Usage:  Changes the owner of files. With -R, makes the change recursively through the directory structure.
    Syntax: hadoop dfs -chown [-R] [OWNER][:[GROUP]] URI [URI …]
copyFromLocal
    Usage:  Similar to the put command, except that the source is restricted to a local file reference.
    Syntax: hadoop dfs -copyFromLocal <localsrc> URI
copyToLocal
    Usage:  Similar to the get command, except that the destination is restricted to a local file reference.
    Syntax: hadoop dfs -copyToLocal [-ignorecrc] [-crc] URI <localdst>
cp
    Usage:  Copies files from source to destination.
    Syntax: hadoop dfs -cp URI [URI …] <dest>
du
    Usage:  Displays the aggregate length of the files contained in a directory, or the length of a file if the path is just a file.
    Syntax: hadoop dfs -du URI [URI …]
dus
    Usage:  Displays a summary of file lengths.
    Syntax: hadoop dfs -dus <args>
expunge
    Usage:  Empties the Trash.
    Syntax: hadoop dfs -expunge
get
    Usage:  Copies files to the local file system.
    Syntax: hadoop dfs -get [-ignorecrc] [-crc] <src> <localdst>
getmerge
    Usage:  Concatenates the files in the source into the destination local file.
    Syntax: hadoop dfs -getmerge <src> <localdst> [addnl]
mkdir
    Usage:  Takes path URIs as arguments and creates directories.
    Syntax: hadoop dfs -mkdir <paths>
moveFromLocal
    Usage:  Similar to the put command, except that the local source is deleted after it is copied.
    Syntax: hadoop dfs -moveFromLocal <src> <dst>
mv
    Usage:  Moves files from source to destination.
    Syntax: hadoop dfs -mv URI [URI …] <dest>
put
    Usage:  Copies a single source, or multiple sources, from the local file system to the destination filesystem.
    Syntax: hadoop dfs -put <localsrc> ... <dst>
rm (or) rmr
    Usage:  rm deletes the files specified as arguments and does not remove directories; rmr deletes directories and their contents recursively.
    Syntax: hadoop dfs -rm URI [URI …]
            hadoop dfs -rmr URI [URI …]
setrep
    Usage:  Changes the replication factor of a file. With -R, changes the replication factor of the files within a directory recursively.
    Syntax: hadoop dfs -setrep [-R] <rep> <path>
stat
    Usage:  Returns the stat information on the path.
    Syntax: hadoop dfs -stat URI [URI …]
tail
    Usage:  Displays the last kilobyte of the file to stdout.
    Syntax: hadoop dfs -tail [-f] URI
test
    Usage:  -e checks whether the file exists; -z checks whether the file is zero length; -d checks whether the path is a directory.
    Syntax: hadoop dfs -test -[ezd] URI
text
    Usage:  Takes a source file and outputs the file in text format.
    Syntax: hadoop dfs -text <src>
HDFS Limitations
Low latency data access. HDFS is not optimized for low-latency data access; it trades latency for higher data throughput.
Lots of small files. The namenode keeps the metadata of every file and block in memory, so a large number of files that are each far smaller than the 64 MB block size increases the memory requirements of the namenode. (A small file does not occupy a full block on disk; the cost is in namenode memory.)
Multiple writers and arbitrary modification. HDFS does not support multiple writers. A file is written by a single writer, and writes are always made at the end of the file.
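The small-files limitation can be made concrete with a rough estimate. The sketch below assumes the widely quoted rule of thumb of about 150 bytes of namenode memory per file, directory or block object; that figure is an assumption and does not come from these slides, and the class name is illustrative.

```java
// Rough, illustrative estimate of namenode memory; not a Hadoop API.
final class NamenodeMemory {
    // Assumption: roughly 150 bytes of namenode heap per file or block object.
    static final long BYTES_PER_OBJECT = 150;

    // Estimates the memory needed to track fileCount files of fileBytes each.
    static long estimateBytes(long fileCount, long fileBytes, long blockBytes) {
        // Every file needs at least one block entry; larger files need more.
        long blocksPerFile = Math.max(1, (fileBytes + blockBytes - 1) / blockBytes);
        // One object for the file itself plus one per block.
        return fileCount * (1 + blocksPerFile) * BYTES_PER_OBJECT;
    }

    public static void main(String[] args) {
        long blockBytes = 64L * 1024 * 1024;
        // One million 1 KB files versus sixteen 64 MB files (about 1 GB each way).
        System.out.println(estimateBytes(1_000_000, 1024, blockBytes));
        System.out.println(estimateBytes(16, blockBytes, blockBytes));
    }
}
```

Under these assumptions roughly the same amount of data costs about 300 MB of namenode memory as small files and only a few kilobytes as block-sized files.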
Hadoop Overview
Inputs & Outputs
Data Types
What is MR
Example
Functionalities of MR
Speculative Execution
How Hadoop runs MR
Hadoop Streaming
Hadoop Job Scheduling
Hadoop is a framework that provides open-source libraries for distributed computing through a simple map-reduce interface and its own distributed filesystem, HDFS. It facilitates scalability and takes care of detecting and handling failures.
• 1.0.X - current stable version, 1.0 release
• 1.1.X - current beta version, 1.1 release
• 2.X.X - current alpha version
• 0.23.X - similar to 2.X.X but missing NN HA.
• 0.22.X - does not include security
• 0.20.203.X - old legacy stable version
• 0.20.X - old legacy version
• Risk Modeling:
– How business/industry can better understand customers and market.
• Customer Churn Analysis:
– Why companies really lose customers.
• Recommendation Engine:
– How to predict customer preferences.
• Ad Targeting:
– How to increase campaign efficiency.
• Point of Sale Transaction Analysis:
– Targeting promotions to make customers buy.
• Predicting Network Failure:
– Using machine-generated data to identify trouble spots.
• Threat Analysis:
– Detecting threats and fraudulent activity.
• Trade Surveillance:
– Helping businesses spot the rogue trader.
• Search Quality:
– Delivering more relevant search results to customers.
What is MapReduce (MR)
The framework was introduced by Google.
It processes vast amounts of data (multi-terabyte data sets) in parallel.
It achieves high performance on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.
It splits the input data set into independent chunks, which are processed by the map tasks.
It sorts the outputs of the maps, which are then input to the reduce tasks.
It takes care of scheduling tasks, monitoring them and re-executing failed tasks.
The MapReduce framework operates exclusively on <key, value> pairs, that is, the
framework views the input to the job as a set of <key, value> pairs and produces a set of
<key, value> pairs as the output of the job, conceivably of different types.
The key and value classes have to be serializable by the framework and hence need to
implement the Writable interface. Additionally, the key classes have to implement the
WritableComparable interface to facilitate sorting by the framework.
Input and output types of a MapReduce job:
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> shuffle and sort -> <k2, List(v2)> -> reduce -> <k3, v3> (output)
• Serialization is the process of turning structured objects into a byte stream for transmission over a network or for writing to persistent storage.
• Hadoop's Writable interface supports serialization.
• The following predefined Writable implementations are available; most of them also implement WritableComparable.
1. IntWritable
2. LongWritable
3. DoubleWritable
4. VLongWritable. Variable size; stores only as many bytes as needed (1 to 9 bytes).
5. VIntWritable. Less used, as its values can equally be represented by VLongWritable.
6. BooleanWritable
7. FloatWritable
8. BytesWritable
9. NullWritable
10. MD5Hash
11. ObjectWritable
12. GenericWritable
Apart from the above there are four Writable Collection types
1. ArrayWritable
2. TwoDArrayWritable
3. MapWritable
4. SortedMapWritable
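A custom key type follows the same pattern as the predefined ones: write the fields to a byte stream, read them back in the same order, and define an ordering for the sort. The sketch below mirrors the write / readFields / compareTo shape of WritableComparable using only java.io, so it runs without Hadoop; the `WordYearKey` type and its fields are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical key type shaped like a WritableComparable; it deliberately
// does not depend on Hadoop, so the interface itself is not implemented here.
final class WordYearKey implements Comparable<WordYearKey> {
    String word = "";
    int year;

    WordYearKey() {}
    WordYearKey(String word, int year) { this.word = word; this.year = year; }

    // Serializes the fields in a fixed order.
    void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(year);
    }

    // Restores the fields in the same order they were written.
    void readFields(DataInput in) throws IOException {
        word = in.readUTF();
        year = in.readInt();
    }

    // Ordering used when keys are sorted: by word, then by year.
    @Override
    public int compareTo(WordYearKey other) {
        int byWord = word.compareTo(other.word);
        return byWord != 0 ? byWord : Integer.compare(year, other.year);
    }

    static byte[] toBytes(WordYearKey key) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            key.write(new DataOutputStream(buffer));
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static WordYearKey fromBytes(byte[] bytes) {
        try {
            WordYearKey key = new WordYearKey();
            key.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
            return key;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a real job the class would implement Hadoop's WritableComparable interface and keep a no-argument constructor so the framework can instantiate it.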
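MapperClass
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
// Both classes are nested inside the WordCount driver class (old "mapred" API).
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { // emits <word, 1> per token
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
  public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    String line = value.toString();
    StringTokenizer tokenizer = new StringTokenizer(line);
    while (tokenizer.hasMoreTokens()) {
      word.set(tokenizer.nextToken());
      output.collect(word, one);
    }
  }
}
ReducerClass
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { // sums the counts per word
  public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}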
[Figure: MapReduce data flow. Input Data -> Input Data Format -> <K1, V1> -> Mapper -> <K2, V2> -> <K2, List(V2)> -> Reducer -> <K3, V3>]
Sample input files:
$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
Hello World Bye World
$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop
Run the application:
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output
Mapper implementation (lines 18 - 25 of the full WordCount source):
The first map emits:
< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>
The second map emits:
< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>
Combiner implementation (line 46 of the full WordCount source):
Output of the first map after combining:
< Bye, 1>
< Hello, 1>
< World, 2>
Output of the second map after combining:
< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>
Reducer implementation (lines 29 - 35 of the full WordCount source):
Output of the job:
< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>
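The walk-through above can be reproduced without a cluster. The following is a minimal in-memory sketch of the map, combine and reduce steps for the two sample inputs; it uses plain Java collections rather than the Hadoop API, and the class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// In-memory illustration of the word-count flow; not Hadoop code.
final class WordCountFlow {
    // Map plus local combine: tokenize one input split and sum counts per word.
    static Map<String, Integer> mapAndCombine(String split) {
        Map<String, Integer> counts = new TreeMap<>(); // TreeMap keeps keys sorted
        StringTokenizer tokenizer = new StringTokenizer(split);
        while (tokenizer.hasMoreTokens()) {
            counts.merge(tokenizer.nextToken(), 1, Integer::sum);
        }
        return counts;
    }

    // Shuffle plus reduce: merge the per-split outputs, summing values by key.
    static Map<String, Integer> reduce(List<Map<String, Integer>> mapOutputs) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map<String, Integer> output : mapOutputs) {
            for (Map.Entry<String, Integer> entry : output.entrySet()) {
                totals.merge(entry.getKey(), entry.getValue(), Integer::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Map<String, Integer>> outputs = new ArrayList<>();
        outputs.add(mapAndCombine("Hello World Bye World"));
        outputs.add(mapAndCombine("Hello Hadoop Goodbye Hadoop"));
        System.out.println(reduce(outputs));
        // prints {Bye=1, Goodbye=1, Hadoop=2, Hello=2, World=2}
    }
}
```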
• A way of coping with differences in individual machine performance.
• The same input can be processed multiple times in parallel, to exploit differences in machine capabilities.
• The Hadoop platform schedules redundant copies of the remaining tasks across several nodes that do not have other work to perform.
Name                                       Value  Description
mapred.map.tasks.speculative.execution     true   If true, then multiple instances of some map tasks may be executed in parallel.
mapred.reduce.tasks.speculative.execution  true   If true, then multiple instances of some reduce tasks may be executed in parallel.
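Which tasks are worth a redundant copy can be sketched as a comparison against the average progress. The sketch below flags tasks whose progress trails the average by more than a given lag; the selection rule and the lag value are illustrative assumptions and not Hadoop's actual heuristic, which takes further factors into account.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative straggler detection; not Hadoop's implementation.
final class SpeculationSketch {
    // Returns the indexes of tasks whose progress (0.0 to 1.0) is more than
    // 'lag' below the average progress of all running tasks.
    static List<Integer> stragglers(double[] progress, double lag) {
        List<Integer> slow = new ArrayList<>();
        if (progress.length == 0) {
            return slow;
        }
        double sum = 0;
        for (double p : progress) {
            sum += p;
        }
        double average = sum / progress.length;
        for (int i = 0; i < progress.length; i++) {
            if (progress[i] < average - lag) {
                slow.add(i); // candidate for a speculative duplicate
            }
        }
        return slow;
    }
}
```

For progress values 0.9, 0.95, 0.3 and 0.85 with a lag of 0.2, only the third task (index 2) falls far enough behind to be flagged.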
Hadoop Streaming is a utility that comes with the Hadoop distribution.
It allows you to create and run map/reduce jobs with any executable or script as the mapper and/or the reducer.
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper org.apache.hadoop.mapred.lib.IdentityMapper \
-reducer /bin/wc \
-jobconf mapred.reduce.tasks=2
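A streaming mapper is simply a program that reads input lines from stdin and writes tab-separated key/value lines to stdout. A minimal sketch of a word-count mapper in that style follows; the class name is hypothetical, and how the compiled class is shipped to the task nodes is left out.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical stdin/stdout mapper for use with Hadoop Streaming.
final class StreamingWordMapper {
    // Turns one input line into "word<TAB>1" records, one per token.
    static List<String> mapLine(String line) {
        List<String> records = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            records.add(tokenizer.nextToken() + "\t1");
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String line;
        while ((line = in.readLine()) != null) {
            for (String record : mapLine(line)) {
                System.out.println(record); // key and value separated by a tab
            }
        }
    }
}
```

Any executable with the same stdin/stdout behaviour, in any language, could be supplied as the mapper in its place.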
Default Scheduler
A single priority-based queue of jobs.
Scheduling tries to balance map and reduce load on all tasktrackers in the cluster.
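The ordering of a single priority-based queue can be sketched with a comparator: higher priority first, and within the same priority the job submitted earlier first. The five priority levels match Hadoop's job priorities, but the `Job` type, its fields and the tie-breaking by submission time are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative ordering of a priority-based job queue; not Hadoop code.
final class PriorityFifoQueue {
    // Declared from highest to lowest, so enum order is scheduling order.
    enum Priority { VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW }

    static final class Job {
        final String name;
        final Priority priority;
        final long submitTime;
        Job(String name, Priority priority, long submitTime) {
            this.name = name;
            this.priority = priority;
            this.submitTime = submitTime;
        }
    }

    // Returns job names in the order the queue would serve them.
    static List<String> order(List<Job> jobs) {
        List<Job> sorted = new ArrayList<>(jobs);
        sorted.sort(Comparator.comparing((Job j) -> j.priority)
                .thenComparingLong(j -> j.submitTime));
        List<String> names = new ArrayList<>();
        for (Job job : sorted) {
            names.add(job.name);
        }
        return names;
    }
}
```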
Capacity Scheduler
Within a queue, jobs with higher priority will have access to the queue's resources before jobs with
lower priority.
In order to prevent one or more users from monopolizing its resources, each queue enforces a limit on
the percentage of resources allocated to a user at any given time, if there is competition for them.
Fair Scheduler
Multiple queues (pools) of jobs, each sorted in FIFO order or by fairness limits.
Each pool is guaranteed a minimum capacity and excess is shared by all jobs using a fairness algorithm.