(1)

Simple Parallel Computing in R Using Hadoop

Stefan Theußl

WU Vienna University of Economics and Business Augasse 2–6, 1090 Vienna

30.06.2009

(2)

Agenda

- Problem & Motivation
- The MapReduce Paradigm
- Package hive
- Distributed Text Mining in R
- Discussion

(3)

Motivation

Recap of my talk last year:

- Computer architecture: distributed memory platforms (DMP) and shared memory platforms (SMP)
- Types of parallelism: functional or data-driven
- Parallel computing strategies: threads (on SMP), message passing (on DMP), high-level abstractions (e.g., packages snow, nws)

(4)

Motivation

Main motivation: large scale data processing

- Many tasks, i.e., we produce output data by processing lots of input data
- Want to make use of many CPUs
- Typically this is not easy (parallelization, synchronization, I/O, debugging, etc.)
- Need for an integrated framework

(5)
(6)

The MapReduce Paradigm

- Programming model inspired by functional language primitives
- Automatic parallelization and distribution
- Fault tolerance
- I/O scheduling
- Examples: document clustering, web access log analysis, search index construction, ...

Jeffrey Dean and Sanjay Ghemawat. MapReduce: Simplified data processing on large clusters. In OSDI'04, 6th Symposium on Operating Systems Design and Implementation, pages 137–150, 2004.

Hadoop (http://hadoop.apache.org/core/), developed by the Apache project, is an open source implementation of MapReduce.

(7)

The MapReduce Paradigm

Figure: MapReduce data flow. Distributed Data (Local Data on each node) -> Map -> Intermediate Data (one Partial Result per map task) -> Reduce -> Aggregated Result
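The data flow on this slide can be imitated on a single machine with base R. This is a minimal sketch, assuming toy input and the hypothetical function names map_fun and reduce_fun. It illustrates the paradigm only and does not involve Hadoop.

```r
# Toy input: each list element stands in for the local data of one node.
chunks <- list(
  c("the cat sat", "the dog"),
  c("the cat ran"),
  c("a dog sat")
)

# Map: turn one chunk into a partial result (word counts for that chunk).
map_fun <- function(lines) {
  words <- unlist(strsplit(lines, "[[:space:]]+"))
  words <- words[nchar(words) > 0]
  vapply(split(words, words), length, integer(1))
}

# Reduce: merge two partial results by summing the counts per word.
reduce_fun <- function(a, b) {
  both <- c(a, b)
  vapply(split(both, names(both)), sum, numeric(1))
}

partial <- lapply(chunks, map_fun)    # map step, one call per chunk
total <- Reduce(reduce_fun, partial)  # reduce step, aggregated result
print(sort(total, decreasing = TRUE))
```

In a real MapReduce run the map calls execute on different machines and the framework handles the grouping of keys; here lapply() and Reduce() only mirror the shape of the computation.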

(8)

The MapReduce Paradigm

A MapReduce implementation like Hadoop typically provides a distributed file system (DFS):

- Master/worker architecture (Namenode/Datanodes)
- Data locality
  - Map tasks are applied to partitioned data
  - Map tasks scheduled so that input blocks are on same machine
  - Datanodes read input at local disk speed
- Data replication leads to fault tolerance

(9)

The MapReduce Paradigm

Figure: HDFS architecture

(10)

Hadoop Streaming

- Utility for creating and running MapReduce jobs with any executable or script as the mapper and/or the reducer

    $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
        -input inputdir \
        -output outputdir \
        -mapper ./mapper \
        -reducer ./reducer

Figure: Local Data -> R: Map -> Intermediate Data -> R: Reduce -> Processed Data, with each R script reading from stdin() and writing to stdout()

(11)
(12)

Hadoop InteractiVE (hive)

hive provides:

- Easy-to-use interface to Hadoop
- Currently, only Hadoop core (http://hadoop.apache.org/core/) supported
- High-level functions for handling the Hadoop framework (hive_start(), hive_create(), hive_is_available(), etc.)
- DFS accessor functions in R (DFS_put(), DFS_list(), DFS_cat(), etc.)
- Streaming via Hadoop (hive_stream())

(13)

hive: How to Get Started?

Prerequisites:

- Java 1.6.x
- Passwordless ssh within nodes
- Best practice: provide everything via shared filesystem (e.g.,

(14)

hive: How to Get Started?

Installation:

- Download and untar Hadoop core from http://hadoop.apache.org/core/
- Modify JAVA_HOME environment variable in /path/to/hadoop-0.20.0/conf/hadoop-env.sh appropriately
- Further (optional) configuration:
  - mapred-site.xml: e.g., configuration of number of parallel tasks (mapred.tasktracker.map.tasks.maximum)
  - core-site.xml: e.g., where is the DFS managed? (fs.default.name)
  - hdfs-site.xml: e.g., number of chunk replications (dfs.replication)
- Export HADOOP_HOME environment variable

(15)

Example: Word Count

Data preparation:

    > library("hive")
    Loading required package: rJava
    Loading required package: XML
    > hive_start()
    > hive_is_available()
    [1] TRUE
    > DFS_put("~/Data/Reuters/minimal", "/tmp/Reuters")
    > DFS_list("/tmp/Reuters")
    [1] "reut-00001.xml" "reut-00002.xml" "reut-00003.xml"
    [4] "reut-00004.xml" "reut-00005.xml" "reut-00006.xml"
    [7] "reut-00007.xml" "reut-00008.xml" "reut-00009.xml"
    > head(DFS_read_lines("/tmp/Reuters/reut-00002.xml"))
    [1] "<?xml version=\"1.0\"?>"
    [2] "<REUTERS TOPICS=\"NO\" LEWISSPLIT=\"TRAIN\" [...]
    [3] "<DATE>26-FEB-1987 15:03:27.51</DATE>"
    [4] "<TOPICS/>"
    [5] "<PLACES>"

(16)

Example: Word Count

    mapper <- function() {
        # Emit one tab-separated key/value line per key
        mapred_write_output <- function(key, value)
            cat(sprintf("%s\t%s\n", key, value), sep = "")

        # Strip leading and trailing blanks
        trim_white_space <- function(line)
            gsub("(^ +)|( +$)", "", line)
        # Split a line into words at runs of whitespace
        split_into_words <- function(line)
            unlist(strsplit(line, "[[:space:]]+"))

        # Read stdin line by line and emit (word, 1) for every word
        con <- file("stdin", open = "r")
        while (length(line <- readLines(con, n = 1,
                                        warn = FALSE)) > 0) {
            line <- trim_white_space(line)
            words <- split_into_words(line)
            if (length(words))
                mapred_write_output(words, 1)
        }
        close(con)
    }
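To try the map step without a Hadoop installation, the same logic can be run on an in-memory connection. This is a minimal sketch, assuming a hypothetical wrapper map_lines() that collects the key/value lines in a character vector instead of writing them to stdout.

```r
# Hypothetical local variant of the map step: reads from any connection
# and returns the emitted "word<TAB>1" lines as a character vector.
map_lines <- function(con) {
  out <- character(0)
  while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
    line <- gsub("(^ +)|( +$)", "", line)            # trim blanks
    words <- unlist(strsplit(line, "[[:space:]]+"))  # split into words
    words <- words[nchar(words) > 0]                 # drop empty tokens
    if (length(words))
      out <- c(out, sprintf("%s\t%s", words, 1))
  }
  out
}

con <- textConnection(c("hello hadoop", "  hello R  "))
pairs <- map_lines(con)
close(con)
print(pairs)
```

Because Hadoop Streaming only sees text on stdin and stdout, checking the emitted lines locally like this covers most of what the mapper has to get right.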

(17)

Example: Word Count

    reducer <- function() {
        [...]
        # Hashed environment used as a word -> count table
        env <- new.env(hash = TRUE)
        con <- file("stdin", open = "r")
        while (length(line <- readLines(con, n = 1,
                                        warn = FALSE)) > 0) {
            split <- split_line(line)
            word <- split$word
            count <- split$count
            if (nchar(word) > 0) {
                if (exists(word, envir = env, inherits = FALSE)) {
                    # Word seen before: add to its running total
                    oldcount <- get(word, envir = env)
                    assign(word, oldcount + count, envir = env)
                } else {
                    assign(word, count, envir = env)
                }
            }
        }
        close(con)
        # Emit one "word<TAB>count" line per distinct word
        for (w in ls(env, all = TRUE))
            cat(w, "\t", get(w, envir = env), "\n", sep = "")
    }
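The reducer listing elides its helper definitions, so split_line() is not shown on the slide. A hypothetical version, assuming input lines of the form word<TAB>count as written by the mapper, might look like the sketch below. The function name comes from the listing; the body is an assumption.

```r
# Hypothetical helper: parse one "word<TAB>count" line into its two fields.
split_line <- function(line) {
  fields <- unlist(strsplit(line, "\t", fixed = TRUE))
  list(word = fields[1], count = as.integer(fields[2]))
}

parsed <- split_line("hadoop\t1")
print(parsed$word)
print(parsed$count)
```

Converting the count with as.integer() matters here, because the reducer adds counts arithmetically and everything arriving on stdin is text.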

(18)

Example: Word Count

    > hive_stream(mapper = mapper,
                  reducer = reducer,
                  input = "/tmp/Reuters",
                  output = "/tmp/Reuters_out")
    > DFS_list("/tmp/Reuters_out")
    [1] "_logs"      "part-00000"
    > results <- DFS_read_lines(
          "/tmp/Reuters_out/part-00000")
    > head(results)
    [1] "-\t2"               "--\t7"
    [3] ":\t1"               ".\t1"
    [5] "0064</UNKNOWN>\t1"  "0066</UNKNOWN>\t1"
    > tmp <- strsplit(results, "\t")
    > counts <- as.integer(unlist(lapply(tmp, function(x)
          x[[2]])))
    > names(counts) <- unlist(lapply(tmp, function(x)
          x[[1]]))
    > head(sort(counts, decreasing = TRUE))
     the   to  and   of   at said
      58   44   41   30   25   22

(19)

hive: Summary

- Further functions: DFS_put_object(), DFS_cat(), hive_create(), hive_get_parameter(), ...
- Currently, heavy usage of command line tools
- Java interface in preparation (presentation @ useR 2009)
- Use infrastructure of package HadoopStreaming?

(20)
(21)

Why Distributed Text Mining?

- Highly interdisciplinary research field utilizing techniques from computer science, linguistics, and statistics
- Vast amount of textual data available in machine readable format:
  - scientific articles, abstracts, books, ...
  - memos, letters, ...
  - online forums, mailing lists, blogs, ...
- Data volumes (corpora) become bigger and bigger
- Steady increase of text mining methods (both in academia and in industry) within the last decade
- Text mining methods are becoming more complex and hence more computationally intensive

(22)

Why Distributed Text Mining?

- High Performance Computing (HPC) servers available for a reasonable price
- Integrated frameworks for parallel/distributed computing available (e.g., Hadoop)
- Thus, parallel/distributed computing is now easier than ever
- Standard software for data processing already offers extensions

(23)

Text Mining in R

- tm Package
- Tailored for
  - Plain texts, articles and papers
  - Web documents (XML, SGML, ...)
  - Surveys
- Methods for
  - Clustering
  - Classification
  - Visualization

(24)

Text Mining in R

I. Feinerer. tm: Text Mining Package, 2009. R package version 0.3-3. URL http://CRAN.R-project.org/package=tm

I. Feinerer, K. Hornik, and D. Meyer. Text mining infrastructure in R. Journal of Statistical Software, 25(5):1–54, March 2008. ISSN 1548-7660.

(25)

Distributed Text Mining in R

Example: Stemming

- Erasing word suffixes to retrieve their radicals
- Reduces complexity
- Stemmers provided in packages Rstem¹ and Snowball²

Data:

- Wizard of Oz book series (http://www.gutenberg.org): 20 books, each containing 1529–9452 lines of text
- Reuters-21578: one of the most widely used test collections for text categorization research
- (New York Times corpus)

¹ Duncan Temple Lang (version 0.3-0 on Omegahat)
²

(26)

Distributed Text Mining in R

Motivation:

- Large data sets
- Corpus typically loaded into memory
- Operations on all elements of the corpus (so-called transformations)

Available transformations: stemDoc(), stripWhitespace(), tmTolower(), ...

(27)

Distributed Text Mining Strategies in R

Strategies:

- Text mining using tm and Hadoop/hive¹
- Text mining using tm and MPI/snow²

¹ Stefan Theußl (version 0.1-1)
²

(28)

Distributed Text Mining in R

Solution (Hadoop):

- Data set copied to DFS (‘DistributedCorpus’)
- Only meta information about the corpus in memory
- Computational operations (Map) on all elements in parallel
- Work horse tmMap()

(29)

Distributed Text Mining in R - Listing

    > library("tm")
    Loading required package: slam
    > input <- "~/Data/Reuters/reuters_xml"
    > co <- Corpus(DirSource(input), [...])
    > co
    A corpus with 21578 text documents
    > print(object.size(co), units = "Mb")
    65.5 Mb

    > source("corpus.R")
    > source("reader.R")
    > dc <- DistributedCorpus(DirSource(input), [...])
    > dc
    A corpus with 21578 text documents
    > dc[[1]]
    Showers continued throughout the week in
    [...]
    > print(object.size(dc), units = "Mb")
    1.9 Mb

(30)

Distributed Text Mining in R - Listing

Mapper (called by tmMap):

    mapper <- function() {
        require("tm")
        fun <- some_tm_method
        [...]
        con <- file("stdin", open = "r")
        while (length(line <- readLines(con, n = 1L,
                                        warn = FALSE)) > 0) {
            input <- split_line(line)
            result <- fun(input$value)
            if (length(result))
                mapred_write_output(input$key, result)
        }
        close(con)
    }

(31)

Distributed Text Mining in R

Infrastructure:

- Development platform: 8-core Power 6 shared memory system
  - IBM System p 550
  - 4 2-core IBM POWER6 @ 3.5 GHz
  - 128 GB RAM
- Computers of PC Lab used as worker nodes
  - 8 PCs with an Intel Pentium 4 CPU @ 3.2 GHz and 1 GB of RAM
  - Each PC has >20 GB reserved for DFS

MapReduce framework:

- Hadoop (implements MapReduce + DFS)
- R (2.9.0) with tm (0.4) and hive (0.1-1)
- Code implementing ‘DistributedCorpus’

(32)

Benchmark

Reuters-21578:

- Single processor runtime (lapply()): 133 min.
- tm/hive on 8-core SMP (hive_stream()): 4 min.
- tm/snow on 8 nodes of cluster@WU (parLapply()): 2.13 min.
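Speedup is conventionally the single-processor runtime divided by the parallel runtime. The small sketch below applies that ratio to the three runtimes reported above. The function and vector names are illustrative only, and because the runs used different hardware and software stacks, the ratios are a rough comparison rather than a scaling measurement.

```r
# Speedup relative to the single-processor run: S = T_serial / T_parallel.
speedup <- function(t_serial, t_parallel) t_serial / t_parallel

# Runtimes in minutes, taken from the benchmark slide.
t_serial <- 133
t_parallel <- c(hive_8_core_smp = 4, snow_8_nodes = 2.13)

s <- speedup(t_serial, t_parallel)
print(round(s, 1))
```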

(33)

Benchmark

Figure: two panels. Runtime: runtime [s] versus number of CPUs (1 to 8). Speedup: speedup versus number of CPUs (1 to 8).

(34)

Distributed Text Mining in R

Excursion: How to store text files in the DFS

- Requirement: access text documents in R via [[
- Difficult to achieve: almost random output after calling map in Hadoop
- Output chunks automatically renamed to part-xxxxx
- Solution: add meta information to each chunk (chunk name, position in the chunk)
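One way to picture the meta information is as an index that maps each document to its chunk and its position within that chunk. The sketch below is hypothetical: the data frame layout, the in-memory list standing in for the DFS, and get_document() are assumptions for illustration, not the DistributedCorpus implementation.

```r
# Hypothetical index: one row per document, recording which output chunk
# holds it and at which position inside that chunk.
index <- data.frame(
  id       = c("reut-00001.xml", "reut-00002.xml", "reut-00003.xml"),
  chunk    = c("part-00000", "part-00001", "part-00000"),
  position = c(2L, 1L, 1L),
  stringsAsFactors = FALSE
)

# Stand-in for the DFS: chunk name -> lines stored in that chunk.
chunks <- list(
  "part-00000" = c("text of document 3", "text of document 1"),
  "part-00001" = c("text of document 2")
)

# Resolve the i-th document through the index, so the result does not
# depend on the order in which Hadoop wrote the output.
get_document <- function(i) {
  entry <- index[i, ]
  chunks[[entry$chunk]][entry$position]
}

print(get_document(1))
```

With such an index kept in memory, a `[[` method only has to look up one row and read one chunk, which fits the idea of holding nothing but meta information on the R side.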

(35)

Lessons Learned

- Problem size has to be sufficiently large
- Location of texts in DFS (currently: ID = file path)
- Thus, serialization difficult (how to update text IDs?)
- Remote file operation on DFS around 2.5 sec. (will be

(36)

Conclusion

- MapReduce has proven to be a useful abstraction
- Greatly simplifies distributed computing
- Developers focus on the problem
- Implementations like Hadoop deal with messy details
  - different approaches to facilitate Hadoop’s infrastructure
  - language- and use case dependent

(37)

Thank You for Your Attention!

Stefan Theußl

Department of Statistics and Mathematics

URL: http://statmath.wu.ac.at/~theussl

WU Vienna University of Economics and Business

Augasse 2–6, A-1090 Wien
