CIS 455/555: Internet and Web Systems
Hadoop
October 25, 2021
Plan for today
n Google File System
n Introduction to MapReduce
n
Programming model
n
Data flow
n
Example tasks
n Hadoop and HDFS
n
Architecture
n
Using Hadoop
n
Using HDFS
NEXT
Input: (k,v) where k is __________ and v is __________
map(key : __________, value : __________) {
}
reduce(key : __________, values: __________) {
}
Output: (k,v) where k is __________ and v is _________
Beyond word count
n
Distributed grep – all lines matching a pattern
n
Map: filter by pattern
n
Reduce: output set
n
Count URL access frequency
n
Map: output each URL as key, with count 1
n
Reduce: sum the counts
n
Reverse web-link graph
n
Map: output (target,source) pairs when a link to target is found in source
n
Reduce: concatenates values and emits (target,list(source))
Inverted index
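The reverse web-link graph task above can be sketched in plain Java without Hadoop. This is only an in-memory illustration: the class name ReverseLinkGraphSketch, the run() helper that stands in for the framework, and the page names are assumptions made for the example, not part of any Hadoop API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory sketch of the reverse web-link graph task (no Hadoop involved).
class ReverseLinkGraphSketch {

    // Map: for a page 'source', emit one (target, source) pair per outgoing link.
    static List<String[]> map(String source, List<String> outgoingLinks) {
        List<String[]> pairs = new ArrayList<>();
        for (String target : outgoingLinks) {
            pairs.add(new String[] {target, source});
        }
        return pairs;
    }

    // Reduce: concatenate all sources that were emitted for the same target.
    static List<String> reduce(String target, List<String> sources) {
        return new ArrayList<>(sources);
    }

    // Hypothetical stand-in for the framework: map every input, group by key, reduce per key.
    static Map<String, List<String>> run(Map<String, List<String>> pages) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (Map.Entry<String, List<String>> page : pages.entrySet()) {
            for (String[] pair : map(page.getKey(), page.getValue())) {
                grouped.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
            }
        }
        Map<String, List<String>> output = new TreeMap<>();
        for (Map.Entry<String, List<String>> group : grouped.entrySet()) {
            output.put(group.getKey(), reduce(group.getKey(), group.getValue()));
        }
        return output;
    }

    public static void main(String[] args) {
        Map<String, List<String>> pages = new TreeMap<>();
        pages.put("a.html", Arrays.asList("b.html", "c.html"));
        pages.put("b.html", Arrays.asList("c.html"));
        System.out.println(run(pages)); // {b.html=[a.html], c.html=[a.html, b.html]}
    }
}
```

Note that map only ever looks at one page at a time; all the cross-page work happens through the grouping by target.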
General Rules of Thumb
n Mapper and reducer should be stateless
n
Don't use static variables - after map + reduce return, they should remember nothing about the processed data!
n
Why? No guarantees about which key-value pairs will be processed by which workers!
n Try not to do your own I/O!
n
Except probing against static tables
n
The MapReduce framework does all the I/O for you:
n
All the incoming data will be fed as arguments to map and reduce
n
Any data your functions produce should be output via emit
HashMap h = new HashMap();
map(key, value) {
  if (h.contains(key)) {
    h.add(key,value);
    emit(key, "X");
  }
}
Dangerous!
map(key, value) {
  File foo = new File("xyz.txt");
  while (true) {
    s = foo.readLine();
    ...
  }
}
Dangerous!
Common mistakes to avoid
n Mapper must not map too much data to the same key
n
In particular, don't map everything to the same key!!
n
Otherwise the reduce worker will be overwhelmed!
n
It's okay if some reduce workers have more work than others
n
Example: In WordCount, the reduce worker that works on the key 'and' has a lot more work than the reduce worker that works on 'syzygy'.
map(key, value) {
emit("FOO", key + " " + value);
}
reduce(key, value[]) {
/* do some computation on all the values */
}
Wrong!
Designing MapReduce algorithms
n Key decision: What should be done by map, and what by reduce?
n
map can do something to each individual key-value pair, but it can't look at other key-value pairs
n
Example: Filtering out key-value pairs we don't need
n
map can emit more than one intermediate key-value pair for each incoming key-value pair
n
Example: Incoming data is text, map produces (word,1) for each word
n
reduce can aggregate data; it can look at multiple values, as long as map has mapped them to the same (intermediate) key
n
Example: Count the number of words, add up the total cost, ...
n Need to get the intermediate format right!
n
If reduce needs to look at several values together, map must emit them using the same key!
Filtering algorithms
n Goal: Find lines/files/tuples with a particular characteristic
n Examples:
n
grep Web logs for requests to *.upenn.edu/*
n
find in the Web logs the hostnames accessed by 192.168.2.1
n
locate all the files that contain the words 'Apple' and 'Jobs'
n Generally: map does most of the work, reduce may simply be the identity
Aggregation algorithms
n Goal: Compute the maximum, the sum, the average, ..., over a set of values
n Examples:
n
Count the number of requests to *.upenn.edu/*
n
Find the most popular domain
n
Average the number of requests per page per Web site
n Often: map may be simple or the identity
A more complex example
n Goal: Billing for a CDN like Amazon CloudFront
n
Input: Log files from the edge servers. Two files per domain:
n
access_log-www.foo.com-20160316.txt: HTTP accesses
n
ssl_access_log-www.foo.com-20160316.txt: HTTPS accesses
n
Example line:
158.130.53.72 - - [03/Mar/2016:08:30:38 -0400] "GET /largeFile.ISO HTTP/1.1" 200 8130928734 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.11) Gecko/20100101 Firefox/44.0"
n
Mapper receives (filename,line) tuples
n
Billing policy (simplified):
n
Billing is based on a mix of request count and data traffic
n
10,000 HTTP requests cost $0.0075
Advanced Aggregation: Combiners
n Multiple map jobs on the same machine may write to the same reduce key
n
Example: map(1,"Apple juice") -> ("apple",1), ("juice",1)
n
map(2, "Apple sauce") -> ("apple",1),("sauce",1)
n
combiner: ("apple", [1,1]) -> ("apple", 2)
Mapper Combiner Reducer
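The effect of a combiner can be sketched in plain Java. This is an in-memory illustration only, not Hadoop code: the class CombinerSketch and its map/combine/reduce helpers are hypothetical names, and lower-casing the words is an assumption taken from the example above.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory sketch of local pre-aggregation with a combiner (no Hadoop involved).
class CombinerSketch {

    // Map: emit (word, 1) for every word of the line, lower-cased.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    // Combine: sum the counts per key on the node that produced them,
    // so ("apple",[1,1]) leaves the node as a single ("apple",2).
    static Map<String, Integer> combine(List<Map.Entry<String, Integer>> localPairs) {
        Map<String, Integer> partial = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : localPairs) {
            partial.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return partial;
    }

    // Reduce: sum the partial counts that arrive from all nodes.
    static Map<String, Integer> reduce(List<Map<String, Integer>> partialsFromAllNodes) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map<String, Integer> partial : partialsFromAllNodes) {
            partial.forEach((word, count) -> totals.merge(word, count, Integer::sum));
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> node1 = new ArrayList<>();
        node1.addAll(map("Apple juice"));
        node1.addAll(map("Apple sauce"));
        System.out.println(combine(node1)); // {apple=2, juice=1, sauce=1}
    }
}
```

Pre-aggregating like this is safe here because addition gives the same total no matter how the counts are grouped; an aggregate such as an average would need to ship (sum, count) pairs instead.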
Intersections and joins
n Goal: Intersect multiple different inputs on some shared values
n
Values can be equal, or meet a certain predicate
n Examples:
n
Find all documents with the words “data” and “centric” given an inverted index
n
Find all professors and students in common courses and return the pairs <professor,student> for those cases
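One common way to express the professor/student example is a reduce-side join: map tags each record with the input it came from and keys it by the shared value (the course), and reduce pairs up the two sides. The sketch below does this in memory; the class JoinSketch, the "P:"/"S:" tags, and the sample names are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory sketch of a reduce-side join on a shared key (no Hadoop involved).
class JoinSketch {

    // teaches: {professor, course}; enrolled: {student, course}
    static List<String> join(List<String[]> teaches, List<String[]> enrolled) {
        // Map + shuffle: key every record by course and tag it with its origin.
        Map<String, List<String>> byCourse = new TreeMap<>();
        for (String[] t : teaches) {
            byCourse.computeIfAbsent(t[1], k -> new ArrayList<>()).add("P:" + t[0]);
        }
        for (String[] e : enrolled) {
            byCourse.computeIfAbsent(e[1], k -> new ArrayList<>()).add("S:" + e[0]);
        }
        // Reduce: within one course, pair every professor with every student.
        List<String> pairs = new ArrayList<>();
        for (List<String> tagged : byCourse.values()) {
            List<String> professors = new ArrayList<>();
            List<String> students = new ArrayList<>();
            for (String value : tagged) {
                if (value.startsWith("P:")) {
                    professors.add(value.substring(2));
                } else {
                    students.add(value.substring(2));
                }
            }
            for (String p : professors) {
                for (String s : students) {
                    pairs.add("<" + p + "," + s + ">");
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<String[]> teaches = Arrays.asList(
            new String[] {"profA", "c1"}, new String[] {"profB", "c2"});
        List<String[]> enrolled = Arrays.asList(
            new String[] {"stu1", "c1"}, new String[] {"stu2", "c1"}, new String[] {"stu3", "c3"});
        System.out.println(join(teaches, enrolled)); // [<profA,stu1>, <profA,stu2>]
    }
}
```

Courses that appear in only one input (c2 and c3 here) produce no output, which is the intersection behavior the slide describes.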
Partial Cartesian products
n Goal: Find some complex relationship, e.g., based on pairwise distance
n Examples:
n
Find all pairs of coffee shops within 100m of each other
n Generally hard to parallelize
n
But may be possible if we can divide the input into bins or tiles, or link it to some sort of landmark
n
Overlap the tiles? (how does this scale?)
n
Generate landmarks using clustering?
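The overlapping-tiles idea can be sketched as follows, under several assumptions that are not in the slides: shops have planar coordinates in metres, tiles are squares whose edge equals the 100m radius, and each shop is copied to its own tile and the 8 neighbouring tiles. TilingSketch and Shop are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory sketch of a tiled pairwise-distance search (no Hadoop involved).
class TilingSketch {
    static final double RADIUS = 100.0; // metres; tile edge length equals the radius

    static final class Shop {
        final int id;
        final double x, y; // assumed planar coordinates in metres
        Shop(int id, double x, double y) { this.id = id; this.x = x; this.y = y; }
    }

    static String tile(long tx, long ty) { return tx + ":" + ty; }

    static List<String> closePairs(List<Shop> shops) {
        // Map: emit each shop under its home tile and, as a copy, under the 8 neighbours.
        Map<String, List<Shop>> homeByTile = new TreeMap<>();
        Map<String, List<Shop>> allByTile = new TreeMap<>();
        for (Shop s : shops) {
            long tx = (long) Math.floor(s.x / RADIUS);
            long ty = (long) Math.floor(s.y / RADIUS);
            homeByTile.computeIfAbsent(tile(tx, ty), k -> new ArrayList<>()).add(s);
            for (long dx = -1; dx <= 1; dx++) {
                for (long dy = -1; dy <= 1; dy++) {
                    allByTile.computeIfAbsent(tile(tx + dx, ty + dy), k -> new ArrayList<>()).add(s);
                }
            }
        }
        // Reduce (one call per tile): compare the tile's home shops with every shop copied
        // into it. Requiring a.id < b.id reports each pair once, in the home tile of a.
        List<String> pairs = new ArrayList<>();
        for (Map.Entry<String, List<Shop>> entry : homeByTile.entrySet()) {
            for (Shop a : entry.getValue()) {
                for (Shop b : allByTile.get(entry.getKey())) {
                    if (a.id < b.id && Math.hypot(a.x - b.x, a.y - b.y) <= RADIUS) {
                        pairs.add(a.id + "-" + b.id);
                    }
                }
            }
        }
        return pairs;
    }
}
```

Every shop is replicated nine times, which hints at the scaling question raised above: the replication factor grows with the search radius relative to the tile size, and dense tiles still do quadratic work.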
Sorting
n Goal: Sort input
n Examples:
n
Return all the domains covered by Google's index and the number of pages in each, ordered by the number of pages
n The programming model does not support this per se, but the implementations do
Let’s take a look at what happens in the Shuffle stage…
The shuffle stage revisited
[Figure: data flow on two nodes (Node 1 and Node 2). Each node reads files from the file system through an InputFormat into splits, record readers (RR), and map tasks; map output passes through Combine, Partition, and Sort before it reaches Reduce, whose results go through an OutputFormat back to files.]
Shuffle really consists of two parts:
•
Partition
•
Sort
Shuffle as a sort mechanism
n We can exploit the per-node sorting operation done by Shuffle
n
If we have a single reducer, we will get sorted output
n
If we have multiple reducers, we can get partly sorted output (or better – consider an order-preserving hash)
n
Note: It is not difficult to write a last pass that merges all of the output files from the reducers
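The "order-preserving" idea can be sketched with a range partitioner: if every key sent to reducer i is smaller than every key sent to reducer i+1, then sorting inside each reducer and concatenating the outputs in reducer order yields a globally sorted result. The sketch below is an in-memory illustration; RangePartitionSortSketch and the hand-picked boundaries are assumptions, and a real job would have to choose the boundaries, for example by sampling the keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// In-memory sketch of sorting via partition + per-reducer sort (no Hadoop involved).
class RangePartitionSortSketch {

    // Order-preserving partitioner: keys below boundaries[0] go to reducer 0,
    // keys in [boundaries[0], boundaries[1]) go to reducer 1, and so on.
    static int partition(String key, String[] boundaries) {
        int reducer = 0;
        while (reducer < boundaries.length && key.compareTo(boundaries[reducer]) >= 0) {
            reducer++;
        }
        return reducer;
    }

    static List<String> sortWithShuffle(List<String> keys, String[] boundaries) {
        List<List<String>> reducers = new ArrayList<>();
        for (int i = 0; i <= boundaries.length; i++) {
            reducers.add(new ArrayList<>());
        }
        for (String key : keys) {
            reducers.get(partition(key, boundaries)).add(key); // Partition
        }
        List<String> merged = new ArrayList<>();
        for (List<String> reducerInput : reducers) {
            Collections.sort(reducerInput); // Sort, as done per reducer by the shuffle
            merged.addAll(reducerInput);    // last pass: concatenate outputs in reducer order
        }
        return merged;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("pear", "apple", "zebra", "mango", "fig");
        System.out.println(sortWithShuffle(keys, new String[] {"h", "q"}));
        // [apple, fig, mango, pear, zebra]
    }
}
```

With an ordinary hash partitioner the per-reducer outputs would each be sorted but would interleave, so the last pass would need a real merge instead of a concatenation.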
MapReduce Strengths/weaknesses
n What problems can you solve well with MapReduce?
n Are there problems you cannot solve efficiently with MapReduce?
n Are there problems it can't solve at all?
n How does it compare to other ways of doing large-scale data analysis?
n
Is MapReduce always the fastest/most efficient way?
Plan for today
n MapReduce
n
Programming model
n
Data flow
n
Example tasks
n Hadoop and HDFS
n
Architecture
n
Using Hadoop
n
Using HDFS
n
Beyond MapReduce
NEXT
MapReduce to Hadoop
MapReduce became a key data processing engine at Google
n
In fact, they built higher-level tools (Sawzall, Flume) that compiled down to MapReduce
But they also published info about it – ultimately leading to an open source re-implementation
2002-2004: Lucene and Nutch
n Early 2000s: Doug Cutting develops two open-source search projects:
n
Lucene: Search indexer
n Today used e.g., by Wikipedia
n
Nutch: A spider/crawler (with Mike Cafarella)
n Nutch
n
Goal: Web-scale, crawler-based search
n
Written by a few part-time developers
n
Distributed, 'by necessity'
Demonstrated 100M web pages on 4 nodes, but true
2004-2006: GFS and MapReduce
n 2003/04: GFS, MapReduce papers published
n
Sanjay Ghemawat, Howard Gobioff, Shun-Tak Leung: "The Google File System", SOSP 2003
n
Jeffrey Dean and Sanjay Ghemawat: "MapReduce: Simplified Data Processing on Large Clusters", OSDI 2004
n
Directly addressed Nutch's scaling issues
n GFS & MapReduce added to Nutch
n
Two part-time developers over two years (2004-2006)
n
Crawler & indexer ported in two weeks
n
Ran on 20 nodes at IA and UW
n
Much easier to program and run, scales to several 100M web pages, but still far from web scale
2006-2008: Yahoo
n 2006: Yahoo hires Cutting
n
Provides engineers, clusters, users, ...
n
Big boost for the project; Yahoo spends tens of M$
n
Not without a price: Yahoo has a slightly different focus (e.g., security) than the rest of the project; delays result
n Hadoop project split out of Nutch
n
Finally hit web scale in early 2008
n Cutting is now at Cloudera
n
Startup; started by three top engineers from Google, Facebook, Yahoo, and a former executive from Oracle
n
Has its own version of Hadoop; software remains free, but
company sells support and consulting services
Who uses Hadoop?
n Hadoop is running search on some of the Internet's largest sites:
n
Helped IBM's Watson to win Jeopardy
n
Amazon Web Services: Elastic MapReduce
n
AOL: Variety of uses, e.g., behavioral analysis & targeting
n
eBay: Search optimization (700-node cluster)
n Each node has: 24TB disk, 72GB RAM, 12 cores. Can run 26,000 MapReduce tasks simultaneously
n
Facebook: Reporting/analytics, machine learning (1100 m.), messaging
n 2000-node warehouse cluster, 21PB total storage capacity, ~400 million objects
n
Fox Interactive Media: MySpace, Photobucket, Rotten T.
n
IBM: Blue Cloud Computing Clusters
n
LinkedIn: 1000s of machines, e.g., People You May Know
n
Twitter: Store + process tweets, log files, other data
n
Apple: iAds platform
n
Netflix: Streaming summaries, analysis tasks
Subsequently
n
Hadoop today isn’t a system – it’s an “ecosystem”
n
Hadoop MapReduce + Hadoop FileSystem + YARN
n
And today: many other data processing platforms that take MapReduce’s place, e.g., Apache Hive, Spark, Flink
n
So, let’s explore:
n
What Hadoop looks like and how to use it
n
Programming models since Hadoop
Recall: Google’s MapReduce built on GFS and added two main aspects
n
A very simple programming model for data processing pipelines
n
Item-to-item map for key/value pairs
n
Shuffle by map’s output key
n
Reduce all items with the same key
n
A runtime “engine” that ~guarantees successful completion of the job, assuming adequate resources are available
n
Master hands off “chunks” of work to workers, which write their checkpointed results to (replicated) GFS
n
Scheme for automatic retries, partial load balancing, etc.
Hadoop MapReduce+HDFS
n
A 'modern' open-source 'clone' of MapReduce+GFS
n
Written in Java
n
Operates on HDFS, a block-level replicating filesystem
Modeled in part after GFS
Source: Hadoop HDFS architecture documentation
Hadoop MapReduce 1.X Architecture
n Job tracker (~MapReduce master):
n
Accepts jobs submitted by users
n
Gives tasks to Task trackers – makes scheduling decisions, co-locates tasks to data
n
Monitors task, tracker status, re-executes tasks if needed
n Task trackers (~MapReduce worker):
n
Run Map and Reduce tasks
n
Manage storage, transmission of intermediate output
n HDFS-related nodes:
A single node can run more than one of these!
Two example configurations
[Figure: a small cluster and a medium cluster, showing how the JobTracker, NameNode, Secondary NameNode, and TaskTracker roles are placed on machines]
What do we need to write?
n A mapper
n
Accepts (key,value) pairs from the input
n
Produces intermediate (key,value) pairs, which are then shuffled
n A reducer
n
Accepts intermediate (key,value) pairs
n
Produces final (key,value) pairs for the output
n A driver
n
Specifies which inputs to use, where to put the outputs
n
Chooses the mapper and the reducer to use
n Hadoop takes care of the rest
Default behaviors can be customized by the driver
Hadoop data types
n Hadoop uses its own serialization
n
Java serialization is known to be very inefficient
n Result: A set of special data types
n
All implement the 'Writable' interface
n
Most common types shown below; also has some more
Name            Description             JDK equivalent
IntWritable     32-bit integers         Integer
LongWritable    64-bit integers         Long
DoubleWritable  Floating-point numbers  Double
Text            Strings                 String
The Mapper
n Extends abstract 'Mapper' class
n
Input/output types are specified as type parameters
n Implements a 'map' function
n
Accepts (key,value) pair of the specified type
n
Writes output pairs by calling 'write' method on context
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.io.*;
public class FooMapper extends Mapper<LongWritable, Text, Text, Text> {
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws java.io.IOException, InterruptedException {
    // Emit every input line under the same intermediate key "foo"
    context.write(new Text("foo"), value);
  }
}
Input format (file offset, line)
Intermediate format
can be freely chosen
The Reducer
n Extends abstract 'Reducer' class
n
Must specify types again (must be compatible with mapper!)
n Implements a 'reduce' function
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.io.*;
public class FooReducer extends Reducer<Text, Text, IntWritable, Text> {
  @Override
  public void reduce(Text key, Iterable<Text> values, Context context)
      throws java.io.IOException, InterruptedException {
    for (Text value : values)
      context.write(new IntWritable(4711), value);
  }
}
Intermediate format
(same as mapper output) Output format
Note: We may get multiple values for
the same key!
The Driver
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FooDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance();  // replaces the deprecated 'new Job()'
    job.setJarByClass(FooDriver.class);
    job.setMapperClass(FooMapper.class);
    job.setReducerClass(FooReducer.class);
    // <LongWritable, Text> is the default input format
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path("in"));
    FileOutputFormat.setOutputPath(job, new Path("out"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
n Specifies how the job is to be executed
Mapper&Reducer are in the same Jar as
FooDriver
Input and Output paths
Format of the input and output (key,value) pairs
Manual compilation
n
Goal: Produce a JAR file that contains the classes for mapper, reducer, and driver
n
This can be submitted to the Job Tracker, or run directly through Hadoop
n
Step #1: Put hadoop JAR into classpath:
export CLASSPATH=$CLASSPATH:/path/to/hadoop/hadoop-mapreduce-client-core-2.7.3.jar
n
Step #2: Compile mapper, reducer, driver:
javac FooMapper.java FooReducer.java FooDriver.java
Running a job in standalone mode
n
Step #1: Create & populate (HDFS) input directory
n
Configured in the Driver via addInputPath()
n
Put input file(s) into this directory (ok to have more than 1)
n
Output HDFS directory must not exist yet!
n
Step #2: Run Hadoop
n
As simple as this: hadoop jar <jarName> <driverClassName>
n
Example: hadoop jar foo.jar FooDriver
n
In verbose mode, Hadoop will print statistics while running
n
Step #3: Collect output files
Accessing data in HDFS
n HDFS implements a separate namespace
n
Files in HDFS are not visible in the normal file system
[cis455@host ~]$ ls -la /tmp/hadoop/dfs/data/current/
total 209588
drwxrwxr-x 2 liuv liuv 4096 2017-02-13 15:46 .
drwxrwxr-x 5 liuv liuv 4096 2017-02-13 15:39 ..
-rw-rw-r-- 1 liuv liuv 11568995 2017-02-13 15:44 blk_-3562426239750716067
-rw-rw-r-- 1 liuv liuv 90391 2017-02-13 15:44 blk_-3562426239750716067_1020.meta
-rw-rw-r-- 1 liuv liuv 4 2017-02-13 15:40 blk_5467088600876920840
-rw-rw-r-- 1 liuv liuv 11 2017-02-13 15:40 blk_5467088600876920840_1019.meta
-rw-rw-r-- 1 liuv liuv 67108864 2017-02-13 15:44 blk_7080460240917416109
-rw-rw-r-- 1 liuv liuv 524295 2017-02-13 15:44 blk_7080460240917416109_1020.meta
-rw-rw-r-- 1 liuv liuv 67108864 2017-02-13 15:44 blk_-8388309644856805769
-rw-rw-r-- 1 liuv liuv 524295 2017-02-13 15:44 blk_-8388309644856805769_1020.meta
-rw-rw-r-- 1 liuv liuv 67108864 2017-02-13 15:44 blk_-9220415087134372383
-rw-rw-r-- 1 liuv liuv 524295 2017-02-13 15:44 blk_-9220415087134372383_1020.meta
-rw-rw-r-- 1 liuv liuv 158 2017-02-13 15:40 VERSION
[cis455@host ~]$
Accessing data in HDFS
n File access is through the hdfs command:
n
hdfs dfs -put [file] [hdfsPath] Stores a file in HDFS
n
hdfs dfs -ls [hdfsPath] List a directory
n
hdfs dfs -get [hdfsPath] [file] Retrieves a file from HDFS
n
hdfs dfs -rm [hdfsPath] Deletes a file in HDFS
n
hdfs dfs -mkdir [hdfsPath] Makes a directory in HDFS
[cis455@host ~]$ /usr/local/hadoop-2.7.3/bin/hdfs dfs -ls /user/cis455
Found 4 items
-rw-r--r-- 1 cis455 supergroup 1366 2019-02-13 15:46 /user/cis455/README.txt
-rw-r--r-- 1 cis455 supergroup 0 2019-02-13 15:35 /user/cis455/input
-rw-r--r-- 1 cis455 supergroup 0 2019-02-13 15:39 /user/cis455/input2
-rw-r--r-- 1 cis455 supergroup 212895587 2019-02-13 15:44 /user/cis455/input3
[cis455@host ~]$
Alternatives to the command line
n Getting data in and out of HDFS through the command-line interface is a bit cumbersome!
n Alternatives have been developed:
n
FUSE file system: Allows HDFS to be mounted under Unix
n
WebDAV share: Can be mounted as filesystem on many OSes
n
HTTP: Read access through the namenode's embedded web server
n
FTP: Standard FTP interface
n
HDFS-UI webapp
Accessing HDFS directly from Java
n Programs can read/write HDFS files directly
n
Not needed in MapReduce; I/O is handled by the framework
n Files are represented as URIs
n
Example: hdfs://localhost/user/cis455/example.txt
n Access is via the FileSystem API
n
To get access to the file: FileSystem.get()
n
For reading, call open() -- returns InputStream
n
For writing, call create() -- returns OutputStream
Security and permissions have evolved
n Since 0.16.1, Hadoop has had rudimentary support for POSIX-style permissions
n
rwx for users, groups, 'other' -- just like in Unix
n
'hdfs dfs' has support for chmod, chgrp, chown
n But: POSIX model is not a very good fit
n
Many combinations are meaningless: Files cannot be executed, and existing files cannot really be written to
n More recent models added additional security features
n
Kerberos RPC (SASL/GSSAPI), delegation tokens
Recap: HDFS
n HDFS: A specialized distributed file system
n
Good for large amounts of data, sequential reads
n
Bad for lots of small files, random access, non-append writes
n Architecture: Blocks, namenode, datanodes
n
File data is broken into large blocks (64MB default in Hadoop 1.x; 128MB in Hadoop 2.x)
n
Blocks are stored & replicated by datanodes
n
Single namenode manages all the metadata
n
Secondary namenode: Housekeeping & (some) redundancy
n Usage: Special command-line interface
n
Example: hdfs dfs -ls /path/in/hdfs
Plan for today
n Hadoop and HDFS
n
Architecture
n
Using Hadoop
n
Using HDFS
n
Beyond MapReduce
n Remote Procedure Calls
n Web Services
NEXT
Lots of Places where MR is Limited
n
What if I want streaming data instead of static data?
n
What if I want low-latency jobs instead of batch jobs?
n
What if my computation requires iteration (e.g., transitive closure, clustering, classification, …)?
n
MapReduce can only do iteration in a kludgey way
1. “base case” job produces a “single” output file combining all input data
2. “iterative case” reads the last stage’s output file, does its computation, writes another “single” output file for the next iteration
3. “termination case” converts the last output file into something a human can read
YARN Separates Out Core Scheduling & Resource Management
Logic specific to “engine”
The New Apache Hadoop (V2+): Now a “Stack”, Not Just MapReduce
http://hortonworks.com/wp-content/uploads/2014/10/YARN-with-Slider.png
[Figure: the Hadoop stack; surrounding components include ZooKeeper (coordination), HBase (structured data store), Flume (logs), and Sqoop (data exchange)]
Generalizing the Programming Model
n MapReduce offered a simple programming model over one collection of data – with map and reduce
n Is there a more general set of operations we might consider?
n In fact, one version is Java’s streams API!
n A “stream” = a collection of objects that can be
Java Streams
List<Item> records = …
Stream<Item> s = records.stream();
s.filter(x -> p(x))
 .collect(Collectors.toList())
Filter operator: apply predicate p to each item x in a collection, assemble x’s for which p(x) is true
[Figure: s = s1 s2 s3 s4 s5 s6; p(s1) … p(s6) are evaluated; the result keeps s1 s3 s4 s6]
Java Streams
List<Item> records = …
Stream<Item> s = records.stream();
s.map(x -> f(x))
 .collect(Collectors.toList())
Map operator: apply f to each item x in a collection, assemble f(x)’s into a collection
[Figure: s = s1 s2 s3 s4 s5 s6; result: f(s1) f(s2) f(s3) f(s4) f(s5) f(s6)]
Java Streams
List<Item> records = …
Stream<Item> s = records.stream();
s.flatMap(x -> F(x))
 .collect(Collectors.toList())
FlatMap operator: apply F to each item x in a collection, union F(x)’s sets into a collection
[Figure: s = s1 s2 s3 s4 s5 s6; F(s1) = {f11, f21}, F(s2) = Ø, F(s3) = {f13}, F(s4) = Ø, F(s5) = {f15, f25, f35}, F(s6) = {f16, f26}]
Filtering, Mapping, and FlatMapping
This same abstraction shows up in:
n
SQL WHERE queries – Filter
n
List comprehensions – Map
n
R and Python apply() – Map
n
Google’s original MapReduce model – really a FlatMap
n
Apache Hadoop MapReduce, Cascading, Flume, Flink, Spark,
…
n
Microsoft’s LINQ
Examples
String[] text = {"line 1", "the quick brown dog", "jumped over the lazy fox"};
// A stream can be consumed only once, so each pipeline starts from a fresh Arrays.stream(text)
Stream<String[]> wordArrs = Arrays.stream(text).map(str -> str.split(" "));
Stream<String> firstWords = wordArrs.map(arr -> arr.length == 0 ? "" : arr[0]).filter(x -> !x.isEmpty());
// Stream: [line, the, jumped]
Stream<String> allWords = Arrays.stream(text).flatMap(str -> Arrays.stream(str.split(" ")));
List<String> allWordList = allWords.collect(Collectors.toList());
// List: [line, 1, the, quick, brown, dog, jumped, over, the, lazy, fox]
Collectors and Aggregates
n Java has a notion of Collectors that take streams and add structure to them
n
Collectors.toSet(), toList(), etc
n
Collectors.groupingBy() which creates a map from an item’s key to a List of items (with that key)
n And generally we can use aggregate functions to return single values from collections (~reduce)
e.g., a count of a set, an average string length, the top item,
Collectors: Grouping and Aggregation
String[] text = {"line 1", "the quick brown dog", "jumped over the lazy fox"};
// Collect into a single group, count all items
System.out.println(Arrays.stream(text).flatMap(str -> Arrays.stream(str.split(" "))).count());
// 11
// Output how many words start with each letter (a fresh stream, since count() consumed the first one)
Stream<String> allWords = Arrays.stream(text).flatMap(str -> Arrays.stream(str.split(" ")));
allWords.collect(Collectors.groupingBy(v -> v.substring(0,1)))
        .forEach((key, list) -> System.out.print(key + ": " + list.size() + " "));
// e.g. q: 1 1: 1 b: 1 d: 1 t: 2 f: 1 j: 1 l: 2 o: 1 (the order of the groups is not guaranteed)
Order
Finally, we can sort items:
Arrays.stream(text).flatMap(str -> Arrays.stream(str.split(" "))).sorted().collect(Collectors.toList());
// [1, brown, dog, fox, jumped, lazy, line, over, quick, the, the]
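Putting the operators together, WordCount can be written as a single stream pipeline: flatMap plays the role of map, and groupingBy with a counting collector plays the role of shuffle plus reduce. This is a sketch on the same sample text as above; the class name StreamWordCount and the choice of a TreeMap (to get sorted keys) are assumptions for the example.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// WordCount expressed with Java streams instead of MapReduce.
class StreamWordCount {

    static Map<String, Long> wordCount(String[] lines) {
        return Arrays.stream(lines)
            // "map": one output item per word (really a flatMap, as noted earlier)
            .flatMap(line -> Arrays.stream(line.split(" ")))
            // "shuffle" + "reduce": group equal words and count each group
            .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        String[] text = {"line 1", "the quick brown dog", "jumped over the lazy fox"};
        System.out.println(wordCount(text));
        // {1=1, brown=1, dog=1, fox=1, jumped=1, lazy=1, line=1, over=1, quick=1, the=2}
    }
}
```

Unlike a Hadoop job, this runs inside one JVM on an in-memory collection; only the shape of the computation is the same.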
Beyond MapReduce:
Apache Spark, Flink, FlumeJava, …
n