
map/reduce connected components


Academic year: 2021


map/reduce – connected components

connected components can be found with an analogous algorithm:

• map edges randomly to partitions (k subgraphs, each on at most n nodes)

• in each partition, remove edges until only a spanning forest remains (reduce); connectivity is not affected

– each reducer returns at most n − 1 edges, so the resulting graph has fewer edges (at most k · n)

• number of iterations analogous to MST
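The reduce step above only has to turn the edges of one partition into a spanning forest. A minimal sketch, assuming edges are given as `{u, v}` pairs over node ids `0..n-1`; the class name `SpanningForest` is hypothetical, and union-find is one standard way to detect cycle-closing edges, not necessarily the technique intended on the slide:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical reduce step for one partition of the connected-components filter:
 * keeps a spanning forest of the partition's edges (at most n - 1 edges).
 */
final class SpanningForest {
    private final int[] parent; // union-find parent pointers over node ids 0..n-1

    private SpanningForest(int n) {
        parent = new int[n];
        for (int i = 0; i < n; i++) parent[i] = i;
    }

    private int find(int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]]; // path halving keeps the trees shallow
            x = parent[x];
        }
        return x;
    }

    /** edges are {u, v} pairs with 0 <= u, v < n */
    static List<int[]> reduce(int n, List<int[]> edges) {
        SpanningForest uf = new SpanningForest(n);
        List<int[]> forest = new ArrayList<>();
        for (int[] e : edges) {
            int a = uf.find(e[0]);
            int b = uf.find(e[1]);
            if (a != b) {           // edge joins two components: keep it
                uf.parent[a] = b;
                forest.add(e);
            }                       // otherwise it would close a cycle: drop it
        }
        return forest;
    }
}
```

Applied to each of the k partitions, every reducer emits at most n − 1 edges; the union of these outputs is the (smaller) input of the next round.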


map/reduce – generalizing the filtering approach

• partition randomly into subproblems

• reduce size in each subproblem independently

• recombine into a (reduced) problem

• repeat until problem size is small enough for single node

• solve small/sparse instance on single node

for instantiations, two guarantees are needed:

• the parts filtered out in the subproblems do not affect the optimal solution

• number of iterations is limited
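The generic scheme can be sketched as a sequential simulation. The class `Filtering`, its parameters and the no-progress guard are assumptions for illustration; `reduce` stands for one reducer and must respect the first guarantee (it never drops anything the final solution depends on), while the guard is a crude stand-in for the second guarantee:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Function;

/** Hypothetical sequential simulation of the generic filtering scheme. */
final class Filtering {
    /**
     * Repeats partition / reduce / recombine until at most `threshold` items remain.
     * Requires k >= 1.
     */
    static <T> List<T> filter(List<T> items, int k, int threshold,
                              Function<List<T>, List<T>> reduce, Random rnd) {
        List<T> current = new ArrayList<>(items);
        while (current.size() > threshold) {
            // map: assign every item to one of k random partitions
            List<List<T>> parts = new ArrayList<>();
            for (int i = 0; i < k; i++) parts.add(new ArrayList<>());
            for (T item : current) parts.get(rnd.nextInt(k)).add(item);

            // reduce every partition independently, then recombine
            List<T> next = new ArrayList<>();
            for (List<T> part : parts) next.addAll(reduce.apply(part));

            // no progress in this round: stop instead of looping forever
            if (next.size() >= current.size()) break;
            current = next;
        }
        return current; // small (sparse) instance: solve it on a single node
    }
}
```

For example, for finding a maximum, a reducer that keeps only the maximum of its partition satisfies the first guarantee, since the overall maximum always survives its own partition.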


map/reduce – performance of algorithms

map/reduce algorithms differ in a number of behavioral parameters that characterize their

• performance (total computation time)

• incurred workload (total involved work on all machines)

• space consumption (memory)

these can be formalized in two groups¹:

• key complexity – resource consumption on individual nodes

• sequential complexity – overall resource consumption

¹ Goel, Munagala, 2012


map/reduce – performance of algorithms

key complexity

• maximum size of any key/value pair

• maximum running time for any mapper or reducer on any key/value pair

• maximum memory consumption for any mapper or reducer on any key/value pair

each node must be capable of executing any individual map/reduce operation

sequential complexity

• total size of all key/value pairs read and emitted by mappers/reducers

• total running time of all mappers/reducers

the whole system must be capable of executing the algorithm (e.g. sufficient number of machines)
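The space parts of both measures can be illustrated on a list of emitted key/value pairs: key complexity looks at the largest single pair, sequential complexity at the sum over all pairs. A minimal sketch; the class `PairSizes` is hypothetical, running time is not modeled, and measuring a pair as the UTF-8 byte length of key plus value is an assumption (Hadoop itself serializes `Writable`s):

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: the space measures of key and sequential complexity. */
final class PairSizes {
    // size of one key/value pair in bytes (UTF-8 encoding is an assumption)
    static long size(Map.Entry<String, String> pair) {
        return pair.getKey().getBytes(StandardCharsets.UTF_8).length
                + pair.getValue().getBytes(StandardCharsets.UTF_8).length;
    }

    /** key complexity (space): size of the largest single key/value pair */
    static long maxPairSize(List<? extends Map.Entry<String, String>> pairs) {
        long max = 0;
        for (Map.Entry<String, String> p : pairs) max = Math.max(max, size(p));
        return max;
    }

    /** sequential complexity (space): total size of all key/value pairs */
    static long totalSize(List<? extends Map.Entry<String, String>> pairs) {
        long total = 0;
        for (Map.Entry<String, String> p : pairs) total += size(p);
        return total;
    }
}
```

For the word-count pairs (the, 1), (cat, 1), (mouse, 1) this gives a maximum pair size of 6 bytes and a total of 14 bytes.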


map/reduce – complexity classes

• efficiency of algorithms is measured by their complexity

• complexity (time/space) is defined as a function of the input size

• problems are distinguished by the (possible) existence of “efficient” algorithms

example: P²

P = { decision problems solvable in time $O(p(n))$, where p is a polynomial in the input size n }

• problems are “tractable” (efficiently solvable) if a polynomial-time algorithm exists

is there a comparable definition for map/reduce?

² This is not the exact definition, only an illustration.


complexity for parallel algorithms: NC

PRAM setting: many cores, common memory

• parallel algorithms can use arbitrarily many cores

• these are usually not available (and have to be emulated)

NC – Nick’s class

A decision problem is in NC if

• there exists an algorithm solving it

• which uses a polynomial number of cores ($O(n^k)$)

• and solves the problem in polylogarithmic time ($O((\log n)^c)$), with k and c constant and n being the input size.

• NC is considered to be the class of efficiently parallelizable problems

• variants: $NC^i$ with time $O((\log n)^i)$ (often written $O(\log^i n)$)
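Summing n numbers is a typical example of the kind of computation NC captures: with n/2 cores, pairwise addition in a binary tree needs only ⌈log₂ n⌉ parallel steps. The following hypothetical class `TreeSum` simulates this sequentially; each outer loop iteration corresponds to one parallel step of a PRAM:

```java
/** Hypothetical simulation of a PRAM tree reduction: each outer iteration is one parallel step. */
final class TreeSum {
    final long sum;
    final int steps;

    private TreeSum(long sum, int steps) {
        this.sum = sum;
        this.steps = steps;
    }

    static TreeSum of(long[] input) {
        long[] a = input.clone();          // the shared memory of the PRAM
        int steps = 0;
        for (int stride = 1; stride < a.length; stride *= 2) {
            // the additions of this step read and write disjoint cells,
            // so up to n/2 cores could perform them simultaneously
            for (int i = 0; i + stride < a.length; i += 2 * stride) {
                a[i] += a[i + stride];
            }
            steps++;                       // one parallel time step
        }
        return new TreeSum(a.length == 0 ? 0 : a[0], steps);
    }
}
```

For the inputs 1, …, 8 the sum 36 is obtained after 3 = log₂ 8 steps, i.e. logarithmic time with a linear number of cores.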


complexity for map/reduce algorithms: $MRC^i$

Let $\epsilon > 0$ be a fixed value.

Let the input be a finite sequence of pairs $\langle k_i, v_i \rangle$ with total size (in bits) n.

An algorithm A consists of R map ($\mu$) and reduce ($\rho$) steps $\langle \mu_1, \rho_1, \mu_2, \rho_2, \dots, \mu_R, \rho_R \rangle$.

A is in $MRC^i$ if it outputs the correct answer with probability at least 3/4 and, for input size n:

• each $\mu_r$, $\rho_r$ is a randomized mapper/reducer with run time polynomial in n, memory consumption $O(n^{1-\epsilon})$ and word length $O(\log n)$

• the total space consumption of the key/value pairs output by any $\mu_r$ is in $O(n^{2-2\epsilon})$ (note: $n^{2-2\epsilon} = (n^{1-\epsilon})^2$)

• the number of rounds is $R \in O(\log^i n)$.

deterministic variant: $DMRC^i$, correct with probability 1

source: Karloff, Suri, Vassilvitskii, A Model of Computation for MapReduce, 2010


interpretation of $MRC^i$

• each individual task (map/reduce) has polynomial run time

• memory consumption per task is $O(n^{1-\epsilon})$, e.g. for $\epsilon = 0.5$: $O(\sqrt{n})$

– $\epsilon$ should be maximized

• a mapper should not produce more than a quadratic amount of output (quadratic in its memory):

– memory $O(n^{1-\epsilon})$, output $O(n^{2-2\epsilon})$

• total memory on all nodes is limited to $O(n^{2-2\epsilon})$, i.e. $O(n^{1-\epsilon})$ nodes with memory $O(n^{1-\epsilon})$ each are needed

• note: P and NC are classes of problems, MRC is a class of algorithms

open question: how to distribute (shuffle) $O(n^{2-2\epsilon})$ key/value pairs onto nodes with memory $O(n^{1-\epsilon})$ each

Graham’s Greedy Algorithm (Graham 1966)
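The slide points to Graham's greedy algorithm for this question. Assuming this refers to Graham's list scheduling (each job goes to the currently least-loaded machine, which keeps the maximum load within a factor 2 − 1/m of the optimum for m machines), distributing groups of key/value pairs with known sizes onto nodes could be sketched as follows; the class `GreedyScheduling` and the representation of groups as sizes are hypothetical:

```java
import java.util.PriorityQueue;

/** Graham-style greedy list scheduling: each item goes to the currently least-loaded node. */
final class GreedyScheduling {
    /** Returns, for each item, the index of the node it is assigned to (requires nodes >= 1). */
    static int[] assign(long[] sizes, int nodes) {
        long[] load = new long[nodes];
        // least-loaded node first; ties broken by node index
        PriorityQueue<Integer> queue = new PriorityQueue<>((a, b) ->
                load[a] != load[b] ? Long.compare(load[a], load[b]) : Integer.compare(a, b));
        for (int i = 0; i < nodes; i++) queue.add(i);

        int[] assignment = new int[sizes.length];
        for (int j = 0; j < sizes.length; j++) {
            int node = queue.poll();   // node with the smallest load so far
            assignment[j] = node;
            load[node] += sizes[j];    // safe: the node is not in the queue right now
            queue.add(node);           // re-insert with its updated load
        }
        return assignment;
    }
}
```

For group sizes 5, 4, 3, 3, 2, 1 on 3 nodes the assignment is 0, 1, 2, 2, 1, 0, giving every node a load of 6.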


MST complexity

• note: the notations of MRC and MST collide; here n and $\epsilon$ are used as in the MST algorithm

• input size: $n^{1+c}$ (edges)

• steps: $\lceil c/\epsilon \rceil \in O(\log n)$ ($\log_n n^{1+c} = 1 + c$)

• nodes in MST need memory $n^{1+\epsilon}$

• memory: $n^{1+\epsilon} < n^{1+c}$, otherwise the problem can be solved directly on a single node
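Where the round count ⌈c/ε⌉ comes from can be derived in a few lines, assuming (as in the filtering scheme) that round r spreads its $m_r$ edges over $m_r / n^{1+\epsilon}$ machines and that each reducer keeps at most n edges:

```latex
\begin{aligned}
m_0 &= n^{1+c} \\
m_{r+1} &\le \frac{m_r}{n^{1+\epsilon}} \cdot n = \frac{m_r}{n^{\epsilon}}
  \;\Rightarrow\; m_r \le n^{1+c-r\epsilon} \\
n^{1+c-r\epsilon} \le n^{1+\epsilon} &\iff c - r\epsilon \le \epsilon \iff r \ge \frac{c}{\epsilon} - 1 \\
R &= \underbrace{\left\lceil \frac{c}{\epsilon} \right\rceil - 1}_{\text{filtering rounds}}
   + \underbrace{1}_{\text{final round on one node}} = \left\lceil \frac{c}{\epsilon} \right\rceil
\end{aligned}
```

For example, c = 0.5 and ε = 0.1 give 4 filtering rounds (after which $m_4 \le n^{1.1} = n^{1+\epsilon}$) plus one final round, i.e. 5 rounds.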


Hadoop/HDFS


introduction

• so far, only the MongoDB implementation of map/reduce has been considered

• this is not a full implementation

• it comes with limitations

– e.g. no guarantee that all values of a key end up at one reducer (reduce may be called several times per key)

– this behavior cannot be influenced

• advantage: simple setup and simple usage

• an example of a map/reduce implementation with more features and possibilities is Hadoop/HDFS

• allows implementations in Java


introduction

two main components

• HDFS - Hadoop distributed file system – runs on top of OS file system

– provides a view on “real files” stored on the actual hardware

– fixed block size (64 MB by default in Hadoop 1.x)

– optimized for write once, read many

• Hadoop – the execution layer

– implements the map/reduce execution

– handles failed tasks (retries them, eventually gives up)

– handles the distribution of tasks

• both (storage and execution layer) run on the same nodes


typical architecture

• a network of nodes is connected to form a Hadoop cluster

• one master node

– NameNode – addresses data (which block is stored on which node)

– JobTracker – execution management

• slave nodes

– DataNode - data storage

– TaskTracker - execution of tasks

– both processes run on the same (physical) node

• clients submit jobs to the JobTracker

• the JobTracker distributes tasks among the slave nodes


HDFS - overview

• NameNode distributes blocks to nodes

– ensures redundancy

– constant contact: “check-in” from slaves

– keeps data organized as files and directories

– is a “single point of failure”

– handles only metadata; nodes and clients communicate directly

• optimized for streaming access

– no random file access

– no appending of data

• organized like Unix file systems

– can be mounted (i.e. blended into general file system)
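The NameNode's bookkeeping (which block of a file lives on which DataNodes) can be illustrated with a toy model. Everything here is an assumption for illustration: the class `BlockMap` is hypothetical and the round-robin placement is deliberately simplistic (real HDFS uses a rack-aware placement policy); only the idea of splitting a file into fixed-size blocks and storing several replicas on distinct nodes is taken from the slides:

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a NameNode block map: which DataNodes hold a replica of each block. */
final class BlockMap {
    /**
     * Splits a file of `fileSize` bytes into blocks of `blockSize` bytes and places
     * `replication` copies of every block on distinct DataNodes, round-robin.
     */
    static List<int[]> place(long fileSize, long blockSize, int dataNodes, int replication) {
        if (replication > dataNodes) throw new IllegalArgumentException("not enough DataNodes");
        long blocks = (fileSize + blockSize - 1) / blockSize; // the last block may be partial
        List<int[]> placement = new ArrayList<>();
        for (long b = 0; b < blocks; b++) {
            int[] replicas = new int[replication];
            for (int r = 0; r < replication; r++) {
                // distinct nodes, because replication <= dataNodes
                replicas[r] = (int) ((b + r) % dataNodes);
            }
            placement.add(replicas);
        }
        return placement;
    }
}
```

With 64 MB blocks, a 200 MB file occupies 4 blocks; with 4 DataNodes and 3 replicas, the last block would be stored on nodes 3, 0 and 1. Only this metadata is kept by the NameNode, the data itself flows directly between clients and DataNodes.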


setting up an example installation

requirements

• Hadoop/HDFS use ssh and rsync for communication

• ssh – secure shell (remote login); both client and server (sshd) are needed

• rsync – remote synchronization (data transfer)

• jobs are implemented in Java: a Java Runtime Environment is needed (and a JDK to compile jobs)

• installation from tarball³

• use the latest stable version (2.4.1 at the time of writing)

• local installation – standalone mode

• extract the tarball, change into the directory, test: run bin/hadoop

³ source: https://hadoop.apache.org/releases.html


test job execution

• Hadoop distribution provides example jobs in hadoop-examples-1.1.2.jar

• jobs need an input and an output directory

• create an input directory and copy some files into it

$ mkdir input

$ cp conf/*.xml input

• execute the example job:

$ bin/hadoop jar hadoop-examples-1.1.2.jar \
    grep input output 'dfs[a-z.]+'

• result:

– lots of logging output

– result files in output

$ ls output/

_SUCCESS part-00000
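What the grep example computes can be reproduced in plain Java: it counts how often each match of the regular expression occurs in the input (as far as we know, the example additionally sorts the result by frequency in a second job, which is omitted here). The class `GrepCount` is a hypothetical illustration, not the source of the example job:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Plain-Java illustration of what the grep example job computes (not its source code). */
final class GrepCount {
    static Map<String, Integer> count(List<String> lines, String regex) {
        Pattern pattern = Pattern.compile(regex);
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            Matcher m = pattern.matcher(line);
            while (m.find()) {
                // "map": one (match, 1) pair per occurrence; "reduce": sum per match
                counts.merge(m.group(), 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

Applied to lines of the copied configuration files, e.g. `<name>dfs.replication</name>`, the pattern `dfs[a-z.]+` matches property names such as `dfs.replication`.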


job implementation

• a job is defined in an arbitrary Java class

• Hadoop calls its main() method

• the main method configures and runs the actual job

Job configuration

• name

• input/output format

– special classes providing validation and reading/writing methods for I/O operations

• output key/value classes – type specifications as Java classes

• mapper/combiner/reducer

– mapper and reducer as usual, but as Java classes

– combiner: analogous to a reducer, but used to combine mapper output locally before it is sent to other nodes

• input/output paths for file access
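The effect of a combiner can be shown without Hadoop: for word count, summing the (word, 1) pairs of one mapper locally shrinks what has to be sent over the network, and since summing is associative and commutative, the reducer can double as combiner (as in `setCombinerClass(Reduce.class)` in the job configuration). The class `CombinerDemo` and its methods are hypothetical and use no Hadoop API:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustration (plain Java, no Hadoop API) of what a word-count combiner saves. */
final class CombinerDemo {
    /** mapper output for one input split: one (word, 1) pair per token */
    static List<Map.Entry<String, Integer>> map(String text) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : text.split("\\s+")) {
            if (!word.isEmpty()) out.add(new SimpleEntry<>(word, 1));
        }
        return out;
    }

    /** combiner: sums counts per word locally, before anything is sent to other nodes */
    static List<Map.Entry<String, Integer>> combine(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            sums.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return new ArrayList<>(sums.entrySet());
    }
}
```

For the text "a b a a" the mapper emits 4 pairs, the combiner only 2: (a, 3) and (b, 1).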


putting a job together

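package my.pack;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class WordCount {
    // the nested classes Map and Reduce are shown on the following slides

    public static void main(final String[] args) throws Exception {
        final JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class); // reducer doubles as combiner
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}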


the mapper class

• implements the interface Mapper

• has type parameters:

– input key, input value

– output key, output value

• the class MapReduceBase provides empty default implementations of configure() and close()

• the map function is implemented as public void map():

– key and value as input (according to the type parameters)

– an OutputCollector is used to emit key/value pairs

– a Reporter is used for logging and progress reports

example: word count


public static class Map extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(final LongWritable key, final Text value,
            final OutputCollector<Text, IntWritable> output, final Reporter reporter)
            throws IOException {
        final String line = value.toString();
        final StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one); // emit (word, 1) for every token
        }
    }
}


the reducer class

• again, type parameters for input and output

• function reduce() for the actual task

• reporting and output collection analogous to mapper class

• both classes need the Hadoop library hadoop-core-1.1.2.jar on the classpath

– it can be found in the main directory of the distribution


public static class Reduce extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(final Text key, final Iterator<IntWritable> values,
            final OutputCollector<Text, IntWritable> output, final Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}


compile and execute

• before execution, the classes have to be compiled and packed into a jar file

• export from Eclipse is possible; alternatively, in the distribution directory:

mkdir wordcount_classes

javac -classpath hadoop-core-1.1.2.jar -d wordcount_classes \
    WordCount.java

jar -cvf wordcount.jar -C wordcount_classes .

• without further configuration, HDFS commands operate directly on the local file system:

bin/hadoop dfs -ls /tmp

bin/hadoop dfs -cat /tmp/hadoop/input/test.txt


compile and execute

• run with

– /tmp/hadoop/input as input directory

– /tmp/hadoop/output as output directory

– class WordCount in package my.pack

– here: jar directly in the main directory of the Hadoop distribution

– otherwise: provide the full path to the jar

bin/hadoop jar wordcount.jar my.pack.WordCount \
    /tmp/hadoop/input /tmp/hadoop/output

– TextInputFormat reads all files in the input directory

• it uses the byte offset of each line as key and the line as value

– TextOutputFormat writes all key/value pairs as plain text to the output directory

• for more details, cf. hadoop.apache.org/docs/stable/mapred_tutorial.html#Inputs+and+Outputs
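The keys that TextInputFormat passes to the mapper are byte offsets of the line starts in the file, which is why the Map class declares its input key as LongWritable. A simplified simulation, assuming a single split, UTF-8 content and `\n` line endings only; the class `LineRecords` is hypothetical:

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Simplified simulation of the (key, value) pairs TextInputFormat hands to the mapper. */
final class LineRecords {
    // key = byte offset of the line start, value = the line without its '\n'
    static List<Map.Entry<Long, String>> records(String fileContent) {
        byte[] bytes = fileContent.getBytes(StandardCharsets.UTF_8);
        List<Map.Entry<Long, String>> out = new ArrayList<>();
        int start = 0;
        for (int i = 0; i <= bytes.length; i++) {
            boolean end = i == bytes.length;
            if (end || bytes[i] == '\n') {
                if (!(end && start == i)) { // no empty record after a final newline
                    String line = new String(bytes, start, i - start, StandardCharsets.UTF_8);
                    out.add(new SimpleEntry<>((long) start, line));
                }
                start = i + 1;
            }
        }
        return out;
    }
}
```

For the content "ab\ncd\n" this yields the records (0, "ab") and (3, "cd").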


using hadoop with python

• Hadoop (via Hadoop Streaming) supports mappers and reducers written as scripts in arbitrary languages

• interface: input and output via the standard streams; scripts read input from stdin and write output to stdout

#!/bin/bash
~/opt/hadoop/bin/hadoop jar ~/opt/hadoop/share/hadoop/tools/lib/hadoop-*streaming*.jar \
    -mapper mapper.py -reducer reducer.py \
    -input pg4300.txt -output pg4300.out

cf. example from: http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/

• input/output are plain text files, one tab-delimited key/value pair per line
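The streaming interface only requires an executable that reads lines from stdin and writes `key<TAB>value` lines to stdout; the reducer receives its input sorted by key, so equal keys arrive on adjacent lines. Keeping the document's Java, a word count for this interface could be sketched as below. The class `StreamingWordCount` is hypothetical, and invoking it e.g. as `-mapper "java StreamingWordCount map"` (with the class available on every node) is an untested assumption:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.Reader;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

/** Hypothetical word count following the streaming convention: "key\tvalue" lines. */
final class StreamingWordCount {
    /** mapper: emit "word\t1" for every token of every input line */
    static void map(Reader in, Writer out) throws IOException {
        BufferedReader reader = new BufferedReader(in);
        PrintWriter writer = new PrintWriter(out);
        String line;
        while ((line = reader.readLine()) != null) {
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) writer.print(word + "\t1\n");
            }
        }
        writer.flush();
    }

    /** reducer: input is sorted by key, so all counts of one word are adjacent */
    static void reduce(Reader in, Writer out) throws IOException {
        BufferedReader reader = new BufferedReader(in);
        PrintWriter writer = new PrintWriter(out);
        String current = null;
        long sum = 0;
        String line;
        while ((line = reader.readLine()) != null) {
            int tab = line.indexOf('\t');
            if (tab < 0) continue;                      // skip malformed lines
            String word = line.substring(0, tab);
            long count = Long.parseLong(line.substring(tab + 1).trim());
            if (!word.equals(current)) {                // a new key starts
                if (current != null) writer.print(current + "\t" + sum + "\n");
                current = word;
                sum = 0;
            }
            sum += count;
        }
        if (current != null) writer.print(current + "\t" + sum + "\n");
        writer.flush();
    }

    public static void main(String[] args) throws IOException {
        Reader in = new InputStreamReader(System.in, StandardCharsets.UTF_8);
        Writer out = new OutputStreamWriter(System.out, StandardCharsets.UTF_8);
        if (args.length == 1 && args[0].equals("map")) map(in, out);
        else if (args.length == 1 && args[0].equals("reduce")) reduce(in, out);
        else System.err.println("usage: StreamingWordCount map|reduce");
    }
}
```

Because Hadoop sorts the mapper output before the reduce phase, the reducer only needs to remember the current word and its running sum instead of a map of all words.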
