Distributed Computing with Hadoop
Stefan Theußl, Albert Weichselbraun
WU Vienna University of Economics and Business, Augasse 2–6, 1090 Vienna
Agenda
Problem & Motivation
The MapReduce Paradigm
Distributed Text Mining in R
Distributed Text Mining in Java
Distributed Text Mining in Python
Implementation Details: Hadoop API, Hadoop Streaming, eWRT
Motivation
Main motivation: large scale data processing
I Many tasks produce output data by processing large amounts of input data
I Want to make use of many CPUs
I Typically this is not easy (parallelization, synchronization, I/O, debugging, etc.)
The MapReduce Paradigm
I Programming model inspired by functional language primitives
I Automatic parallelization and distribution
I Fault tolerance
I I/O scheduling
I Examples: document clustering, web access log analysis, search index construction, . . .
Jeffrey Dean and Sanjay Ghemawat.
MapReduce: Simplified data processing on large clusters.
In OSDI’04, 6th Symposium on Operating Systems Design and Implementation, pages 137–150, 2004.
Hadoop (http://hadoop.apache.org/core/) developed by the Apache project is an open source implementation of MapReduce.
The MapReduce Paradigm
[Figure: MapReduce data flow. Distributed data is held as local data blocks; each Map task turns one block into a partial result (intermediate data), and the Reduce tasks combine the partial results into the aggregated result.]
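The data flow above can be sketched in a few lines of Python. This is a minimal, single-process illustration of the programming model using word counting as the task; the function names (`map_phase`, `shuffle`, `reduce_phase`, `map_reduce`) and the sample chunks are assumptions for illustration and are not part of Hadoop.

```python
from collections import defaultdict
from itertools import chain


def map_phase(chunk):
    """Map: emit (word, 1) pairs for one local chunk of text."""
    return [(word.lower(), 1) for word in chunk.split()]


def shuffle(pairs):
    """Group intermediate values by key, as a framework does between phases."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(key, values):
    """Reduce: aggregate all partial results for one key."""
    return key, sum(values)


def map_reduce(chunks):
    """Run map over every chunk, group by key, then reduce each group."""
    intermediate = chain.from_iterable(map_phase(c) for c in chunks)
    return dict(reduce_phase(k, v) for k, v in shuffle(intermediate).items())


# Three "local" chunks standing in for data blocks on different machines
chunks = ["the wizard of oz", "the road to oz", "oz the great"]
counts = map_reduce(chunks)
```

In a real framework the map calls run on different machines and the grouping step moves data over the network; here everything happens in one process so that only the structure of the computation is visible.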
The MapReduce Paradigm
A MapReduce implementation like Hadoop typically provides a distributed file system (DFS):
I Master/worker architecture (Namenode/Datanodes)
I Data locality
I Map tasks are applied to partitioned data
I Map tasks scheduled so that input blocks are on same machine
I Datanodes read input at local disk speed
I Data replication leads to fault tolerance
Hadoop Streaming
I Utility for creating and running MapReduce jobs with any executable or script as the mapper and/or the reducer

    $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
        -input inputdir \
        -output outputdir \
        -mapper ./mapper \
        -reducer ./reducer
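A mapper and reducer for such a streaming job could look roughly like the following Python sketch. It assumes the usual streaming convention of one tab-separated key/value pair per text line, with the reducer receiving its input sorted by key; the word-count task and the function names are illustrative assumptions.

```python
from itertools import groupby


def mapper(lines):
    """Emit one 'word<TAB>1' line per token found in the input lines."""
    for line in lines:
        for word in line.split():
            yield f"{word.lower()}\t1"


def reducer(sorted_lines):
    """Sum the counts per word; the input must already be sorted by key."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in sorted_lines)
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        yield f"{word}\t{sum(int(count) for _, count in group)}"


# Local simulation of the job: map, sort by key, reduce
lines = ["the wizard of oz", "the road to oz"]
result = list(reducer(sorted(mapper(lines))))
```

For an actual job, each function would be wrapped in a small executable script that iterates over `sys.stdin` and prints its output lines, and those scripts would be passed to `-mapper` and `-reducer`.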
Why Distributed Text Mining?
I Highly interdisciplinary research field utilizing techniques from computer science, linguistics, and statistics
I Vast amount of textual data available in machine readable format:
I scientific articles, abstracts, books, . . .
I memos, letters, . . .
I online forums, mailing lists, blogs, . . .
I Data volumes (corpora) become bigger and bigger
I Steady increase of text mining methods (both in academia and in industry) within the last decade
I Text mining methods are becoming more complex and hence more computationally intensive
Why Distributed Text Mining?
I High Performance Computing (HPC) servers available for a reasonable price
I Integrated frameworks for parallel/distributed computing available (e.g., Hadoop)
I Thus, parallel/distributed computing is now easier than ever
I Standard software for data processing already offers extensions
Text Mining in R
I tm Package
I Tailored for
I Plain texts, articles and papers
I Web documents (XML, SGML, ...)
I Surveys
I Methods for
I Clustering
I Classification
I Visualization
Text Mining in R
I. Feinerer.
tm: Text Mining Package, 2009.
R package version 0.3-3. URL http://CRAN.R-project.org/package=tm
I. Feinerer, K. Hornik, and D. Meyer.
Text mining infrastructure in R.
Journal of Statistical Software, 25(5):1–54, March 2008. ISSN 1548-7660.
Distributed Text Mining in R
Motivation:
I Large data sets
I Corpus typically loaded into memory
I Operations on all elements of the corpus (so-called transformations)
Available transformations: stemDoc(), stripWhitespace(), ...
Distributed Text Mining Strategies in R
Possibilities:
I Text mining using tm and MapReduce (via Hadoop framework)
I Text mining using tm and MPI/snow
Distributed Text Mining in R
Solution (Hadoop):
I Data set copied to DFS (DistributedCorpus)
I Only meta information about the corpus in memory
I Computational operations (Map) on all elements in parallel
I Workhorse: tmMap()
Distributed Text Mining in R - Listing
Mapper (called by tmMap):

    .hadoop_generate_tm_mapper <- function(script, FUN, ...) {
        writeLines(sprintf('#!/usr/bin/env Rscript
    ## load tm package
    require("tm")
    fun <- %s
    ## read from stdin
    input <- readLines(file("stdin"))
    ## create object of class "PlainTextDocument"
    doc <- new("PlainTextDocument", .Data = input[-1L],
               DateTimeStamp = Sys.time())
    ## apply function on document
    result <- fun(doc)
    ## write key
    writeLines(input[1L])
    ## write value
    writeLines(Content(result))
    ', FUN), script)
    }
Distributed Text Mining in R
Example: Stemming
I Stripping word suffixes to retrieve their stems (radicals)
I Reduces complexity
I Stemmers provided in packages Rstem (Duncan Temple Lang, version 0.3-0 on Omegahat) and Snowball
Data:
I Wizard of Oz book series (http://www.gutenberg.org)
I 20 books, each containing 1529–9452 lines of text
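To make the idea of stemming concrete, here is a deliberately naive suffix stripper in Python. It is only a sketch of the principle and is not the algorithm used by Rstem or Snowball; the suffix list, the minimum stem length, and the function names are assumptions chosen for illustration.

```python
SUFFIXES = ("ing", "ed", "es", "s")  # illustrative list, longer suffixes first


def naive_stem(word, min_stem=3):
    """Strip the first matching suffix if at least min_stem letters remain."""
    lowered = word.lower()
    for suffix in SUFFIXES:
        if lowered.endswith(suffix) and len(lowered) - len(suffix) >= min_stem:
            return lowered[: -len(suffix)]
    return lowered


def stem_document(text):
    """Apply the stemmer to every whitespace-separated token."""
    return " ".join(naive_stem(token) for token in text.split())


stemmed = stem_document("Dorothy walked along the winding roads")
```

Because each document is stemmed independently of all others, this kind of transformation is a natural fit for the Map step.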
Distributed Text Mining in R
Workflow:
I Start Hadoop framework
I Put files into DFS
I Apply map functions (e.g., stemming of document)
I Retrieve output data
Distributed Text Mining in R
I Infrastructure
I Computers of PC Lab used as worker nodes
I 8 PCs with an Intel Pentium 4 CPU @ 3.2 GHz and 1 GB of RAM
I Each PC has > 20 GB reserved for DFS
I Development platform: 8-core Power 6 shared memory system
I No cluster installation (yet)
I MapReduce framework
I Hadoop
I Implements MapReduce + DFS
I Local R and tm installation
Benchmark
[Figure: two panels. "Runtime": runtime in seconds against the number of CPUs (1 to 8). "Speedup": speedup against the number of CPUs (1 to 8).]
Lessons Learned
I Problem size has to be sufficiently large
I Location of texts in DFS (currently: ID = file path)
I Thus, serialization difficult (how to update text IDs?)
I Remote file operation on DFS takes around 2.5 s
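A minimal sketch of why the problem size matters, assuming a simple model in which the computation divides evenly across workers while each remote file operation (around 2.5 s, as noted above) adds overhead that is not parallelized. The model, the function name, and the workloads of 100 s and 3500 s are assumptions for illustration and are not measurements from the benchmark.

```python
def modelled_speedup(work_seconds, n_workers, n_files, seconds_per_file_op=2.5):
    """Speedup of a distributed run over a purely local serial run.

    Assumption: compute time divides evenly across workers, while the
    remote file operations add a fixed, non-parallel overhead.
    """
    overhead = n_files * seconds_per_file_op
    parallel_runtime = work_seconds / n_workers + overhead
    return work_seconds / parallel_runtime


# Hypothetical workloads on 8 workers with 20 input files
small_job = modelled_speedup(100.0, n_workers=8, n_files=20)
large_job = modelled_speedup(3500.0, n_workers=8, n_files=20)
```

Under these assumptions the small job gains little from eight workers (a speedup of 1.6), whereas the large job comes close to the ideal (a speedup a little above 7), because the fixed overhead is amortized over far more computation.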
Text Mining in Java
Frameworks
I GATE - General Architecture for Text Engineering (Cunningham, 2002)
I UIMA - Unstructured Information Management Architecture (IBM)
I OpenNLP
I WEKA (Witten & Frank, 2005)
I webLyzard
Text Mining in Java
I webLyzard Framework
I Tailored for
I Web documents (HTML)
I Common text formats (PDF, ODT, DOC, ...)
I Annotation Components:
I Geographic tagging
I Keywords and co-occurring terms
I Named entity tagging
I Sentiment tagging
I Search and Indexing:
Text Mining in Python
Frameworks
I Natural Language Toolkit (NLTK)
I easy Web Retrieval Toolkit (eWRT)
I webLyzard
Text Mining in Python
I Annotation Components:
I Keywords and co-occurring terms
I Language detection (short words, trigrams, Bayes)
I Part of speech (POS) tagging
I Sentiment
I Data Sources
I Alexa web services (Web page rating)
I Amazon product reviews (sentiment)
I DBpedia (ontology data)
I Del.icio.us (social bookmarking)
I Facebook (in development)
I OpenCalais (named entity recognition)
I Scarlet (relation discovery)
I TripAdvisor (sentiment +)
I Wikipedia (disambiguation, synonyms)
Distributed Processing
Approaches
I Hadoop API
I efficient
I built-in mechanism for processing arbitrary objects
I write specialized code
I Hadoop Streaming
I programming language independent
I easy to use
I text only → objects?
I eWRT transparent caching
I transparent object caching
I low overhead
Hadoop API - Mapper and Reducer
I Mapper and Reducer
I Implement the Mapper, Reducer interfaces
I Inherit from MapReduceBase
I Objects
I Implement the Writable interface
I based on DataInput, DataOutput (≠ Serializable)
I readFields() → initializes object
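The write()/readFields() contract can be mimicked outside Java to show what it asks of an object. The following Python sketch is an analogy only and does not use the Hadoop API; the class `TextPair`, its fields, and the length-prefixed byte layout are assumptions for illustration.

```python
import io
import struct


class TextPair:
    """Hypothetical value type mirroring a write()/readFields() contract."""

    def __init__(self, key="", value=""):
        self.key = key
        self.value = value

    def write(self, out):
        """Serialize each field as a 4-byte big-endian length plus UTF-8 bytes."""
        for field in (self.key, self.value):
            data = field.encode("utf-8")
            out.write(struct.pack(">I", len(data)))
            out.write(data)

    def read_fields(self, stream):
        """Initialize this (possibly reused) object from the byte stream."""
        fields = []
        for _ in range(2):
            (length,) = struct.unpack(">I", stream.read(4))
            fields.append(stream.read(length).decode("utf-8"))
        self.key, self.value = fields


buffer = io.BytesIO()
TextPair("doc-1", "the wizard of oz").write(buffer)
buffer.seek(0)
restored = TextPair()
restored.read_fields(buffer)
```

The object itself decides which fields go onto the stream and in which order, and reading fills an existing instance instead of constructing a new one, which is the role the slide assigns to readFields().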
Hadoop API - Mapper and Reducer
[Figure: UML class diagram. MyMapper and MyReducer derive from MapReduceBase and implement Mapper and Reducer; JobConf uses them, and they use Writable objects.]

    Mapper
        +map(K1:key, V1:value, OutputCollector<K2, V2>:output, Reporter:reporter)
    Reducer
        +reduce(K2:key, Iterator<V2>:values, OutputCollector<K3, V3>:output, Reporter:reporter)
    Writable
        +readFields(DataInput:in)
        +write(DataOutput:out)

    {K1 key: V1 value} -> {K2 key: V2 values} -> {K3 key: V3 value}
Hadoop API - Distributed File System - Listing
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    Configuration conf = new Configuration();
    FileSystem hadoopFs = FileSystem.get(conf);

    // Input and Output files
    Path inFile = new Path(argv[0]);
    Path outFile = new Path(argv[1]);

    // Check whether a file exists
    if (!hadoopFs.exists(inFile)) {
        printAndExit("Input file not found");
    }
Hadoop API - Distributed File System - Access
    // Read buffer; the size is chosen for illustration only
    byte[] buffer = new byte[4096];
    int bytesRead;

    FSDataInputStream in = hadoopFs.open(inFile);
    while ((bytesRead = in.read(buffer)) > 0) {
        process(buffer, bytesRead);
    }
    in.close();
Hadoop Streaming - How to save your Objects? (1/4)
I Hadoop streaming:
I no fixed order
I input is processed on a per-line basis
I Approaches
I do not use objects
I enclose meta data into the input
I straight forward solution
→ use your programming language’s serialization facilities
I Java: Serializable interface
I Python: cPickle
Hadoop Streaming - How to save your Objects? (2/4)
I Pitfalls:
I Binary serialization formats
I Serializations that span more than one line (Python)
I Performance
I Suggestions:
I Transparent compression
(lightweight: .gz; compare: Google’s Bigtable implementation)
I Encode the data stream using for instance Base64
Hadoop Streaming - How to save your Objects? (3/4)
    from cPickle import dumps

    class TestClass(object):
        def __init__(self, a, b):
            self.a = a
            self.b = b

    ti = TestClass("Tom", "Susan")
    print dumps(ti)

Output:

    ccopy_reg\nreconstructor\np1\n(c__main__\nTestClass\np2
    c__builtin__\nobject\np3\nNtRp4
    (dp5\nS’a’\nS’Tom’\np6\nsS’b’\nS’Susan’\np7\nsb.
Hadoop Streaming - How to save your Objects? (4/4)
    from base64 import encodestring
    from zlib import compress

    # compress() operates on a string, so obj must already be serialized
    serialize = lambda obj: encodestring(compress(obj))
Table: Serialization Overhead

    Object            Payload       Serialized Size
    String            8 Bytes       33 Bytes
    Tuple/List/Set    8 Bytes       41/49/86 Bytes
    TestClass         8 Bytes       150 Bytes
    Reuters Article   3474 Bytes    1699 Bytes
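The same serialize-compress-encode idea can be sketched for Python 3, where `cPickle` has been merged into `pickle` and `base64.encodestring` no longer exists (it was removed in Python 3.9). This is a minimal sketch, not the authors' implementation; the `deserialize` counterpart and the sample record are assumptions added for illustration.

```python
import base64
import pickle
import zlib


def serialize(obj):
    """Pickle, compress, and Base64-encode obj into one newline-free text line."""
    return base64.b64encode(zlib.compress(pickle.dumps(obj))).decode("ascii")


def deserialize(line):
    """Reverse serialize(): Base64-decode, decompress, and unpickle."""
    return pickle.loads(zlib.decompress(base64.b64decode(line)))


record = {"id": "doc-1", "tokens": ["wizard", "of", "oz"]}
line = serialize(record)
```

`base64.b64encode` never inserts line breaks, so each object occupies exactly one line, which is what a line-oriented streaming job needs. Unpickling executes code embedded in the data, so this approach is only appropriate for data produced by your own jobs.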
eWRT - Transparent Object Caching
I transparent
I minimal code modifications
I adaptor pattern
I disk or in-memory cache
I Current use:
I complex computations
I access to remote resources
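The caching idea can be sketched as a small disk-backed wrapper around an expensive function. The class below is a hypothetical stand-in and is not eWRT's API; the names `DiskCache`, `fetch`, and `slow_lookup`, and the choice of SHA-1 file names and pickle storage, are assumptions for illustration.

```python
import hashlib
import pickle
import tempfile
from pathlib import Path


class DiskCache:
    """Hypothetical transparent object cache that stores results on disk."""

    def __init__(self, directory):
        self.directory = Path(directory)
        self.directory.mkdir(parents=True, exist_ok=True)

    def fetch(self, key, compute):
        """Return the cached result for key, calling compute(key) on a miss."""
        name = hashlib.sha1(repr(key).encode("utf-8")).hexdigest()
        path = self.directory / name
        if path.exists():
            return pickle.loads(path.read_bytes())
        result = compute(key)
        path.write_bytes(pickle.dumps(result))
        return result


calls = []


def slow_lookup(document_id):
    """Stand-in for a complex computation or a remote request."""
    calls.append(document_id)
    return {"id": document_id, "tags": ["example"]}


cache = DiskCache(tempfile.mkdtemp())
first = cache.fetch("doc-1", slow_lookup)
second = cache.fetch("doc-1", slow_lookup)
```

The second call is answered from disk without invoking `slow_lookup` again, and the calling code only changes in how the function is invoked, which is what makes the cache transparent.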
eWRT - Transparent Object Caching
I original code:

    def getCalaisTags(self, document_id):
        return self.calais.fetch_tags(document_id)

I refined code:

    def __init__(self):
        self.cache = \
            Cache("./cache", cache_nesting_level=2)

    def getCalaisTags(self, document_id):
        return self.cache.fetch(\
            document_id, self.calais.fetch_tags)
Conclusion
I MapReduce has proven to be a useful abstraction
I Greatly simplifies distributed computing
I Developers focus on the problem
I Implementations like Hadoop deal with messy details
I Different approaches to make use of Hadoop’s infrastructure
Thank You for Your Attention!
Stefan Theußl
Department of Statistics and Mathematics
email: [email protected]
URL: http://statmath.wu.ac.at/~theussl
Albert Weichselbraun
Department of Information Systems and Operations
email: [email protected]
URL: http://www.ai.wu.ac.at/~aweichse
WU Vienna University of Economics and Business Augasse 2–6, A-1090 Wien