Data Mining for Big Data: Tools and Approaches. Pace SDSC SAN DIEGO SUPERCOMPUTER CENTER

Academic year: 2021

(1)

Data Mining for Big Data:

Tools and Approaches

Pace

SDSC

(2)

Todo

R domc exercise?

Test train account

Paradigm stream e.g. from book? And mapred or

(3)

Outline

Scaling

What is Big Data

Parallel option for R

Map/Reduce

(4)

Scaling, practically

Scaling (with or without more data):

• more processing/searching (e.g. training more complicated neural networks)

• more complex analysis (larger ensemble)

• more sampling (more trees in Random Forest)

Sometimes easy to parallelize (like with sampling),

Sometimes too much communication between

(5)

Scaling In a nutshell

R takes advantage of math libraries for vector operations

R packages provide multicore, multinode (snow), or map/reduce (RHadoop) options

However, model implementations are not necessarily built to use parallel backends

(6)

R vector operations and scale

The Intel Math Kernel Library (MKL) provides fast operations for sums and multiplication

(7)

Consider Regression Computations

Linear Model: 𝑌 = 𝑋 ∗ 𝐵

where Y = outcomes, X = data matrix, B = coefficients to solve for

Algebraically, we could:

• take the “inverse” of 𝑋 and compute 𝐵 = inverse(𝑋) ∗ 𝑌 (time consuming)

Or, better:

• decompose X into triangular matrices (more memory), then solve more easily
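The two options above can be compared in a few lines of R. This is only a sketch on simulated data: the sizes, the seed, and the names `B_true`, `B_inv`, and `B_qr` are assumptions made for illustration, and since X is rarely square the "inverse" route is written here through the normal equations (X'X).

```r
# Simulated regression problem (all values are illustrative assumptions)
set.seed(1)
n <- 200
X <- cbind(1, rnorm(n), rnorm(n))            # data matrix with an intercept column
B_true <- c(2, -1, 0.5)
Y <- as.numeric(X %*% B_true + rnorm(n, sd = 0.1))

# Option 1: explicit inverse of X'X (simple, but more work and less stable)
B_inv <- solve(t(X) %*% X) %*% t(X) %*% Y

# Option 2: QR decomposition of X, then solve the triangular system
B_qr <- qr.solve(X, Y)

# lm() should agree with both
B_lm <- coef(lm(Y ~ X - 1))
print(cbind(inverse = as.numeric(B_inv), qr = B_qr, lm = unname(B_lm)))
```

Both routes give the same coefficients on a small, well-conditioned problem; the differences show up in time and numerical stability as X grows.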

(8)

Consider Regression models

Related Models and Functions :

lm() # Linear Model

glm() # Generalized Linear Model (logistic regression, etc.)

aov() # Analysis of Variance (returns ANOVA table of F-scores)

(9)

Solving Linear Systems: Performance with R

[Figure: wall time (secs) versus matrix size (1K, 2K, 4K, 8K) for GLM: logistic, GLM: Gaussian, LM(), Solve(a,b), QR, and inverse; one label on the plot reads 30min]

R:

glm(Y~X,family=gaussian) # Gaussian regression (like lm)

glm(Y~X,family=binomial) # logistic regression (Y=0 or 1)

(10)

R multicore

Run loop iterations on separate cores

install.packages("doMC")
library(doMC)
registerDoMC(cores=15)     # allocate workers
getDoParWorkers()
results = foreach(i=1:15, .combine=rbind) %dopar% {
  … your code here
  return( a variable or object )
}

• %dopar% puts loops across cores (loops are independent); %do% runs it serially

• returned items are ‘combined’ into a list by default

• .combine=rbind specifies combining the results into an array with row bind
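To see the difference between the default list and a row-bound result without installing anything, here is a rough base-R analogue. It swaps in `mclapply` from the `parallel` package (shipped with R) for `foreach ... %dopar%`, and `do.call(rbind, ...)` for `.combine=rbind`. The function `work` and the choice of 2 cores are assumptions for illustration, and `mclapply` with more than one core needs a Unix-like system such as a Gordon compute node.

```r
library(parallel)

# Hypothetical loop body: each independent iteration returns a small named vector
work <- function(i) c(index = i, square = i^2)

# Default behaviour: results come back as a list, one element per iteration
as_list <- mclapply(1:15, work, mc.cores = 2)

# Row-binding the list gives a 15 x 2 array, similar in spirit to .combine=rbind
as_rows <- do.call(rbind, as_list)

str(as_list[1:2])
print(dim(as_rows))
```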

(11)

R multicore exercise

Can be run on a Gordon compute node or on a laptop

First: use putty (Windows) or ssh (Mac terminal) to connect to Gordon

(12)

Enter userid and password (you get signed into the login node)

Userids are train91 to train110

$ ls gives a directory listing

(13)

From the compute node (here it's called gcn-6-71):

$ cd BootCamp
$ cd Rtests
$ module load R
$ R

(14)

source('Ex1_RdoMC.R') runs the exercise script

(15)

The Ex1 script tests %dopar% with and without .combine

How are return values combined?

(16)

source('Ex2_MC.R')

The script builds and multiplies two matrices.

Enter number of cores: 1 to 16

Enter block size: 100, 1000, 2000 (for example)

You should see the processing time for different doMC steps:

1. in parallel with %dopar%

2. serially with just %do%

3. with just native R
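The exercise script itself is not reproduced here, so the following is only a hypothetical sketch of the same three-way comparison. It assumes a row-block split of the first matrix and uses `mclapply`/`lapply` from base R in place of `%dopar%`/`%do%`; the sizes, `block`, and `mult_block` are made up for illustration.

```r
library(parallel)

n <- 400          # matrix size (assumption)
block <- 100      # rows per block (assumption)
A <- matrix(runif(n * n), n, n)
B <- matrix(runif(n * n), n, n)

# Multiply one block of rows of A by all of B
starts <- seq(1, n, by = block)
mult_block <- function(s) A[s:min(s + block - 1, n), , drop = FALSE] %*% B

# 1. blocks in parallel, 2. blocks serially, 3. native R
t_par <- system.time(C_par <- do.call(rbind, mclapply(starts, mult_block, mc.cores = 2)))
t_ser <- system.time(C_ser <- do.call(rbind, lapply(starts, mult_block)))
t_nat <- system.time(C_nat <- A %*% B)

print(rbind(parallel = t_par, serial = t_ser, native = t_nat)[, "elapsed"])
```

On small matrices the native `%*%` usually wins, because it already calls an optimized math library and the block versions pay overhead for splitting and recombining.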

(17)

Multicore to multinodes

INTEL SANDY BRIDGE COMPUTE NODE
  Sockets & Cores: 2 & 16
  Clock speed: 2.6 GHz
  DRAM capacity and speed: 64 GB, 1,333 MHz

INTEL 710 eMLC FLASH I/O NODE
  NAND flash SSD drives: 16
  SSD capacity per drive & per node: 16 * 300 GB = 4.8 TB

SMP SUPER-NODE (VIA VSMP)
  Compute nodes / I/O nodes: 32 / 2
  Addressable DRAM: 2 TB
  Addressable memory including flash: 11.6 TB

GORDON (AGGREGATE)
  Compute nodes: 1,024
  Compute cores: 16,384
  Peak performance: 341 TF
  DRAM/SSD memory: 64 TB DRAM; 300 TB SSD

INFINIBAND INTERCONNECT
  Architecture: Dual-Rail, 3D torus
  Link bandwidth: QDR
  Vendor: Mellanox

LUSTRE-BASED DISK I/O SUBSYSTEM (SHARED)

(18)

Scale and Computations

Multicore and Multinode

• Communication vs Distributed tradeoff

• Some operations are always best when you can stay on 1 core

(19)

R multinode: parallel backend

Run loop iterations on separate nodes

library(doSNOW)
…
cl <- makeCluster( mpi.universe.size(), type='MPI' )   # allocate cluster as parallel backend
clusterExport(cl, c('data'))
registerDoSNOW(cl)
results = foreach(i=1:15, .combine=rbind) %dopar% {     # %dopar% puts loops across cores and nodes
  … your code here
  return( a variable or object )
}
stopCluster(cl)
mpi.exit()
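The same allocate, export, run, stop pattern can be tried on a single machine without MPI. The sketch below is an assumption-laden stand-in, not the slide's setup: it uses the base `parallel` package with two local socket workers instead of doSNOW over MPI, and `data` is a made-up matrix.

```r
library(parallel)

data <- matrix(rnorm(30), nrow = 15)       # hypothetical object the workers need

cl <- makeCluster(2)                        # two local socket workers (no MPI)
clusterExport(cl, "data", envir = environment())   # ship 'data' to every worker

# Each iteration runs on some worker and returns a small named vector
rows <- parLapply(cl, 1:15, function(i) c(i = i, row_sum = sum(data[i, ])))

stopCluster(cl)                             # always release the workers

results <- do.call(rbind, rows)             # row-bind, as with .combine=rbind
print(head(results, 3))
```

Anything the loop body uses has to be exported explicitly, because the workers are separate R processes that do not share the master's memory.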

(20)

Matrix Multiplication and Matrix Inversion

[Figure: two panels of time (s) versus square matrix size (N = 10K, 20K, 30K, 40K, 50K; Gb = 2, 6.5, 14, 25, 40), with curves for 8, 16, and 32 threads]

Threads across CPUs: more is better for multiplication, less is better for inversion (or use a different operation)

Multiple CPUs may not help so much

(21)
(22)
(23)

4 V’s of Big Data

(24)

Uniquely Big Data Problems

Streaming data from sensors (energy grids)

• Can't store it; process/analyze it as it comes

Internet Page Rank for searches

• constantly new links and pages into graph database

Data/video uploads (youtube, security cams)

• No annotations

Digital text (books, medical notes, blogs)

• Unstructured

Twitter messaging

• constantly changing topics

(25)

What to do with big data?

(ERIC SALL)

Big Data Exploration

• To get an overall understanding of what is there

360 degree view of the customer

• Combine both internally available and external information to gain a deeper understanding of the customer

Monitoring Cyber-security and fraud in real time

Operational Analysis

• Leveraging machine generated data to improve business effectiveness

Data Warehouse Augmentation

• Enhancing warehouse solution with new information models and architecture

(26)

Big Data Practically

Too big to fit in 1 computer's memory

Too big to make one pass through on 1 computer

Too big for 1 hard drive

(27)

Got Big Data

Map/Reduce framework started by Google

Main idea: bring computation to data

Apache Hadoop is one implementation

Hadoop is entire ecosystem of supporting tools

• HDFS: Hadoop distributed file system (for partitioning and merging data reliably, using a binary format)

• Hive: database using map/reduce on HDFS

• Pig: query tool using map/reduce on HDFS

(28)

User defined functions

MR provides parallelization, concurrency, and intermediate data functions (sorting by key & value)

User defines keys & values

(29)

Taking Advantage of Map/Reduce

Map-to-Reduce: what is a key?

Whatever you need for the sorting; it should be related to the Σ for the reducer

Example:

word count: key is word

(30)

Map/Reduce Algorithm

General Conditions

• operations/data are separable and independent

• data that doesn’t fit into memory

• data that doesn’t need to be all read into memory

General Strategy

If you have this: Σ (some process) then do this:

Map ‘some process’ over parts
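A minimal illustration of that strategy in base R, assuming the "process" is squaring each value and the Σ is a plain sum. The chunk size of 100 and the names `chunks`, `partial`, and `total` are arbitrary choices for the example.

```r
x <- 1:1000

# Split the data into parts (here: chunks of 100 values)
chunks <- split(x, ceiling(seq_along(x) / 100))

# Map: apply 'some process' to each part independently
partial <- lapply(chunks, function(part) sum(part^2))

# Reduce: combine the partial results with the summation
total <- Reduce(`+`, partial)

print(total)
```

Because each part is processed independently, the `lapply` call is the piece that a framework can spread across cores or nodes.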

(31)
(32)

Hadoop Map/Reduce Interfaces with R

(slides from G.Lockwood SDSC)

R Streaming (simplest) or Hadoop API

Streaming pipes input/output through steps

cat input | Rscript mapper.R | sort | Rscript reducer.R > output

You provide these two scripts; Hadoop does the rest

generalizable to any language you want (Perl,

(33)

Paradigmatic Example: Word Counting

How would you count all the words in Moby Dick?

Call me Ishmael. Some years ago never mind how long precisely -having little or no money in my purse, and nothing particular to

interest me on shore, I thought I would sail about a little and see the watery part of the world. It is a way I have of driving off the spleen and regulating the circulation. ….

How could you count all the words in all web pages? (assume the data is spread out over many nodes)

(34)

Wordcount: Hadoop streaming mapper

Example from Glen Lockwood, SDSC

# Emit key-value pairs ('cat' is 'concatenate and print')
emit.keyval <- function(key, value) {
  cat(key, '\t', value, '\n', sep='')
}

stdin <- file('stdin', open='r')
while ( length(line <- readLines(stdin, n=1)) > 0 ) {
  line <- gsub('(^\\s+|\\s+$)', '', line)        # trim leading/trailing whitespace
  keys <- unlist(strsplit(line, split='\\s+'))   # split line into words; use words as keys
  value <- 1
  lapply(keys, FUN=emit.keyval, value=value)
}
close(stdin)

(35)

What One Mapper Does

line = "Call me Ishmael. Some years ago--never mind how long"

keys = Call, me, Ishmael., Some, years, ago--never, mind, how, long

emit.keyval(key,value) ... emits each key with the value 1 (e.g. "how 1", "ago--never 1") to the reducers
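The mapper's two string operations can be checked interactively on that one line, without stdin or Hadoop. This is just the trimming and splitting from the mapper applied to an in-memory string; the variable `pairs` is an illustrative addition.

```r
line <- "  Call me Ishmael. Some years ago--never mind how long  "

# Same operations as the streaming mapper
line <- gsub('(^\\s+|\\s+$)', '', line)        # trim whitespace at both ends
keys <- unlist(strsplit(line, split = '\\s+')) # split on runs of whitespace

# One tab-separated <key, 1> record per word
pairs <- paste(keys, 1, sep = '\t')
cat(pairs, sep = '\n')
```

Note that punctuation stays attached to the words ("Ishmael.", "ago--never"), so these count as distinct keys unless the mapper cleans them first.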

(36)

Reducer Loop

If this key is the same as the previous key,

• add this key's value to our running total.

Otherwise,

• print out the previous key's name and the running total,

• reset our running total to 0,

• add this key's value to the running total, and

• "this key" is now considered the "previous key"

(37)

Wordcount: Streaming Reducer (1/2)

last_key <- ""
running_total <- 0
stdin <- file('stdin', open='r')
while ( length(line <- readLines(stdin,n=1)) > 0 ) {
  line <- gsub('(^\\s+)|(\\s+$)', '', line)
  # Get key, value
  keyvalue <- unlist(strsplit(line, split='\t', fixed=TRUE))
  this_key <- keyvalue[[1]]
  value <- as.numeric(keyvalue[[2]])
  # Add up values
  if ( last_key == this_key ) {
    running_total <- running_total + value
  } else {
    (to be continued...)

(38)

Wordcount: Streaming Reducer (2/2)

  else {
    # For each new key, emit <key, sum>
    if ( last_key != "" ) {
      cat( paste(last_key,'\t',running_total,'\n',sep='') )
    }
    running_total <- value
    last_key <- this_key
  }
}
# Emit the final key once the input is exhausted
if ( last_key == this_key ) {
  cat( paste(last_key,'\t',running_total,'\n',sep='') )
}
close(stdin)

(39)

Testing Mappers/Reducers

Debugging Hadoop is not fun

$ head -n100 pg2701.txt | ./wordcount-streaming-mapper.R | sort | ./wordcount-streaming-reducer.R
...
with 5
word, 1
world. 1
www.gutenberg.org 1
you 3
You 1
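The map, sort, reduce sequence in that pipeline can also be mimicked inside a single R session as a quick sanity check. This is a sketch on two made-up lines of text, with `rle` (run-length encoding) standing in for the reducer's running-total loop over sorted keys.

```r
text <- c("the whale the sea", "the ship")   # made-up input lines

# Map: one key per word
keys <- unlist(strsplit(text, "\\s+"))

# Shuffle/sort: identical keys end up next to each other
sorted <- sort(keys)

# Reduce: each run of identical keys becomes one <key, count> pair
runs <- rle(sorted)
counts <- setNames(runs$lengths, runs$values)

print(counts)
```

The sort step is what lets the streaming reducer work with only the previous key and a running total in memory.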

(40)

Launching Hadoop Streaming

$ hadoop dfs -copyFromLocal ./pg2701.txt mobydick.txt
$ hadoop jar \
    /opt/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar \
    -D mapred.reduce.tasks=2 \
    -mapper "Rscript $PWD/wordcount-streaming-mapper.R" \
    -reducer "Rscript $PWD/wordcount-streaming-reducer.R" \
    -input mobydick.txt \
    -output output
$ hadoop dfs -cat output/part-* > ./output.txt

(41)
(42)

Taking Advantage of Map/Reduce

Case Study:

election related tweets

daily change in approval ratings

What is the relationship between tweets and approval ratings?

(43)

Tweet Data

Twitter provides access to data

Unstructured message text and meta data

{"created_time": "13:27:15 +0000", "text": "@TeAmo_Giirls Vote Obama Man....", "user_id": 235813005, "id": 253848760784912384, "created_date": "Thu Oct 04 2012"}

{"created_time": "01:12:35 +0000", "text": "I swear these dudes in this class dont understand english. Its like my teacher is speaking some foreign language to them", "user_id": 275370836, "id": 250764771656343552, "created_date": "Wed Sep 26 2012"}

…… etc …

(44)

Twitter and Other Data

Obama Approval minus Disapproval

poll tracking leading up to 2012 election

(45)

Defining Flow Map/Reduce

Goal : turn tweet message into data by day

Target: approval change from previous day

Choices:

• track message elements (words, …)

• track metadata (date, users, replies, …)

Let’s try word counts by date

(46)

Defining Flow Map/Reduce

Approach: extend word count into <date,word> count

Map: split tweet into parts and emit Key <date, word>, Value 1

Reduce: add up values

(47)

Defining Flow Map/Reduce

What other aggregations do we need?

At what point will data fit into memory?

1 Do we need the list of unique words and their overall counts?

2 If you want to correlate target to unexpected word counts, then what sums does that need?

(48)

My Example Flow

1. Process messages

Map: split tweet message into <date,word>,1

Reduce: sum counts for <date,word>

2. Re-map

Map: split <date,word>,1 into <date>,1

Reduce: sum counts for <date>

3. Re-map

Map: split <date,word>,1 into <word>,1
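The three passes can be prototyped in memory before writing streaming scripts. The sketch below is not the workshop's code: the three tweets are invented, the composite key is encoded as a "date|word" string, and `table`/`tapply` stand in for the reduce steps.

```r
# Invented tweets, already parsed into date and text columns
tweets <- data.frame(
  created_date = c("Thu Oct 04 2012", "Thu Oct 04 2012", "Wed Sep 26 2012"),
  text = c("vote obama", "vote early", "class vote"),
  stringsAsFactors = FALSE
)

# Pass 1 map: emit one "<date>|<word>" key (value 1) per word
map_tweet <- function(date, text) {
  words <- unlist(strsplit(tolower(text), "\\s+"))
  paste(date, words, sep = "|")
}
keys <- unlist(Map(map_tweet, tweets$created_date, tweets$text), use.names = FALSE)

# Pass 1 reduce: sum counts for each <date,word>
tab <- table(keys)
date_word <- setNames(as.vector(tab), names(tab))

# Passes 2 and 3: re-map the composite key to <date> or <word>, then sum again
parts <- strsplit(names(date_word), "|", fixed = TRUE)
dates <- vapply(parts, `[`, character(1), 1)
words <- vapply(parts, `[`, character(1), 2)
by_date <- tapply(date_word, dates, sum)
by_word <- tapply(date_word, words, sum)

print(date_word); print(by_date); print(by_word)
```

Passes 2 and 3 start from the already reduced <date,word> counts, which are much smaller than the raw messages.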

(49)

Example Flow Downstream

For analysis, perhaps, the end product is a date × word data matrix.

Each row is a count of words for one date (using the top P words), joined with approval rating changes (as +1 or -1 down col 1)

data:

DATE     APPR   Vote   Billion   senator   june
Apr 01   -      0      1         0         0      …
Apr 02   +      1      3         2         0      …
Apr 03   +      1      0         0         1      …
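One way to get from reduced <date,word> counts to such a matrix is `xtabs`. The sketch below re-enters the nonzero counts from the example table in long format; the data frame `counts`, the vector `appr`, and the coding of +/- as +1/-1 are assumptions for illustration.

```r
# Long-format <date, word, count> records (nonzero cells of the example table)
counts <- data.frame(
  date = c("Apr 01", "Apr 02", "Apr 02", "Apr 02", "Apr 03", "Apr 03"),
  word = c("Billion", "Vote", "Billion", "senator", "Vote", "june"),
  n    = c(1, 1, 3, 2, 1, 1),
  stringsAsFactors = FALSE
)

# Pivot to a date x word matrix; missing combinations become 0
wide <- xtabs(n ~ date + word, data = counts)

# Approval change per date, coded +1 / -1, joined as the first column
appr <- c("Apr 01" = -1, "Apr 02" = 1, "Apr 03" = 1)
data_matrix <- cbind(APPR = appr[rownames(wide)], unclass(wide))

print(data_matrix)
```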

(50)

Gordon Access

putty or ssh for windows to get Unix shell on

(51)

directory listing (ls)

$ cd BootCamp
$ sh QSUBH.txt

Get to compute node

$ cd BootCamp
$ cd Rhad_Tweets
$ ls

(52)

Some scripts for date-word counting of tweet messages.

The ‘…process.R’ file produces a data matrix

(53)
(54)
(55)
(56)
(57)

Note slave nodes, task trackers, data

trackers

(58)
(59)
(60)
(61)

end

Sample output from reduce steps

(62)

Sample output from reduce steps

Process cnts into data matrix

(63)
(64)
(65)
(66)
(67)
(68)
(69)
(70)
(71)

Map/Reduce Algorithm

General Conditions

• operations/data are separable and independent

• data that doesn’t fit into memory

• data that doesn’t need to be all read into memory

General Strategy

If you can divide problem into parts then do this:

Map ‘some process’ over parts

(72)

Join Multiple Dataset on Key

Problem: 2 files in HDFS that should be

combined on key value

In pseudo SQL

SELECT * FROM A, B WHERE A.key = B.key

Joins can be inner, left or right outer

(73)

Join Map/Reduce Strategy

Problem: Join 2 key,value sets

A = <wd>|<count>
about 5
actor 15
bacon 3
..

B = <wd>|<date>
able Nov 16
actor Feb 01
actor May 03
bacon Apr 11
..

Want something like AjoinB is <wd>|<list of values>

actor 15, Feb 01
actor 15, May 03
bacon 3, Apr 11

(74)

Join Map/Reduce Strategy

One solution:

stream both A and B tables to map

Intermediate step will shuffle data so that keys are together

Key     Value
about   A,5
able    B,Nov 16
actor   A,15
actor   B,Feb 01
actor   B,May 03
..

(75)

Join Map/Reduce Strategy

One solution:

A reducer has access to all rows from A,B with the same key value, so it can split those rows back to A or B (how?) and take a cross-product

Key     Value
about   A,5
able    B,Nov 16
actor   A,15
actor   B,Feb 01
actor   B,May 03
..

A: about 5; actor 15

B: able Nov 16; actor Feb 01; actor May 03
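The join strategy can be sketched in memory with the example rows. Here the source tag added in the map step (column `src`) is what lets the reducer split rows back into A and B; `split` plays the role of the shuffle, and `reduce_join` is a hypothetical helper name.

```r
A <- data.frame(key = c("about", "actor", "bacon"),
                value = c("5", "15", "3"), stringsAsFactors = FALSE)
B <- data.frame(key = c("able", "actor", "actor", "bacon"),
                value = c("Nov 16", "Feb 01", "May 03", "Apr 11"),
                stringsAsFactors = FALSE)

# Map: stream both tables, tagging each row with its source
mapped <- rbind(cbind(A, src = "A", stringsAsFactors = FALSE),
                cbind(B, src = "B", stringsAsFactors = FALSE))

# Shuffle: rows with the same key are brought together
groups <- split(mapped, mapped$key)

# Reduce: split rows by tag and take the cross-product (inner join)
reduce_join <- function(g) {
  a <- g$value[g$src == "A"]
  b <- g$value[g$src == "B"]
  if (length(a) == 0 || length(b) == 0) return(NULL)   # key missing on one side
  cbind(key = g$key[1],
        expand.grid(count = a, date = b, stringsAsFactors = FALSE),
        stringsAsFactors = FALSE)
}
pieces <- Filter(Negate(is.null), lapply(groups, reduce_join))
joined <- do.call(rbind, pieces)

print(joined, row.names = FALSE)
```

Keeping the keys that appear on only one side (instead of returning `NULL`) would turn this into a left or right outer join.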

(76)

Join Map/Reduce Strategy

Size matters:

If one dataset fits in memory, it can be replicated across nodes and joined with Map only (replicated join)

If both datasets are large, use full Map/Reduce (repartition join)

If both datasets are large but one can be filtered down, do 1 map/reduce first (semi-join)

(77)

Summary of Map/Reduce

Design Considerations

Composite keys and/or values

Grouping

• Bundle keys into groups

Replication

• Repeating values across more than 1 key

(78)

Machine Learning

Most algorithms have some summation step, so

Map/Reduce will speed up jobs

But parameter estimations require

communication between parts

• Some algorithms look at interdependencies across NxP data matrix

• E.g. Lin Reg inverts X’*X, a PxP matrix; NNets propagate errors

• Some algorithms use interdependencies between observations,

• e.g. SVM kernels

• Some algorithms take distances and sums mostly
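Linear regression shows how a summation step maps onto Map/Reduce: X'X and X'Y are sums over rows, so each chunk can contribute partial sums and only the small PxP result has to be solved in one place. The sketch below uses simulated data; the chunk size, seed, and coefficient values are assumptions.

```r
set.seed(42)
n <- 1000
X <- cbind(1, rnorm(n), rnorm(n))                    # N x P data matrix (P = 3)
Y <- as.numeric(X %*% c(1, 2, -3) + rnorm(n))

# Map: each chunk of rows returns its own P x P and P x 1 partial sums
chunk_id <- ceiling(seq_len(n) / 250)
partials <- lapply(split(seq_len(n), chunk_id), function(rows) {
  Xc <- X[rows, , drop = FALSE]
  list(XtX = crossprod(Xc), XtY = crossprod(Xc, Y[rows]))
})

# Reduce: add the partial sums
XtX <- Reduce(`+`, lapply(partials, `[[`, "XtX"))
XtY <- Reduce(`+`, lapply(partials, `[[`, "XtY"))

# Final step on the small P x P system happens in one place
B <- solve(XtX, XtY)
print(cbind(mapreduce = as.numeric(B), lm = unname(coef(lm(Y ~ X - 1)))))
```

Only the PxP and Px1 sums move between parts, which is why this model fits the pattern well, while iterative estimators need communication on every pass.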

(79)

Machine Learning and Map/Reduce

Mahout for Hadoop is a Java library of machine learning algorithms

• processes data in ‘chunks’ that fit in memory

• command line or programming interface

• many advanced algorithms

Spark is a new version of Map/Reduce (UC Berkeley)

• main idea: maintain data in memory; don’t write out and shuffle unless you need to
