
Distributed Computing with Hadoop


Stefan Theußl, Albert Weichselbraun

WU Vienna University of Economics and Business Augasse 2–6, 1090 Vienna


Agenda

- Problem & Motivation
- The MapReduce Paradigm
- Distributed Text Mining in R
- Distributed Text Mining in Java
- Distributed Text Mining in Python
- Implementation Details
  - Hadoop API
  - Hadoop Streaming
  - eWRT

Motivation

Main motivation: large-scale data processing

- Many tasks produce output data by processing large amounts of input data
- We want to make use of many CPUs
- This is typically not easy (parallelization, synchronization, I/O, debugging, etc.)

The MapReduce Paradigm

- Programming model inspired by functional language primitives
- Automatic parallelization and distribution
- Fault tolerance
- I/O scheduling
- Examples: document clustering, web access log analysis, search index construction, ...

Jeffrey Dean and Sanjay Ghemawat. MapReduce: Simplified data processing on large clusters. In OSDI'04, 6th Symposium on Operating Systems Design and Implementation, pages 137–150, 2004.

Hadoop (http://hadoop.apache.org/core/), developed by the Apache project, is an open source implementation of MapReduce.

The MapReduce Paradigm

[Figure: MapReduce data flow. Distributed data (three blocks of local data) is processed by three Map tasks, which produce intermediate data (three partial results). Two Reduce tasks combine the partial results into the aggregated result.]
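The data flow of the paradigm can be illustrated with a small in-process sketch. This is only a single-machine simulation of the map, grouping, and reduce steps for a word count; the function names are chosen for illustration and are not part of Hadoop.

```python
from collections import defaultdict


def map_phase(chunk):
    """Map: emit a (word, 1) pair for every word in one local chunk."""
    return [(word.lower(), 1) for word in chunk.split()]


def shuffle(partial_results):
    """Group the intermediate pairs of all map tasks by key."""
    grouped = defaultdict(list)
    for pairs in partial_results:
        for key, value in pairs:
            grouped[key].append(value)
    return grouped


def reduce_phase(grouped):
    """Reduce: aggregate all values that share a key."""
    return {key: sum(values) for key, values in grouped.items()}


def word_count(chunks):
    """Run map on every chunk independently, then group and reduce."""
    partial_results = [map_phase(chunk) for chunk in chunks]
    return reduce_phase(shuffle(partial_results))
```

Because each call to `map_phase` only touches its own chunk, the map calls could run on different machines; only the grouping step needs to see the results of all of them.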


The MapReduce Paradigm

A MapReduce implementation like Hadoop typically provides a distributed file system (DFS):

- Master/worker architecture (Namenode/Datanodes)
- Data locality
  - Map tasks are applied to partitioned data
  - Map tasks are scheduled so that input blocks are on the same machine
  - Datanodes read input at local disk speed
- Data replication leads to fault tolerance
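The idea behind data locality can be sketched with a toy scheduler. This is not Hadoop's scheduling algorithm; it is a simplified illustration under the assumption that we know which nodes hold a replica of each block. The function name and the tie-breaking rule are made up for this example.

```python
def assign_map_tasks(block_locations, nodes):
    """Assign each block's map task to a node, preferring replica holders.

    block_locations: dict mapping block id -> list of nodes holding a replica
    nodes: list of available worker nodes
    """
    load = {node: 0 for node in nodes}
    assignment = {}
    for block, replicas in sorted(block_locations.items()):
        # Prefer nodes that store the block locally; otherwise use any node.
        candidates = [node for node in replicas if node in load] or list(nodes)
        # Among the candidates, pick the least-loaded one (name breaks ties).
        chosen = min(candidates, key=lambda node: (load[node], node))
        assignment[block] = chosen
        load[chosen] += 1
    return assignment
```

With replication, most blocks have several candidate nodes, which is also why the loss of one node does not make a block unreadable.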


Hadoop Streaming

- Utility for creating and running MapReduce jobs with any executable or script as the mapper and/or the reducer

    $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
        -input inputdir \
        -output outputdir \
        -mapper ./mapper \
        -reducer ./reducer
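A mapper and reducer for such a streaming job could look roughly like the following word-count sketch. It assumes the usual streaming convention of one tab-separated key/value pair per line and that the reducer receives its input sorted by key; the function names are illustrative.

```python
from itertools import groupby


def mapper(lines):
    """Emit 'word<TAB>1' for every word on every input line."""
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"


def reducer(lines):
    """Sum the counts of consecutive lines that share the same key.

    Assumes the input is sorted by key, so equal keys are adjacent.
    """
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines)
    for key, group in groupby(pairs, key=lambda pair: pair[0]):
        yield f"{key}\t{sum(int(value) for _, value in group)}"
```

To use these as `./mapper` and `./reducer`, each script would iterate over `sys.stdin` and print every yielded line.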


Why Distributed Text Mining?

- Highly interdisciplinary research field utilizing techniques from computer science, linguistics, and statistics
- Vast amounts of textual data available in machine-readable format:
  - scientific articles, abstracts, books, ...
  - memos, letters, ...
  - online forums, mailing lists, blogs, ...
- Data volumes (corpora) keep growing
- Steady increase in text mining methods (both in academia and in industry) within the last decade
- Text mining methods are becoming more complex and hence more computationally intensive

Why Distributed Text Mining?

- High Performance Computing (HPC) servers available at a reasonable price
- Integrated frameworks for parallel/distributed computing available (e.g., Hadoop)
- Thus, parallel/distributed computing is now easier than ever
- Standard software for data processing already offers extensions

Text Mining in R

- tm package
- Tailored for
  - Plain texts, articles and papers
  - Web documents (XML, SGML, ...)
  - Surveys
- Methods for
  - Clustering
  - Classification
  - Visualization

Text Mining in R

I. Feinerer. tm: Text Mining Package, 2009. URL http://CRAN.R-project.org/package=tm. R package version 0.3-3.

I. Feinerer, K. Hornik, and D. Meyer. Text mining infrastructure in R. Journal of Statistical Software, 25(5):1–54, March 2008. ISSN 1548-7660.

Distributed Text Mining in R

Motivation:

- Large data sets
- Corpus typically loaded into memory
- Operations on all elements of the corpus (so-called transformations)

Available transformations: stemDoc(), stripWhitespace(), ...

Distributed Text Mining Strategies in R

Possibilities:

- Text mining using tm and MapReduce (via the Hadoop framework)
- Text mining using tm and MPI/snow

Distributed Text Mining in R

Solution (Hadoop):

- Data set copied to DFS (DistributedCorpus)
- Only meta information about the corpus is kept in memory
- Computational operations (Map) run on all elements in parallel
- Workhorse: tmMap()

Distributed Text Mining in R - Listing

Mapper (called by tmMap):

    .hadoop_generate_tm_mapper <- function(script, FUN, ...) {
        ## write an Rscript mapper to 'script'; FUN is inserted as source text
        writeLines(sprintf('#!/usr/bin/env Rscript
    ## load tm package
    require("tm")
    fun <- %s
    ## read from stdin
    input <- readLines(file("stdin"))
    ## create object of class PlainTextDocument
    doc <- new("PlainTextDocument", .Data = input[-1L],
               DateTimeStamp = Sys.time())
    ## apply function on document
    result <- fun(doc)
    ## write key
    writeLines(input[1L])
    ## write value
    writeLines(Content(result))
    ', FUN), script)
    }
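The key/value protocol of this generated mapper (first input line is the key, the remaining lines are the document, and the output repeats the key followed by the transformed document) can be sketched independently of R. The following is an illustrative analogue only; `run_mapper` and its arguments are hypothetical names, not part of tm.

```python
def run_mapper(lines, fun):
    """Apply fun to a document whose first line is its key.

    lines: iterable of input lines (first line = key, rest = content)
    fun:   function taking a list of content lines and returning a list
    Returns the output lines: the key first, then the transformed content.
    """
    stripped = [line.rstrip("\n") for line in lines]
    if not stripped:
        return []
    key, content = stripped[0], stripped[1:]
    return [key] + list(fun(content))
```

For instance, a whitespace-normalizing transformation could be passed as `fun`, and the key line would pass through unchanged.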


Distributed Text Mining in R

Example: Stemming

- Erasing word suffixes to retrieve their radicals
- Reduces complexity
- Stemmers provided in the packages Rstem (Duncan Temple Lang, version 0.3-0 on Omegahat) and Snowball

Data:

- Wizard of Oz book series (http://www.gutenberg.org)
- 20 books, each containing 1529 – 9452 lines of text
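Suffix stripping can be illustrated with a deliberately naive sketch. It is not the algorithm implemented by the Rstem or Snowball packages; the suffix list and the minimum stem length are assumptions chosen only to show the idea.

```python
# Illustrative suffix list (an assumption, checked in this order).
SUFFIXES = ("ing", "ed", "ly", "es", "s")


def naive_stem(word, min_stem=3):
    """Strip the first matching suffix if at least min_stem letters remain."""
    for suffix in SUFFIXES:
        if word.endswith(suffix) and len(word) - len(suffix) >= min_stem:
            return word[: -len(suffix)]
    return word


def stem_line(line):
    """Lower-case a line of text and stem each whitespace-separated word."""
    return " ".join(naive_stem(word) for word in line.lower().split())
```

Since each line (or document) is stemmed independently of all others, the operation fits the map step of MapReduce without any reduce step.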


Distributed Text Mining in R

Workflow:

- Start Hadoop framework
- Put files into DFS
- Apply map functions (e.g., stemming of documents)
- Retrieve output data

Distributed Text Mining in R

- Infrastructure
  - Computers of PC lab used as worker nodes
    - 8 PCs with an Intel Pentium 4 CPU @ 3.2 GHz and 1 GB of RAM
    - Each PC has > 20 GB reserved for DFS
  - Development platform: 8-core Power 6 shared memory system
  - No cluster installation (yet)
- MapReduce framework
  - Hadoop
    - Implements MapReduce + DFS
  - Local R and tm installation

Benchmark

[Figure: benchmark results in two panels. Left: runtime [s] (axis 500 to 3500) versus number of CPUs (1 to 8). Right: speedup (axis 1 to 7) versus number of CPUs (1 to 8).]
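Speedup in a benchmark of this kind is conventionally computed as the single-CPU runtime divided by the runtime on p CPUs, and efficiency as speedup divided by p. The sketch below shows that calculation; the function names are illustrative, and the measured runtimes of the benchmark are not reproduced here.

```python
def speedup(runtimes):
    """Compute speedup T(1) / T(p) for every CPU count p.

    runtimes: dict mapping number of CPUs -> runtime in seconds;
              must contain an entry for 1 CPU.
    """
    base = runtimes[1]
    return {cpus: base / seconds for cpus, seconds in sorted(runtimes.items())}


def efficiency(runtimes):
    """Compute parallel efficiency, i.e. speedup divided by CPU count."""
    return {cpus: s / cpus for cpus, s in speedup(runtimes).items()}
```

An efficiency close to 1 means the additional CPUs are used almost fully; values well below 1 indicate overhead such as job start-up or remote file operations.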


Lessons Learned

- Problem size has to be sufficiently large
- Location of texts in DFS (currently: ID = file path)
- Thus, serialization is difficult (how to update text IDs?)
- A remote file operation on DFS takes around 2.5 sec.

Text Mining in Java

Frameworks

- GATE - General Architecture for Text Engineering (Cunningham, 2002)
- UIMA - Unstructured Information Management Architecture (IBM)
- OpenNLP
- WEKA (Witten & Frank, 2005)
- webLyzard

Text Mining in Java

- webLyzard framework
- Tailored for
  - Web documents (HTML)
  - Common text formats (PDF, ODT, DOC, ...)
- Annotation components:
  - Geographic tagging
  - Keywords and co-occurring terms
  - Named entity tagging
  - Sentiment tagging
- Search and indexing

Text Mining in Python

Frameworks

- Natural Language Toolkit (NLTK)
- easy Web Retrieval Toolkit (eWRT)
- webLyzard

Text Mining in Python

- Annotation components:
  - Keywords and co-occurring terms
  - Language detection (short words, trigrams, Bayes)
  - Part of speech (POS) tagging
  - Sentiment
- Data sources
  - Alexa web services (Web page rating)
  - Amazon product reviews (sentiment)
  - DBpedia (ontology data)
  - Del.icio.us (social bookmarking)
  - Facebook (in development)
  - OpenCalais (named entity recognition)
  - Scarlet (relation discovery)
  - TripAdvisor (sentiment +)
  - Wikipedia (disambiguation, synonyms)

Distributed Processing

Approaches

- Hadoop API
  - efficient
  - built-in mechanism for processing arbitrary objects
  - requires writing specialized code
- Hadoop Streaming
  - programming-language independent
  - easy to use
  - text only → objects?
- eWRT transparent caching
  - transparent object caching
  - low overhead

Hadoop API - Mapper and Reducer

- Mapper and Reducer
  - Implement the Mapper and Reducer interfaces
  - Inherit from MapReduceBase
- Objects
  - Implement the Writable interface
  - based on DataInput and DataOutput (≠ Serializable)
  - readFields() → initializes the object
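The pattern behind Writable (an object writes its own fields to a binary stream and later re-initializes itself from such a stream) can be sketched as an analogy. The class below is hypothetical and is not Hadoop code; it only mirrors the write()/readFields() pairing with a length-prefixed string and a 64-bit integer.

```python
import io
import struct


class TermCount:
    """Hypothetical record with explicit binary (de)serialization."""

    def __init__(self, term="", count=0):
        self.term = term
        self.count = count

    def write(self, out):
        """Write the fields to a binary stream in a fixed order."""
        data = self.term.encode("utf-8")
        out.write(struct.pack(">I", len(data)))  # 4-byte length prefix
        out.write(data)
        out.write(struct.pack(">q", self.count))  # 8-byte signed integer

    def read_fields(self, inp):
        """Re-initialize this object from a binary stream."""
        (length,) = struct.unpack(">I", inp.read(4))
        self.term = inp.read(length).decode("utf-8")
        (self.count,) = struct.unpack(">q", inp.read(8))
```

A round trip writes one object into an `io.BytesIO` buffer, rewinds the buffer, and calls `read_fields` on a fresh, empty instance. Reusing an existing instance in this way avoids allocating a new object for every record.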


Hadoop API - Mapper and Reducer

[Figure: UML class diagram of the Hadoop API]

- Mapper: +map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
- Reducer: +reduce(K2 key, Iterator<V2> values, OutputCollector<K3, V3> output, Reporter reporter)
- MapReduceBase: base class of MyMapper and MyReducer
- JobConf: uses MyMapper and MyReducer
- Writable: +readFields(DataInput in), +write(DataOutput out)
- Data flow: {K1 key : V1 value} -> {K2 key : V2 values} -> {K3 key : V3 value}

Hadoop API - Distributed File System - Listing

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Obtain a handle to the configured (distributed) file system
    Configuration conf = new Configuration();
    FileSystem hadoopFs = FileSystem.get(conf);

    // Input and output files
    Path inFile = new Path(argv[0]);
    Path outFile = new Path(argv[1]);

    // Check whether a file exists
    if (!hadoopFs.exists(inFile)) {
        printAndExit("Input file not found");
    }

Hadoop API - Distributed File System - Access

    // The declarations are not shown in the original listing;
    // the buffer size is an assumption
    byte[] buffer = new byte[4096];
    int bytesRead;

    FSDataInputStream in = hadoopFs.open(inFile);
    while ((bytesRead = in.read(buffer)) > 0) {
        process(buffer, bytesRead);
    }
    in.close();

Hadoop Streaming - How to save your Objects? (1/4)

- Hadoop Streaming:
  - no fixed order
  - operates on a per-text-line basis
- Approaches
  - do not use objects
  - enclose metadata in the input
  - straightforward solution → use your programming language's serialization facilities
    - Java: Serializable interface
    - Python: cPickle

Hadoop Streaming - How to save your Objects? (2/4)

- Pitfalls:
  - Binary serialization formats
  - Serializations spanning more than one line (Python)
  - Performance
- Suggestions:
  - Transparent compression (lightweight: .gz; compare: Google's Bigtable implementation)
  - Encode the data stream using, for instance, Base64

Hadoop Streaming - How to save your Objects? (3/4)

    # Python 2 (cPickle and the print statement do not exist in Python 3)
    from cPickle import dumps

    class TestClass(object):
        def __init__(self, a, b):
            self.a = a
            self.b = b

    ti = TestClass("Tom", "Susan")
    print dumps(ti)

Output:

    ccopy_reg\n_reconstructor\np1\n(c__main__\nTestClass\np2
    c__builtin__\nobject\np3\nNtRp4
    (dp5\nS'a'\nS'Tom'\np6\nsS'b'\nS'Susan'\np7\nsb.

Hadoop Streaming - How to save your Objects? (4/4)

    from base64 import encodestring
    from zlib import compress

    serialize = lambda obj: encodestring(compress(obj))

    Object            Payload      Serialized Size
    String            8 Bytes      33 Bytes
    Tuple/List/Set    8 Bytes      41/49/86 Bytes
    TestClass         8 Bytes      150 Bytes
    Reuters Article   3474 Bytes   1699 Bytes

Table: Serialization Overhead
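A complete round trip of this approach can be sketched in Python 3 as follows. This is a sketch, not the authors' code: it uses `pickle` instead of `cPickle`, and `base64.b64encode`, which produces output without embedded line breaks, so each serialized object occupies exactly one line of streaming input. The `deserialize` helper is an addition for illustration.

```python
import base64
import pickle
import zlib


def serialize(obj):
    """Pickle, compress, and Base64-encode obj into one ASCII line."""
    return base64.b64encode(zlib.compress(pickle.dumps(obj))).decode("ascii")


def deserialize(line):
    """Reverse serialize(): decode, decompress, and unpickle one line."""
    return pickle.loads(zlib.decompress(base64.b64decode(line)))
```

As with any pickle-based format, `deserialize` should only be applied to data from a trusted source, because unpickling can execute arbitrary code.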


eWRT - Transparent Object Caching

- transparent
  - minimal code modifications
  - adaptor pattern
- disk or in-memory cache
- Current use:
  - complex computations
  - access to remote resources

eWRT - Transparent Object Caching

- original code:

    def getCalaisTags(self, document_id):
        return self.calais.fetch_tags(document_id)

- refined code:

    def __init__(self):
        self.cache = Cache("./cache", cache_nesting_level=2)

    def getCalaisTags(self, document_id):
        return self.cache.fetch(document_id, self.calais.fetch_tags)
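How such a cache can stay transparent to the caller is illustrated by the following in-memory sketch. It is a hypothetical stand-in and not eWRT's implementation; only the `fetch(key, function)` calling convention is taken from the listing above, and the `misses` counter exists purely for demonstration.

```python
class MemoryCache:
    """Hypothetical in-memory cache with a fetch(key, function) interface."""

    def __init__(self):
        self._store = {}
        self.misses = 0  # number of times the wrapped function was called

    def fetch(self, key, fetch_function):
        """Return the cached result for key, computing it on first use."""
        if key not in self._store:
            self.misses += 1
            self._store[key] = fetch_function(key)
        return self._store[key]
```

The calling code does not change its result handling: the first call for a key invokes the expensive function, and later calls for the same key are answered from the cache.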


Conclusion

- MapReduce has proven to be a useful abstraction
- Greatly simplifies distributed computing
- Developers can focus on the problem
- Implementations like Hadoop deal with the messy details
  - different approaches to utilizing Hadoop's infrastructure

Thank You for Your Attention!

Stefan Theußl

Department of Statistics and Mathematics

URL: http://statmath.wu.ac.at/~theussl

Albert Weichselbraun

Department of Information Systems and Operations

URL: http://www.ai.wu.ac.at/~aweichse

WU Vienna University of Economics and Business Augasse 2–6, A-1090 Wien
