Big Data and Scripting
map/reduce in Hadoop
parts of a Hadoop map/reduce implementation
• core framework provides customization via individual map and reduce functions
– e.g. implementation in MongoDB
• Hadoop provides customization at all steps:
– input reading
– map
– combine
– partition
– shuffle and sort
– reduce
– output
input reading
• split up input file(s) into records of key and value
• key: usually positional information
• value: line/part of file
• keys have no influence on distribution to mappers
• individual input readers implement InputFormat
• used to split up e.g. XML files into records
• input readers should split, but not parse the input
• input readers are run on individual blocks
• executed on nodes that store the corresponding blocks
• records that could overlap blocks have to be handled manually
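The handling of records that overlap block boundaries can be illustrated with a minimal Python sketch. It is not Hadoop's InputFormat API: the function name `read_records` and the convention used (a reader skips a line that began in the previous block and finishes the last line it started, even past its block end) are assumptions for illustration.

```python
def read_records(data: bytes, start: int, end: int):
    """Yield (byte_offset, line) records for the block data[start:end].

    Assumed convention: a line belongs to the block in which it starts.
    """
    pos = start
    if start > 0:
        # Skip forward to the first line that starts inside this block.
        nl = data.find(b"\n", start - 1)
        pos = len(data) if nl == -1 else nl + 1
    while pos < end and pos < len(data):
        nl = data.find(b"\n", pos)
        line_end = len(data) if nl == -1 else nl
        # The key is positional information, the value is the raw line.
        yield pos, data[pos:line_end].decode("utf-8")
        pos = line_end + 1


data = b"alpha\nbeta\ngamma\n"
block_a = list(read_records(data, 0, 8))    # the block ends inside "beta"
block_b = list(read_records(data, 8, 17))
```

The reader only splits the input. Parsing the line contents is left to the mapper.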
combine
• combine patterns emitted on a node before distribution
• mapper outputs on each node fed into combiner
• output of combiner distributed to reducers
• combiners compress output on a semantic level
– example: word count, reduce 3× (word,1) → (word,3)
• framework implements compression on data level
• combiners implement the Reducer interface – same input/output formats
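The word count example can be sketched in plain Python. This is a simulation, not Hadoop code, and the function names `map_words` and `combine` are hypothetical. Because summation is associative and commutative, the same function could also serve as the reducer.

```python
from collections import Counter


def map_words(line):
    """Mapper: emit (word, 1) for every word in the line."""
    return [(word, 1) for word in line.split()]


def combine(pairs):
    """Combiner: sum the counts per word locally, before distribution."""
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return sorted(totals.items())


mapped = map_words("to be or not to be to")   # 7 pairs leave the mapper
combined = combine(mapped)                    # 4 pairs leave the node
```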
partition
• distribute key/value pairs to nodes – which keys go to which nodes
• output of partitioner is written to the local disk of the mapper node (not to HDFS)
– one file per partition and node
• nodes that run reducers pull (download) files corresponding to their partition
• default: hash functions for random uniform distribution
• partitioner can influence distribution directly
– e.g. ensure that certain pairs end up in the same node
– output of these will also end up on the same node
• interface: Partitioner
• map key/value → partition (int)
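A partitioner is essentially a function from key (and value) to a partition number. The following Python sketch shows a hash-based default and a custom variant. Both function names are hypothetical, and CRC32 merely stands in for whatever hash function the framework uses.

```python
import zlib


def hash_partition(key: str, num_partitions: int) -> int:
    """Default behaviour: spread keys roughly uniformly via a hash."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def first_letter_partition(key: str, num_partitions: int) -> int:
    """Custom behaviour: keys with the same initial share a partition."""
    return ord(key[0].lower()) % num_partitions


partitions = [hash_partition(k, 4) for k in ["apple", "banana", "cherry"]]
```

With `first_letter_partition`, "apple" and "Avocado" are guaranteed to reach the same reducer, so their output also ends up on the same node.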
influencing the shuffle/sort
• keys are distributed by partitioner to reducer nodes
• reducers download partition files from mapper-nodes
• each reducer node starts a shuffle/sort process
• sort key/value pairs by key
• only parameter: custom Comparator for keys
– influences order and equality of keys
– can implement an ordering within equivalence classes of keys
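Ordering within equivalence classes of keys can be sketched as follows. The composite key layout (user, timestamp) and all names are assumptions for illustration: the comparator orders by both components, while grouping treats keys with the same user as equal.

```python
from functools import cmp_to_key
from itertools import groupby


def compare_keys(a, b):
    """Order composite keys (user, timestamp) by user, then by timestamp."""
    if a[0] != b[0]:
        return -1 if a[0] < b[0] else 1
    return (a[1] > b[1]) - (a[1] < b[1])


pairs = [(("bob", 3), "c"), (("alice", 2), "b"),
         (("bob", 1), "a"), (("alice", 1), "d")]

# Shuffle/sort: order the pairs with the custom comparator.
key_fn = cmp_to_key(compare_keys)
ordered = sorted(pairs, key=lambda kv: key_fn(kv[0]))

# Grouping: keys with the same user form one equivalence class, so each
# reduce call sees its values already ordered by timestamp.
groups = {user: [value for _, value in group]
          for user, group in groupby(ordered, key=lambda kv: kv[0][0])}
```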
output
• analogous to input reading
• write local output of reducers to disk/HDFS
generic information distribution
• transport information in addition to key/value pairs
• examples:
– small-scale parameters, e.g. number of clusters for k-means
– large-scale additional information, e.g. look-up tables
• use JobConf to distribute algorithm parameters
– set generic parameters using e.g. set(String name, String value)
– implement configure(JobConf) in mapper/reducer/partitioner/combiner to retrieve them
– before execution each class is provided with the configuration
– initialize class variables from general job configuration
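The configuration mechanism can be mimicked in Python with a plain dictionary of string parameters. This is a sketch of the idea only, not the JobConf API. The class `AssignToCluster` and the parameter name `kmeans.centers` are hypothetical.

```python
class AssignToCluster:
    """Mapper that needs a job-wide parameter: the current cluster centers."""

    def configure(self, conf):
        # Called once before any record is processed: initialize class
        # variables from the general job configuration.
        self.centers = [float(c) for c in conf["kmeans.centers"].split(",")]

    def map(self, key, value):
        x = float(value)
        nearest = min(range(len(self.centers)),
                      key=lambda i: abs(x - self.centers[i]))
        return [(nearest, x)]


conf = {"kmeans.centers": "0.0,10.0"}   # set(name, value) on the job side
mapper = AssignToCluster()
mapper.configure(conf)
assigned = mapper.map(0, "2.5") + mapper.map(1, "7.0")
```

Large look-up tables would not fit this string-based mechanism and need a separate distribution channel.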
map reduce design patterns
• book: “MapReduce Design Patterns” (Miner, Shook, 2012, O'Reilly)
design patterns
• describe mechanisms common to many algorithms
• solve similar problems in a number of contexts
• in a sense “generic algorithms”
Summarization
• general intention
– group records by common field
– aggregate the values of all records with an identical value in that field
• examples:
– word count, mean, min, max, counting within groups
– similar to GROUP BY in SQL but with individual aggregation
• result:
– table with one entry (row) per group
– each row contains group key, aggregated value(s)
Summarization
• implementation:
– map: assign each input element to its group (using group as key), remove values not needed for aggregation
– combine: partial aggregation if possible
– reduce: aggregate values and return group key/(aggregated values)
• combiner can drastically reduce network traffic
• custom partitioner can help to resolve skewed group-size distributions
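A minimal Python simulation of the summarization pattern, computing min, max, mean and count per group. The record layout (`group`, `value`) and the function names are assumptions. The mean is carried as a (sum, count) pair so that partial aggregates can be merged, which is what makes a combiner possible here.

```python
def map_record(record):
    """Map: key by group, keep only what the aggregation needs."""
    value = record["value"]
    return record["group"], (value, value, value, 1)  # (min, max, sum, count)


def merge(a, b):
    """Merge two partial aggregates of the same group."""
    return (min(a[0], b[0]), max(a[1], b[1]), a[2] + b[2], a[3] + b[3])


def aggregate(pairs):
    """Shared by combiner and reducer: merge partial aggregates per key."""
    merged = {}
    for key, partial in pairs:
        merged[key] = merge(merged[key], partial) if key in merged else partial
    return merged


def summarize(nodes):
    """nodes: one list of input records per mapper node."""
    combined = [aggregate(map_record(r) for r in records) for records in nodes]
    reduced = aggregate(pair for part in combined for pair in part.items())
    return {group: {"min": lo, "max": hi, "mean": total / n, "count": n}
            for group, (lo, hi, total, n) in reduced.items()}


nodes = [[{"group": "a", "value": 1}, {"group": "b", "value": 4}],
         [{"group": "a", "value": 5}]]
table = summarize(nodes)
```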
Counting
• reduce step not necessary
• for a limited number of counters, counting can be implemented using only mappers
– using the Reporter class
• pattern:
– mappers count occurrences, store in Reporter
– produce no output pairs, no shuffle or reduction
• functions:
– Reporter.incrCounter(): create/increase counter
– Reporter.getCounter(): read contents
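The counting pattern can be sketched with a small stand-in for the counter facility. The class below is not Hadoop's Reporter. It only mimics the two operations named above, and the log line format is an assumption.

```python
from collections import Counter


class Reporter:
    """Stand-in for the framework's counters (not the Hadoop class)."""

    def __init__(self):
        self.counters = Counter()

    def incr_counter(self, name, amount=1):
        self.counters[name] += amount   # creates the counter if needed

    def get_counter(self, name):
        return self.counters[name]


def map_count_status(record, reporter):
    """Map-only job: count HTTP status codes and emit no output pairs."""
    status = record.split()[-1]
    reporter.incr_counter("status_" + status)
    return []


reporter = Reporter()
emitted = []
for line in ["GET /a 200", "GET /b 404", "GET /c 200"]:
    emitted += map_count_status(line, reporter)
```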
Filtering
• filter input records by some property
• limit execution to those that pass
• examples:
– distributed grep
– thresholding
– data cleansing
– random sampling
• as with counting, no reduce is necessary
• data is read from and written to the local node
• additional reduce can be used to write filter result to single file
– many small files slow down computations
– mapping all filtered results to a single reducer allows compacting them into one file
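Two of the filtering examples, distributed grep and random sampling, reduce to a mapper that either passes a record through or drops it. A minimal Python sketch with hypothetical function names:

```python
import random
import re


def grep_mapper(lines, pattern):
    """Distributed grep: keep only the lines matching the pattern."""
    regex = re.compile(pattern)
    return [line for line in lines if regex.search(line)]


def sample_mapper(lines, fraction, seed=0):
    """Random sampling: keep each line with probability `fraction`."""
    rng = random.Random(seed)
    return [line for line in lines if rng.random() < fraction]


lines = ["error: disk full", "ok", "ERROR: network"]
matches = grep_mapper(lines, "error")
```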
Distinct
• intention
– return only distinct values from a larger set
– filter duplicates or values that are highly similar to each other
• implementation
– for each record, extract field considered in similarity
– use those as key, null as value
– shuffle will transport identical values to one reducer
– reducer only stores one copy
• combiner is extremely useful
• use many reducers
– single mapper produces many distinct values
– reducer does not produce high load
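A Python sketch of the distinct pattern. The normalization used as the similarity criterion (trimming whitespace and lower-casing) is an assumption chosen for illustration. The same deduplication function serves as combiner and reducer.

```python
def map_distinct(record):
    """Map: extract the field that defines similarity, use it as key."""
    return record.strip().lower(), None


def dedupe(pairs):
    """Combiner and reducer: keep one (key, None) pair per key."""
    return [(key, None) for key in sorted({key for key, _ in pairs})]


def distinct(nodes):
    """nodes: one list of input records per mapper node."""
    combined = [dedupe(map_distinct(r) for r in records) for records in nodes]
    reduced = dedupe(pair for part in combined for pair in part)
    return [key for key, _ in reduced]


values = distinct([["Ann", "bob ", "ann"], ["BOB", "carl"]])
```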
structured to hierarchical
• intention
– convert a flat format (e.g. an SQL-table with value repetitions) into structured format
• example:
– given a table storing values for a foreign key
– each row contains set of values and key
– keys are repeated
– structure by storing key → set of values
• implementation
– map rows to foreign key (higher element in desired structure)
– collect all values for each key and store as structured records
• can be repeated bottom up for several layers of structure
• used to convert flat data into structured records such as JSON/XML
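The conversion can be sketched in Python for one layer of structure. The table layout (rows of post id and comment) and the JSON field names are assumptions for illustration.

```python
import json
from collections import defaultdict


def to_hierarchy(rows):
    """Group flat (post_id, comment) rows into one JSON record per post."""
    grouped = defaultdict(list)
    for post_id, comment in rows:          # map: foreign key becomes the key
        grouped[post_id].append(comment)   # shuffle: collect values per key
    # reduce: store each key with its collected values as structured record
    return [json.dumps({"post_id": post_id, "comments": comments})
            for post_id, comments in sorted(grouped.items())]


rows = [(1, "first comment"), (2, "nice"), (1, "second comment")]
records = to_hierarchy(rows)
```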
partitioning
• intention
– partition data by some property (e.g. date)
– create smaller groups that can be analyzed individually
• example
– group log entries by date to allow analysis on a monthly basis
• implementation
– use identity mapper with partition property as key
– implement custom partitioner, creating partitions by key
– reducer only writes values to output
• all data is written to one logical file
• blocks are distributed as proposed by the partitioner, e.g. all blocks for a particular month are on one DataNode
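The log example can be sketched as follows, assuming ISO dates ("YYYY-MM-DD") as keys and a list of months known in advance. Both are assumptions, as are the function names.

```python
def month_partitioner(months):
    """Build a partitioner mapping a date key to the index of its month."""
    index = {month: i for i, month in enumerate(months)}

    def partition(key):
        return index[key[:7]]   # "YYYY-MM" prefix of the date

    return partition


def partition_by_month(entries, months):
    partition = month_partitioner(months)
    outputs = [[] for _ in months]
    for date, message in entries:                  # identity mapper
        outputs[partition(date)].append(message)   # reducer only writes
    return outputs


entries = [("2014-01-03", "a"), ("2014-02-10", "b"), ("2014-01-20", "c")]
parts = partition_by_month(entries, ["2014-01", "2014-02"])
```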
sorting with the shuffle step
• implementation of sort algorithm can exploit the shuffle/sort step
• use the idea presented before:
– analyze phase determines sorting buckets
– order phase sorts
• analyze (first m/r-step)
– sample input and map to sort keys, without value
– set number of reducers to one
– shuffling sorts keys, single reducer gets sorted list of keys
– create slices of equal size
sorting - order phase
• mapper maps to sort key, values attached this time
• custom partitioner applies partition from previous step to input values
• one reducer for each partition
• reducers
– receive data ordered by keys
– only write incoming data to file
• output is written to part-r-* files (number instead of *)
• file ordering corresponds to ordering of values
• values within files are sorted
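Both phases can be simulated in a few lines of Python. The function names, the sample size and the use of binary search to apply the slice boundaries are assumptions, and `sorted` stands in for the framework's shuffle/sort.

```python
import bisect
import random


def analyze(values, num_partitions, sample_size, seed=0):
    """First step: sample the keys, sort them, cut slices of equal size."""
    rng = random.Random(seed)
    sample = sorted(rng.sample(values, min(sample_size, len(values))))
    step = len(sample) / num_partitions
    return [sample[int(i * step)] for i in range(1, num_partitions)]


def order(values, boundaries):
    """Second step: partition by boundaries; each reducer gets sorted input."""
    parts = [[] for _ in range(len(boundaries) + 1)]
    for value in values:
        parts[bisect.bisect_right(boundaries, value)].append(value)
    return [sorted(part) for part in parts]   # one output file per reducer


values = list(range(100))
random.Random(1).shuffle(values)
files = order(values, analyze(values, num_partitions=4, sample_size=20))
```

Concatenating the output files in order yields the fully sorted data. The quality of the sample only affects how evenly the load is spread, not correctness.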
shuffle
• intention: destroy order of data arrangement/create random order/partitions
• motivation/applications
– anonymizing
– repeatable random sampling
– distribution of highly accessed parts to multiple nodes
• pattern
– map values to random key
– shuffling implements random distribution
– reducer only writes and produces random partitions with random order
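A Python sketch of the shuffle pattern, with hypothetical names. A seeded generator is used here so that the random arrangement is repeatable, which matches the repeatable-sampling application.

```python
import random


def shuffle_records(records, num_reducers, seed):
    """Assign random keys, sort by them, write one partition per reducer."""
    rng = random.Random(seed)
    keyed = [(rng.random(), record) for record in records]   # map
    parts = [[] for _ in range(num_reducers)]
    for key, record in sorted(keyed, key=lambda kv: kv[0]):  # shuffle/sort
        target = min(int(key * num_reducers), num_reducers - 1)
        parts[target].append(record)                         # reducer writes
    return parts


records = list(range(20))
first = shuffle_records(records, num_reducers=3, seed=42)
second = shuffle_records(records, num_reducers=3, seed=42)
```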
Joins
• join as in SQL JOIN, join two tables by common key
• implementation
– map values to join key
– use partitioner for even distribution to reducers
– reducer collects values
• in temporary lists
• using external storage if necessary
• one list for each source table
– in final step reducer produces output pairs from lists
• can be used to implement all types of joins: inner, outer, left, right, anti
• all elements with common key are computed on single reducer
– can be problematic when many values have the same key
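A minimal Python simulation of this reduce-side join, restricted to inner and left joins for brevity. The tags "L" and "R", the function name and the example tables are assumptions.

```python
from collections import defaultdict


def reduce_side_join(left, right, how="inner"):
    # map: emit every record under its join key, tagged with its source
    tagged = ([(key, ("L", value)) for key, value in left] +
              [(key, ("R", value)) for key, value in right])
    # shuffle: all records with a common key meet at one reducer
    groups = defaultdict(lambda: {"L": [], "R": []})
    for key, (tag, value) in tagged:
        groups[key][tag].append(value)   # one temporary list per source
    # reduce: produce output pairs from the two lists
    joined = []
    for key in sorted(groups):
        lefts, rights = groups[key]["L"], groups[key]["R"]
        if how == "left" and not rights:
            rights = [None]              # keep unmatched left records
        joined.extend((key, l, r) for l in lefts for r in rights)
    return joined


users = [(1, "ann"), (2, "bob")]
orders = [(1, "book"), (1, "pen"), (3, "cup")]
inner = reduce_side_join(users, orders)
left = reduce_side_join(users, orders, how="left")
```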
replicated join
• join very large data set with many small data sets
• large data set is “left” → output elements correspond to elements of large data set
• small data sets fit into memory
• implementation
– mapper reads smaller data sets in initialization
– processes each record by joining it with elements from small data sets
– no combine/shuffle/reduce
– joined data is written directly after mapping
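The replicated join can be sketched as a mapper that loads the small data set once and joins each record on the fly. The class and method names are hypothetical, and the small table is passed in directly instead of being read from a distributed file.

```python
class ReplicatedJoinMapper:
    """Map-only left join of a large data set with a small in-memory table."""

    def setup(self, small_table):
        # Initialization: read the small data set into memory once.
        self.lookup = dict(small_table)

    def map(self, record):
        user_id, item = record
        # Every record of the large data set yields exactly one output.
        return user_id, item, self.lookup.get(user_id)


mapper = ReplicatedJoinMapper()
mapper.setup([(1, "ann"), (2, "bob")])
joined = [mapper.map(r) for r in [(1, "book"), (3, "cup"), (2, "pen")]]
```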
cartesian product
• intention: create all pairs of input values from data sets A and B
• application:
– pairwise analysis of elements (e.g. determine distance matrix)
• pattern:
– create partitions of both data sets: $A_1, \dots, A_n$ ($n$ parts), $B_1, \dots, B_m$ ($m$ parts)
– repeat values in each partition: $A_{11}, \dots, A_{m1}, \dots$
– use $n \cdot m$ reducers
– each reducer receives all values from pair of parts Ai, Bj
– reducer produces all possible pairings
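The pattern can be simulated in Python. The round-robin split into parts is an assumption. Any partitioning works as long as each pair of parts is handled by exactly one reducer.

```python
from itertools import product


def cartesian(a, b, n, m):
    """Return the pairings produced by each of the n * m reducers."""
    a_parts = [a[i::n] for i in range(n)]   # n parts of A
    b_parts = [b[j::m] for j in range(m)]   # m parts of B
    # Each part of A is replicated m times and each part of B n times,
    # so that reducer (i, j) receives part i of A and part j of B.
    return {(i, j): list(product(a_parts[i], b_parts[j]))
            for i in range(n) for j in range(m)}


a, b = [1, 2, 3], ["x", "y"]
reducers = cartesian(a, b, n=2, m=2)
all_pairs = [pair for pairs in reducers.values() for pair in pairs]
```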