map/reduce – connected components
find connected components with analogous algorithm:
• map edges randomly to partitions (k subgraphs of ≤ n nodes)
• for each partition remove edges, so that only a spanning forest remains (reduce)
– connectivity is not affected
– each reducer returns ≤ n edges
– result graph has fewer edges (≤ k · n)
• number of iterations analogous to MST
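The filtering steps listed above can be sketched as a single-machine simulation. This is a minimal illustration, not a Hadoop job: the class name ComponentFilter, the method names, and the parameter limit (the number of edges assumed to fit on a single node, at least 1) are assumptions made for this example. Each "reducer" is simulated by a union-find pass that keeps only the edges of a spanning forest.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Sketch: filtering for connected components, simulated on one machine. */
class ComponentFilter {

    /** Union-find root lookup with path halving. */
    static int find(int[] parent, int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]];
            x = parent[x];
        }
        return x;
    }

    /** "Reduce" step: keep only edges that join two different components. */
    static List<int[]> spanningForest(int n, List<int[]> edges) {
        int[] parent = new int[n];
        for (int i = 0; i < n; i++) parent[i] = i;
        List<int[]> kept = new ArrayList<>();
        for (int[] e : edges) {
            int a = find(parent, e[0]);
            int b = find(parent, e[1]);
            if (a != b) {
                parent[a] = b;
                kept.add(e);
            }
        }
        return kept;
    }

    /** One round: "map" edges randomly to k partitions, filter each, recombine. */
    static List<int[]> filterRound(int n, List<int[]> edges, int k, Random rnd) {
        List<List<int[]>> parts = new ArrayList<>();
        for (int p = 0; p < k; p++) parts.add(new ArrayList<>());
        for (int[] e : edges) parts.get(rnd.nextInt(k)).add(e);
        List<int[]> result = new ArrayList<>();
        for (List<int[]> part : parts) result.addAll(spanningForest(n, part));
        return result;
    }

    /** Repeat rounds until the edges fit the assumed single-node limit, then count components. */
    static int countComponents(int n, List<int[]> edges, int limit, Random rnd) {
        List<int[]> current = edges;
        while (current.size() > limit) {
            int k = (current.size() + limit - 1) / limit; // ceil(size / limit) partitions
            List<int[]> next = filterRound(n, current, k, rnd);
            if (next.size() == current.size()) break; // no edge was filtered, stop
            current = next;
        }
        // final step on a single node: every forest edge merges two components
        return n - spanningForest(n, current).size();
    }

    public static void main(String[] args) {
        List<int[]> edges = new ArrayList<>();
        edges.add(new int[] {0, 1});
        edges.add(new int[] {1, 2});
        edges.add(new int[] {0, 2});
        edges.add(new int[] {3, 4});
        // nodes 0..5; node 5 has no edges
        System.out.println(countComponents(6, edges, 2, new Random(1)));
    }
}
```

An edge is dropped only if its endpoints are already connected by kept edges of the same partition, so the connectivity of the recombined graph is unchanged regardless of how the random partitioning falls.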
map/reduce – generalizing the filtering approach
• partition randomly into subproblems
• reduce size in each subproblem independently
• recombine to (reduced) problem
• repeat until problem size is small enough for single node
• solve small/sparse instance on single node
for instantiations, two guarantees are needed:
• filtered parts in the subproblems do not affect optimal solution
• number of iterations is limited
map/reduce – performance of algorithms
map reduce algorithms vary in a number of behavioral parameters which are characteristic for
• performance (total computation time)
• incurred workload (total involved work on all machines)
• space consumption (memory)
these can be formalized in two groups (Goel, Munagala, 2012):
• key complexity - resource consumption on individual nodes
• sequential complexity - overall resource consumption
map/reduce – performance of algorithms
key complexity
• maximum size of any key/value pair
• maximum running time for any mapper or reducer on any key/value pair
• maximum memory consumption for any mapper or reducer on any key/value pair
nodes must be capable of executing individual map/reduce operations
sequential complexity
• size of all key/value pairs input and output by mappers/reducers
• total running time for all mappers/reducers
whole system must be capable of execution (e.g. sufficient number of machines)
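To make the two groups more concrete, the following sketch simulates a word count in memory and reports two rough proxies: the largest number of values any single key receives (a stand-in for key complexity) and the total number of emitted key/value pairs (a stand-in for sequential complexity). The class ComplexityMeter and its method are hypothetical names chosen for this illustration; pair counts stand in for sizes in bits, and running times are not measured.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: rough proxies for key complexity and sequential complexity of word count. */
class ComplexityMeter {

    /** Returns {largest number of values for one key, total number of emitted pairs}. */
    static long[] measure(List<String> lines) {
        Map<String, List<Integer>> groups = new HashMap<>();
        long totalPairs = 0;
        for (String line : lines) {
            // simulated mapper: emit (word, 1) for every token
            for (String w : line.trim().split("\\s+")) {
                if (w.isEmpty()) continue;
                groups.computeIfAbsent(w, key -> new ArrayList<>()).add(1);
                totalPairs++;
            }
        }
        long maxGroup = 0;
        // simulated shuffle result: one value list per key, handled by one reducer call
        for (List<Integer> g : groups.values()) {
            maxGroup = Math.max(maxGroup, g.size());
        }
        return new long[] {maxGroup, totalPairs};
    }
}
```

The first number has to fit on an individual node, while the second one only has to be covered by the system as a whole, for example by adding machines.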
map/reduce – complexity classes
• efficiency of algorithms is measured by their complexity
• complexity (time/space) is defined depending on size of input problem
• problems are distinguished by the (possible) existence of “efficient”
algorithms
example: P
P = { decision problems solvable in time $O(p(n))$, where p is a polynomial in the input size n } (an illustration, not the exact definition)
• problems are “tractable” (efficiently solvable) if a polynomial time algorithm exists
is there a comparable definition for map/reduce?
complexity for parallel algorithms: NC
PRAM setting: many cores, common memory
• parallel algorithms can use arbitrarily many cores
• these are usually not available (have to be emulated)
NC - Nick’s class
A decision problem is in NC if:
• there exists an algorithm solving it
• which uses a polynomial number of cores ($O(n^k)$)
• and solves the problem in polylogarithmic time ($O((\log n)^c)$)
with k and c constant and n being the input size.
• considered to be the class of efficiently parallelizable problems
• variants: $NC^i$ in time $O((\log n)^i)$ (often denoted $O(\log^i n)$)
complexity for map reduce algorithms: $MRC^i$
Let $\varepsilon > 0$ be a fixed value.
Let the input be a finite sequence $\langle k_i, v_i \rangle$ with total size (in bits) n.
An algorithm A consists of R map ($\mu$) and reduce ($\rho$) steps $\langle \mu_1, \rho_1, \mu_2, \rho_2, \ldots, \mu_R, \rho_R \rangle$.
A is in $MRC^i$ if it outputs the correct answer with probability at least 3/4 and for input size n:
• each $\mu_r$/$\rho_r$ is a randomized mapper/reducer with run-time polynomial in n, memory consumption $O(n^{1-\varepsilon})$ and word length $O(\log n)$
• the total space consumption of key/value pairs resulting from any $\mu_r$ is in $O(n^{2-2\varepsilon})$ (note: $= (n^{1-\varepsilon})^2$)
• the number of rounds $R \in O(\log^i n)$.
deterministic variant $DMRC^i$ with probability 1
source: Karloff, Suri, Vassilvitskii, A Model of Computation for MapReduce, 2010
interpretation of $MRC^i$
• each individual task (map/reduce) has polynomial runtime
• space consumption is $O(n^{1-\varepsilon})$, e.g. for $\varepsilon = 0.5$: $O(\sqrt{n})$
– $\varepsilon$ should be maximized
• a mapper should not produce more than a quadratic amount of output
– $O(n^{1-\varepsilon})$ in memory, output $O(n^{2-2\varepsilon})$
• total memory on all nodes is limited to $O(n^{2-2\varepsilon})$, which needs $O(n^{1-\varepsilon})$ nodes
• note: P and NC are classes of problems, MRC is a class of algorithms
open question: how to distribute (shuffle) $O(n^{2-2\varepsilon})$ key/value pairs in memory $O(n^{1-\varepsilon})$
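As a worked instance of these bounds, take the value ε = 0.5 from the interpretation above together with an input size of n = 10^6, which is an assumption chosen only for illustration. Constants hidden by the O-notation are ignored.

```latex
\begin{align*}
\varepsilon = 0.5, \quad n &= 10^{6} \\
\text{memory per mapper/reducer:} \quad n^{1-\varepsilon} &= n^{0.5} = 10^{3} \\
\text{total space of key/value pairs:} \quad n^{2-2\varepsilon} &= n^{1} = 10^{6} \\
\text{number of nodes:} \quad \frac{n^{2-2\varepsilon}}{n^{1-\varepsilon}} &= n^{1-\varepsilon} = 10^{3}
\end{align*}
```

With this choice the total space is linear in the input, and about a thousand nodes each hold about a thousandth of it.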
Graham’s Greedy Algorithm (Graham 1966)
MST complexity
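• note: the notations of MRC and MST collide; n and ε below are those of MST
• input size: $n^{1+c}$
• steps: $\lceil c/\varepsilon \rceil \in O(\log n)$ (note: $\log_n n^{1+c} = 1 + c$)
• nodes in MST need memory $n^{1+\varepsilon}$
• memory: $n^{1+\varepsilon} < n^{1+c}$, otherwise direct solution on single node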
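The step count can be made plausible with a short calculation. It is a sketch under two assumptions: every machine holds $n^{1+\varepsilon}$ edges, and every machine returns at most n edges per round, as in the filtering step for connected components. Rounding of the machine count is ignored.

```latex
\begin{align*}
\text{machines in round 1:} \quad k &= \frac{n^{1+c}}{n^{1+\varepsilon}} = n^{c-\varepsilon} \\
\text{edges after round 1:} \quad k \cdot n &= n^{1+c-\varepsilon} \\
\text{edges after round } r\text{:} \quad &\phantom{=} n^{1+c-r\varepsilon} \\
\text{single node suffices when:} \quad n^{1+c-r\varepsilon} \le n^{1+\varepsilon}
  &\iff r \ge \frac{c}{\varepsilon} - 1
\end{align*}
```

So $\lceil c/\varepsilon \rceil - 1$ filtering rounds followed by one final step on a single node give $\lceil c/\varepsilon \rceil$ steps in total. For example, c = 0.5 and ε = 0.1 yield 4 filtering rounds plus the final step.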
Hadoop/HDFS
introduction
• up to here: only the MongoDB implementation of mapreduce considered
• this is not a full implementation
• comes with limitations
– e.g. no guarantee that all keys end up at one reducer
– behavior cannot be influenced
• has the advantage of simple setup and simple usage
• an example of an m/r implementation with more features and possibilities is Hadoop/HDFS
• allows implementations in Java
introduction
two main components
• HDFS - Hadoop distributed file system
– runs on top of OS file system
– provides a view on “real files” stored on the actual hardware
– fixed block size (64MB)
– optimized for write once, read often
• Hadoop - the execution layer
– implements the mapreduce execution
– handles failed tasks (retry and give up)
– handles distribution of tasks
• both (storage and execution layer) run on the same nodes
typical architecture
• a network of nodes is connected to form a Hadoop cluster
• one master node
– NameNode - addresses data (which block on which node)
– JobTracker - execution management
• slave nodes
– DataNode - data storage
– TaskTracker - execution of tasks
– both processes run on the same (physical) node
• clients send jobs to JobTracker
• JobTracker distributes tasks among slave nodes
HDFS - overview
• NameNode distributes blocks to nodes
– ensures redundancy
– constant contact: “check-in” from slaves
– keeps data organized as files and directories
– is “single point of failure”
– handles only meta-data, nodes and clients communicate directly
• optimized for streaming access
– no random file access
– no appending of data
• organized like Unix file systems
– can be mounted (i.e. blended into general file system)
setting up an example installation
requirements
• Hadoop/HDFS use ssh and rsync for communication
• ssh - secure shell client (remote login), need client and server (sshd)
• rsync - remote synchronization (data transfer)
• jobs are implemented in Java, need Java Runtime Environment
• installation from tarball (source: https://hadoop.apache.org/releases.html)
• use latest stable version (2.4.1)
• local installation - standalone mode
• extract tarball, change into directory, test: run bin/hadoop
test job execution
• Hadoop distribution provides example jobs in hadoop-examples-1.1.2.jar
• jobs need an input and an output directory
• create input directory and copy some files in there
$ mkdir input
$ cp conf/*.xml input
• execute example job:
$ bin/hadoop jar hadoop-examples-1.1.2.jar \
    grep input output 'dfs[a-z.]+'
• result:
– lots of logging output
– files in output
$ ls output/
_SUCCESS part-00000
job implementation
• a job is defined in an arbitrary Java class
• Hadoop will start the main()-method
• main method configures and runs the actual job
Job configuration
• name
• input/output format
– special classes providing verification and reading/writing methods for I/O operations
• output key/value classes - type specifications as Java classes
• mapper/combiner/reducer
– mapper and reducer as usual, but as Java classes
– combiner: analogous to the reducer, but used to combine mapper output locally before it is sent to other nodes
• input/output paths for file access
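The effect of a combiner can be illustrated without any Hadoop classes. The sketch below uses plain Java collections; the class CombinerSketch and its methods are hypothetical names for this example and not part of the Hadoop API. It compares the number of (word, 1) pairs a word count mapper would emit for one input split with the number of pairs left after local combining.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringTokenizer;

/** Sketch: what a word count combiner saves before data leaves the mapper's node. */
class CombinerSketch {

    /** Mapper output without combiner: one (word, 1) pair per token. */
    static int pairsWithoutCombiner(String split) {
        return new StringTokenizer(split).countTokens();
    }

    /** Local combine step: sum the counts per word on the mapper's node. */
    static Map<String, Integer> combine(String split) {
        Map<String, Integer> local = new LinkedHashMap<>();
        StringTokenizer tokenizer = new StringTokenizer(split);
        while (tokenizer.hasMoreTokens()) {
            local.merge(tokenizer.nextToken(), 1, Integer::sum);
        }
        return local;
    }

    public static void main(String[] args) {
        String split = "to be or not to be";
        System.out.println("pairs without combiner: " + pairsWithoutCombiner(split));
        System.out.println("pairs after combining:  " + combine(split).size());
    }
}
```

Summation is associative and commutative, so partial sums computed per node can be added up again by the reducer without changing the result. This is why the same summing logic can serve as both combiner and reducer for word count.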
putting a job together
// inside class WordCount; uses classes from org.apache.hadoop.fs, org.apache.hadoop.io
// and org.apache.hadoop.mapred
public static void main(final String[] args) throws Exception {
    final JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");
    // types of the keys and values written by the job
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    // the reducer doubles as combiner
    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    // input and output directories are taken from the command line
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);
}
the mapper class
• implement interface Mapper
• has type parameters:
– input key, input value
– output key, output value
• class MapReduceBase provides empty default implementations of configure() and close()
• map function implemented as public void map()
– key and value as input (according to type parameters)
– OutputCollector used to emit key/value pairs
– Reporter for logging and progress reports
example: word count
public static class Map extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(final LongWritable key, final Text value,
            final OutputCollector<Text, IntWritable> output,
            final Reporter reporter) throws IOException {
        final String line = value.toString();
        final StringTokenizer tokenizer = new StringTokenizer(line);
        // emit (word, 1) for every token of the line
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}
the reducer class
• again, type parameters for input and output
• function reduce() for the actual task
• reporting and output collection analogous to mapper class
• both classes need access to hadoop library hadoop-core-1.1.2.jar
– can be found in main directory of distribution
public static class Reduce extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(final Text key, final Iterator<IntWritable> values,
            final OutputCollector<Text, IntWritable> output,
            final Reporter reporter) throws IOException {
        // sum up all counts emitted for this word
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}
compile and execute
• before execution, classes have to be compiled and packed into a jar file
• Eclipse export is possible, alternatively in distribution directory:
mkdir wordcount_classes
javac -classpath hadoop-core-1.1.2.jar -d wordcount_classes \
    WordCount.java
jar -cvf wordcount.jar -C wordcount_classes .
• without configuration, hdfs commands read directly from the local file system:
bin/hadoop dfs -ls /tmp
bin/hadoop dfs -cat /tmp/hadoop/input/test.txt
compile and execute
• run with
– /tmp/hadoop/input as input
– /tmp/hadoop/output as output directory
– class WordCount in package my.pack
– here: directly in main dir of Hadoop distribution
– otherwise: provide full path to jar
bin/hadoop jar wordcount.jar my.pack.WordCount \
    /tmp/hadoop/input /tmp/hadoop/output
• TextInputFormat reads all files in the input dir
– uses the byte offset of each line as key, the line as value
• TextOutputFormat writes all key/value pairs as plain text to output dir
• for more details, cf.
hadoop.apache.org/docs/stable/mapred_tutorial.html#Inputs+and+Outputs
using hadoop with python
• hadoop supports execution of scripts in arbitrary languages
• interface: input and output via system in/out streams
– scripts read input from stdin and write output to stdout
#!/bin/bash
~/opt/hadoop/bin/hadoop jar ~/opt/hadoop/share/hadoop/tools/lib/hadoop-*streaming*.jar \
    -mapper mapper.py -reducer reducer.py \
    -input pg4300.txt -output pg4300.out
cf. example from: http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
• input/output comes as text/csv files (tab delimited)