Data Mining for Big Data: Tools and Approaches
Pace
SDSC
Outline
• Scaling
• What is Big Data?
• Parallel options for R
• Map/Reduce
Scaling, practically
• Scaling (with or without more data):
• more processing/searching (e.g. training more complicated neural networks)
• more complex analysis (larger ensemble)
• more sampling (more trees in Random Forest)
• Sometimes easy to parallelize (as with sampling),
• Sometimes too much communication between
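Sampling is the easy case because each draw is independent of the others. A minimal sketch, assuming base R's `parallel` package and a made-up data vector `x`: bootstrap replicates are farmed out with `mclapply()`, and no replicate ever communicates with another. Growing more trees in a Random Forest has the same shape, since each tree is fit on its own bootstrap sample.

```r
library(parallel)

RNGkind("L'Ecuyer-CMRG")   # random-number streams that can be split safely across workers
set.seed(1)
x <- rnorm(1000)           # hypothetical data set

one_replicate <- function(i) {
  # Resample the data with replacement and return the statistic of interest
  mean(sample(x, replace = TRUE))
}

# mclapply forks worker processes; forking is unavailable on Windows, so use 1 core there
n_cores <- if (.Platform$OS.type == "windows") 1L else 2L
boot_means <- unlist(mclapply(1:200, one_replicate, mc.cores = n_cores))

# The bootstrap standard error should be close to the analytic sd(x) / sqrt(n)
boot_se <- sd(boot_means)
print(c(bootstrap = boot_se, analytic = sd(x) / sqrt(length(x))))
```

Adding replicates (or trees) only adds independent tasks, which is why this kind of scaling parallelizes almost perfectly.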
Scaling in a nutshell
• R takes advantage of math libraries for vector operations
• R packages provide multicore (doMC), multinode (snow), or map/reduce (RHadoop) options
• However, model implementations are not necessarily built to use parallel backends
R vector operations and scale
• The Intel Math Kernel Library (MKL) provides fast operations for sums and multiplications
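To see why library-backed vector operations matter, this sketch compares an interpreted triple loop with R's built-in `%*%`, which normally hands the work to whatever BLAS library R is linked against (the reference BLAS by default, or an optimized one such as MKL if R was built or configured with it). The function name `naive_matmul` is ours, for illustration only.

```r
# Textbook triple loop, executed one scalar operation at a time by the R interpreter
naive_matmul <- function(A, B) {
  C <- matrix(0, nrow(A), ncol(B))
  for (i in seq_len(nrow(A)))
    for (j in seq_len(ncol(B)))
      for (k in seq_len(ncol(A)))
        C[i, j] <- C[i, j] + A[i, k] * B[k, j]
  C
}

set.seed(42)
n <- 100
A <- matrix(rnorm(n * n), n, n)
B <- matrix(rnorm(n * n), n, n)

t_loop <- system.time(C_loop <- naive_matmul(A, B))[["elapsed"]]
t_blas <- system.time(C_blas <- A %*% B)[["elapsed"]]   # vectorized, BLAS-backed

print(c(loop_sec = t_loop, blas_sec = t_blas))
```

Both give the same matrix; the difference is only in who does the arithmetic, and the gap widens as `n` grows.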
Consider Regression Computations
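• Linear model: $Y = XB$, where $Y$ = outcomes and $X$ = data matrix
• Algebraically, we could:
• take the "inverse" of $X$: $B = X^{-1}Y$, or, for a non-square $X$, $B = (X^\top X)^{-1} X^\top Y$ (time-consuming)
• Or, better:
• decompose $X$ into factors that include a triangular matrix (e.g. QR), which takes more memory, then solve the triangular system more easily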
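Both routes can be compared on simulated data. A minimal sketch, with a made-up design matrix `X` and coefficients `B_true`: route 1 forms $B = (X^\top X)^{-1} X^\top Y$ with an explicit inverse; route 2 factors $X = QR$ with $R$ upper triangular, so $RB = Q^\top Y$ is solved by back-substitution. R's own `lm()` also fits by QR.

```r
set.seed(7)
N <- 500; P <- 4
X <- cbind(1, matrix(rnorm(N * (P - 1)), N, P - 1))   # intercept column + 3 predictors
B_true <- c(2, -1, 0.5, 3)                            # hypothetical true coefficients
Y <- drop(X %*% B_true) + rnorm(N, sd = 0.1)

# Route 1: normal equations with an explicit inverse of the P x P matrix X'X
B_inv <- drop(solve(t(X) %*% X) %*% t(X) %*% Y)

# Route 2: QR decomposition, then back-substitution on the triangular factor R
qrX  <- qr(X)
B_qr <- drop(backsolve(qr.R(qrX), crossprod(qr.Q(qrX), Y)))

print(cbind(B_true, B_inv, B_qr))
```

On a well-conditioned problem the two agree to rounding error; QR avoids forming and inverting $X^\top X$, which is both slower and numerically less stable when columns of $X$ are nearly collinear.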
Consider Regression models
• Related models and functions:
lm()    # linear model
glm()   # generalized linear model (logistic regression, etc.)
aov()   # analysis of variance (summary() gives the ANOVA table of F-scores)
In R:
glm(Y ~ X, family = gaussian)   # Gaussian regression (like lm)
glm(Y ~ X, family = binomial)   # logistic regression (Y = 0 or 1)
[Figure: Solving Linear Systems, Performance with R. Wall time (secs, axis up to 30 min) vs. matrix size (1K, 2K, 4K, 8K) for GLM: logistic, GLM: Gaussian, LM(), solve(a,b), QR, and inverse.]
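The three model functions can be exercised side by side on simulated data (all variable names here are made up). A Gaussian `glm()` fits the same model as `lm()`, while the logistic fit is computed by iteratively reweighted least squares, i.e. a sequence of weighted least-squares solves rather than a single one.

```r
set.seed(3)
n <- 200
x <- rnorm(n)
y_cont <- 1 + 2 * x + rnorm(n)                    # continuous outcome
y_bin  <- rbinom(n, 1, plogis(-0.5 + 1.5 * x))   # 0/1 outcome

fit_lm    <- lm(y_cont ~ x)
fit_gauss <- glm(y_cont ~ x, family = gaussian)   # same model as lm()
fit_logit <- glm(y_bin ~ x, family = binomial)    # logistic regression

print(rbind(lm = coef(fit_lm), glm_gaussian = coef(fit_gauss)))
print(coef(fit_logit))
print(summary(aov(y_cont ~ x)))                   # ANOVA table with the F statistic for x
```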
R multicore
• Run loop iterations on separate cores
install.packages("doMC")
library(doMC)
registerDoMC(cores=15)    # allocate workers
getDoParWorkers()
# %dopar% puts loop iterations across cores (the iterations must be independent);
# .combine=rbind row-binds the returned items into an array
results <- foreach(i=1:15, .combine=rbind) %dopar% {
  # … your code here
  return(result)   # placeholder: a variable or object built by your code
}
• Without .combine, returned items are 'combined' into a list by default
• Replacing %dopar% with %do% runs the loop serially
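doMC registers a foreach backend built on the forking (multicore) functionality of base R's `parallel` package, so the same three behaviours can be sketched with `parallel` alone. This is an illustrative equivalent, not the doMC API; `work()` is a hypothetical stand-in for "your code here".

```r
library(parallel)

n_cores <- if (.Platform$OS.type == "windows") 1L else 2L   # forking needs a Unix-alike

work <- function(i) {
  # Hypothetical loop body: return a small named vector per iteration
  c(iter = i, square = i^2, pid = Sys.getpid())
}

# Like %dopar% with no .combine: one list element per iteration
res_list <- mclapply(1:15, work, mc.cores = n_cores)

# Like .combine = rbind: stack the returned vectors as matrix rows
res_mat <- do.call(rbind, res_list)
print(res_mat)

# Like %do%: the same loop run serially in the current process
res_serial <- do.call(rbind, lapply(1:15, work))
```

The `pid` column shows which worker process ran each iteration; in the serial version it is always the current R session.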
R multicore exercise
• Can be run on a Gordon compute node or on a laptop
First: use PuTTY (Windows) or ssh (Mac terminal) to connect to Gordon.
Enter your user ID (train91 to train110) and password; you are signed in to a login node. ($ ls gives a directory listing.)
From a compute node (here it is called gcn-6-71):
$ cd BootCamp
$ cd Rtests
$ module load R
$ R
> source('Ex1_RdoMC.R')     # exercise script
The Ex1 script tests %dopar% with and without .combine. How are the return values combined?
> source('Ex2_MC.R')
The script builds and multiplies two matrices:
• Enter the number of cores, 1 to 16
• Enter the block size, e.g. 100, 1000, or 2000
You should see processing times for the different doMC steps:
1. in parallel with %dopar%
2. serially with just %do%
3. with native R
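The scripts themselves are not reproduced here, so the following is a hypothetical sketch of the same idea, not the contents of Ex2_MC.R: split the rows of `A` into blocks of `block_size`, multiply each block by `B` on its own core with base R's `mclapply()`, and compare with native `A %*% B`. The function name `block_matmul` is ours.

```r
library(parallel)

# Multiply A %*% B by handing row blocks of A to separate cores
block_matmul <- function(A, B, block_size, cores) {
  starts <- seq(1, nrow(A), by = block_size)
  blocks <- lapply(starts, function(s) s:min(s + block_size - 1, nrow(A)))
  parts  <- mclapply(blocks, function(rows) A[rows, , drop = FALSE] %*% B,
                     mc.cores = cores)
  do.call(rbind, parts)   # mclapply returns results in input order, so rows line up
}

set.seed(11)
n <- 400
A <- matrix(runif(n * n), n, n)
B <- matrix(runif(n * n), n, n)
cores <- if (.Platform$OS.type == "windows") 1L else 2L

t_par <- system.time(C_par <- block_matmul(A, B, block_size = 100, cores = cores))[["elapsed"]]
t_nat <- system.time(C_nat <- A %*% B)[["elapsed"]]
print(c(blocked_parallel = t_par, native = t_nat))
```

If R is linked to a multithreaded BLAS, native `%*%` may already use several cores, so the blocked version is not guaranteed to be faster; the overhead of forking and copying blocks back also counts against it.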
Multicore to multinodes
INTEL SANDY BRIDGE COMPUTE NODE
• Sockets & cores: 2 & 16
• Clock speed: 2.6 GHz
• DRAM capacity and speed: 64 GB, 1,333 MHz
INTEL 710 eMLC FLASH I/O NODE
• NAND flash SSD drives: 16
• SSD capacity per drive & per node: 300 GB & 4.8 TB (16 × 300 GB)
SMP SUPER-NODE (VIA vSMP)
• Compute nodes / I/O nodes: 32 / 2
• Addressable DRAM: 2 TB
• Addressable memory including flash: 11.6 TB
GORDON (AGGREGATE)
• Compute nodes: 1,024
• Compute cores: 16,384
• Peak performance: 341 TF
• DRAM/SSD memory: 64 TB DRAM; 300 TB SSD
INFINIBAND INTERCONNECT
• Architecture: Dual-rail, 3D torus
• Link bandwidth: QDR
• Vendor: Mellanox
LUSTRE-BASED DISK I/O SUBSYSTEM (SHARED)
Scale and Computations
• Multicore and multinode
• Tradeoff between distributing the work and the communication it adds
• Some operations are always best kept on a single core when possible
R multinode: parallel backend
• Run loop iterations on separate nodes
library(doSNOW)
library(Rmpi)    # provides mpi.universe.size() and mpi.exit()
…
cl <- makeCluster( mpi.universe.size(), type='MPI' )   # allocate cluster as parallel backend
clusterExport(cl, c('data'))    # copy 'data' to every worker
registerDoSNOW(cl)
# %dopar% puts loop iterations across cores and nodes
results <- foreach(i=1:15, .combine=rbind) %dopar% {
  # … your code here
  return(result)   # placeholder: a variable or object built by your code
}
stopCluster(cl)
mpi.exit()
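The `makeCluster` / `clusterExport` / `stopCluster` pattern also exists in base R's `parallel` package, which grew out of snow. The sketch below assumes a local socket ("PSOCK") cluster of 2 workers instead of MPI, so it runs without Rmpi or an MPI launcher; `data` and `row_summary` are hypothetical. Unlike forked workers, socket workers start empty, which is exactly why `clusterExport()` is needed.

```r
library(parallel)

set.seed(2)
data <- matrix(rnorm(15 * 100), nrow = 15)   # hypothetical data: 15 rows of 100 values

cl <- makeCluster(2, type = "PSOCK")         # 2 separate R worker processes
clusterExport(cl, c("data"))                 # copy 'data' into each worker's global environment

# Runs on the workers; looks up 'data' in the worker's own global environment
row_summary <- function(i) c(i = i, mean = mean(data[i, ]), sd = sd(data[i, ]))
results <- do.call(rbind, parLapply(cl, 1:15, row_summary))   # like .combine = rbind

stopCluster(cl)                              # always release the workers
print(head(results))
```

With MPI the workers can sit on other nodes, but the shape of the code is the same: start workers, ship the data they need, run independent iterations, shut down.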
[Figure: Matrix Multiplication and Matrix Inversion. Time (s) vs. square matrix size N = 10K, 20K, 30K, 40K, 50K (≈ 2, 6.5, 14, 25, 40 GB), with curves for 8, 16, and 32 threads.]
• Threads across CPUs: more threads are better for multiplication, fewer are better for inversion (or use a different operation)
• Multiple CPUs may not help much
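The memory sizes on the x-axis follow from simple arithmetic: an N × N matrix of doubles takes 8N² bytes. The sketch below tabulates this; our reading (not stated on the slide) is that the GB labels correspond roughly to two such matrices held at once.

```r
# Memory for one N x N matrix of 8-byte doubles, in GB (1 GB = 1e9 bytes)
matrix_gb <- function(N) 8 * N^2 / 1e9

N <- c(10e3, 20e3, 30e3, 40e3, 50e3)
print(data.frame(N = N,
                 one_matrix_GB   = matrix_gb(N),
                 two_matrices_GB = 2 * matrix_gb(N)))

# R's own accounting for a 1000 x 1000 double matrix (8e6 bytes of data plus a small header)
print(as.numeric(object.size(matrix(0, 1000, 1000))))
```

At N = 50K a single matrix is already 20 GB, so on a 64 GB node a computation that needs a few working copies approaches the memory limit regardless of thread count.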
4 V’s of Big Data
Uniquely Big Data Problems
• Streaming data from sensors (energy grids)
• Can't store it; process/analyze it as it arrives
• Internet Page Rank for searches
• New links and pages constantly added to the graph database
• Data/video uploads (youtube, security cams)
• No annotations
• Digital text (books, medical notes, blogs)
• Unstructured
• Twitter messaging
• constantly changing topics
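"Process it as it arrives" means keeping only a small summary state instead of the raw stream. A minimal sketch using Welford's online algorithm for the running mean and variance; the readings are simulated and stored here only so the result can be checked, whereas a real stream handler would see each value once and discard it. The helper name `make_running_stats` is ours.

```r
# Constant-size state (n, mu, m2) updated one reading at a time (Welford's algorithm)
make_running_stats <- function() {
  n <- 0; mu <- 0; m2 <- 0
  update <- function(x) {
    n <<- n + 1
    delta <- x - mu
    mu <<- mu + delta / n
    m2 <<- m2 + delta * (x - mu)   # uses the updated mean
    invisible(NULL)
  }
  summary <- function() c(n = n, mean = mu, var = if (n > 1) m2 / (n - 1) else NA)
  list(update = update, summary = summary)
}

set.seed(5)
stream <- make_running_stats()
readings <- rnorm(10000, mean = 60, sd = 0.5)   # simulated sensor readings
for (x in readings) stream$update(x)           # one pass, value by value
s <- stream$summary()
print(s)
```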
What to do with big data?
(ERIC SALL)
• Big Data Exploration
• To get an overall understanding of what is there
• 360-degree view of the customer
• Combine both internally available and external information to gain a deeper understanding of the customer
• Monitoring cyber-security and fraud in real time
• Operational Analysis
• Leveraging machine-generated data to improve business effectiveness
• Data Warehouse Augmentation
• Enhancing warehouse solutions with new information models and architecture
Big Data Practically
• Too big to fit in one computer's memory
• Too big to make one pass through on one computer
• Too big for one hard drive
Got Big Data
• Map/Reduce framework started by Google
• Main idea: bring computation to data
• Apache Hadoop is one implementation
• Hadoop is an entire ecosystem of supporting tools
• HDFS: Hadoop Distributed File System (partitions and merges data reliably, using a binary format)
• Hive: database using map/reduce on HDFS
• Pig: query tool using map/reduce on HDFS
• The user defines the map and reduce functions and the keys & values; MapReduce provides parallelization, concurrency, and intermediate data functions (sorting by key & value)
Taking Advantage of Map/Reduce
• Map-to-Reduce: what is a key?
Whatever you sort on should match the sum (Σ) the reducer computes. Example: in word count, the key is the word.
Map/Reduce Algorithm
• General Conditions
• Operations/data are separable and independent
• Data that doesn't fit into memory
• Data that doesn't need to be read into memory all at once
• General Strategy
If you have this: Σ (some process) then do this:
Map ‘some process’ over parts
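The strategy rests on the fact that a sum can be split over disjoint parts $P_1, \dots, P_K$ of the data: $\sum_{i=1}^{N} f(x_i) = \sum_{k=1}^{K} \sum_{i \in P_k} f(x_i)$. Base R happens to have functions named `Map()` and `Reduce()`; they run in a single process, but they show the shape of the computation. The data `x` and the process `f` are made up.

```r
set.seed(9)
x <- runif(1e5)

f <- function(v) v^2                       # "some process" applied element-wise

parts <- split(x, rep(1:4, length.out = length(x)))      # 4 independent parts
partial_sums <- Map(function(part) sum(f(part)), parts)  # map step: one partial sum per part
total <- Reduce(`+`, partial_sums)                       # reduce step: the outer sum
print(total)
```

On a cluster, each part would live on a different node and only the four partial sums would travel over the network.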
Hadoop Map/Reduce Interfaces with R
(slides from G. Lockwood, SDSC)
• Hadoop Streaming (simplest) or the Hadoop API
• Streaming pipes input/output through the steps:
cat input | Rscript mapper.R | sort | Rscript reducer.R > output
You provide these two scripts; Hadoop does the rest
• generalizable to any language you want (Perl,
Paradigmatic Example: Word Counting
How would you count all the words in Moby Dick?
Call me Ishmael. Some years ago--never mind how long precisely--having little or no money in my purse, and nothing particular to interest me on shore, I thought I would sail about a little and see the watery part of the world. It is a way I have of driving off the spleen and regulating the circulation. …
How could you count all the words in all web pages? (assume the data is spread out over many nodes)
Wordcount: Hadoop streaming mapper
Example from Glen Lockwood, SDSC
#!/usr/bin/env Rscript
# Emit one key-value pair ('cat' is 'concatenate and print')
emit.keyval <- function(key, value) {
  cat(key, '\t', value, '\n', sep='')
}
stdin <- file('stdin', open='r')
while ( length(line <- readLines(stdin, n=1)) > 0 ) {
  line <- gsub('(^\\s+|\\s+$)', '', line)        # trim leading/trailing whitespace
  keys <- unlist(strsplit(line, split='\\s+'))   # split line into words
  keys <- keys[keys != '']                       # skip blank lines
  value <- 1
  lapply(keys, FUN=emit.keyval, value=value)     # use words as keys to the reducers
}
close(stdin)
What One Mapper Does
line = "Call me Ishmael. Some years ago--never mind how long"
keys = Call | me | Ishmael. | Some | years | ago--never | mind | how | long
emit.keyval(key, value) emits:
Call 1
me 1
Ishmael. 1
Some 1
years 1
ago--never 1
mind 1
how 1
long 1
Reducer Loop
• If this key is the same as the previous key,
• add this key's value to our running total.
• Otherwise,
• print out the previous key's name and the running total,
• reset our running total to 0,
• add this key's value to the running total, and
• "this key" is now considered the "previous key"
Wordcount: Streaming Reducer
#!/usr/bin/env Rscript
last_key <- ""
running_total <- 0
stdin <- file('stdin', open='r')
while ( length(line <- readLines(stdin, n=1)) > 0 ) {
  line <- gsub('(^\\s+)|(\\s+$)', '', line)
  # Get key, value
  keyvalue <- unlist(strsplit(line, split='\t', fixed=TRUE))
  this_key <- keyvalue[[1]]
  value <- as.numeric(keyvalue[[2]])
  if ( last_key == this_key ) {
    running_total <- running_total + value     # add up values
  } else {
    # For each new key, emit <key, sum> for the previous key
    if ( last_key != "" ) {
      cat( paste(last_key, '\t', running_total, '\n', sep='') )
    }
    running_total <- value
    last_key <- this_key
  }
}
# Emit the final key (skipped when the input was empty)
if ( last_key != "" ) {
  cat( paste(last_key, '\t', running_total, '\n', sep='') )
}
close(stdin)
Testing Mappers/Reducers
• Debugging Hadoop is not fun, so test the scripts with a local pipe first:
$ head -n100 pg2701.txt | ./wordcount-streaming-mapper.R | sort | ./wordcount-streaming-reducer.R
...
with 5
word, 1
world. 1
www.gutenberg.org 1
you 3
You 1
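The same map | sort | reduce logic can also be checked inside a single R session on a few hand-made lines. A minimal sketch; `map_words` and `reduce_counts` are hypothetical helpers that mirror the streaming scripts, with `sort()` standing in for Hadoop's shuffle and the reducer summing each run of equal keys.

```r
# Mapper logic: one "key<TAB>1" record per word
map_words <- function(lines) {
  lines <- gsub("(^\\s+|\\s+$)", "", lines)
  keys <- unlist(strsplit(lines, split = "\\s+"))
  keys <- keys[keys != ""]
  paste0(keys, "\t", 1)
}

# Reducer logic: records arrive sorted, so equal keys are adjacent; sum each run
reduce_counts <- function(records) {
  kv <- strsplit(records, "\t", fixed = TRUE)
  keys   <- vapply(kv, `[`, "", 1)
  values <- as.numeric(vapply(kv, `[`, "", 2))
  runs   <- rle(keys)
  ends   <- cumsum(runs$lengths)
  starts <- ends - runs$lengths + 1
  totals <- mapply(function(s, e) sum(values[s:e]), starts, ends)
  setNames(totals, runs$values)
}

lines  <- c("Call me Ishmael.", "Call me  later, Ishmael.")
counts <- reduce_counts(sort(map_words(lines)))   # map | sort | reduce
print(counts)
```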
Launching Hadoop Streaming
$ hadoop dfs -copyFromLocal ./pg2701.txt mobydick.txt
$ hadoop jar \
/opt/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar \
-D mapred.reduce.tasks=2 \
-mapper "Rscript $PWD/wordcount-streaming-mapper.R" \
-reducer "Rscript $PWD/wordcount-streaming-reducer.R" \
-input mobydick.txt \
-output output
$ hadoop dfs -cat output/part-* > ./output.txt
Taking Advantage of Map/Reduce
• Case study: election-related tweets and the daily change in approval ratings
• What is the relationship between tweets and approval ratings?
Tweet Data
• Twitter provides access to data
• Unstructured message text and meta data
• {"created_time": "13:27:15 +0000", "text": "@TeAmo_Giirls Vote Obama Man....", "user_id": 235813005, "id": 253848760784912384, "created_date": "Thu Oct 04 2012"}
• {"created_time": "01:12:35 +0000", "text": "I swear these dudes in this class dont understand english. Its like my teacher is speaking some foreign language to them", "user_id": 275370836, "id": 250764771656343552, "created_date": "Wed Sep 26 2012"}
• …… etc …
Twitter and Other Data
• Obama approval minus disapproval: poll tracking leading up to the 2012 election
Defining Flow Map/Reduce
• Goal: turn tweet messages into data by day
• Target: approval change from the previous day
• Choices:
• track message elements (words, …)
• track metadata (date, users, replies, …)
Let's try word counts by date.
Defining Flow Map/Reduce
• Approach: extend word count into a <date,word> count
Map: split the tweet into parts and emit key <date,word>, value 1
Reduce: add up the values
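A minimal in-memory sketch of the <date,word> count, assuming the tweets have already been parsed into a data frame whose column names mirror the JSON fields above (`created_date`, `text`); the three tweets are made up. Lower-casing and the split on non-alphanumeric characters are our tokenization choices, not necessarily those of the course scripts.

```r
tweets <- data.frame(
  created_date = c("Thu Oct 04 2012", "Thu Oct 04 2012", "Fri Oct 05 2012"),
  text = c("Vote Obama", "vote early vote often", "Obama debate"),
  stringsAsFactors = FALSE
)

# Map: one "date|word<TAB>1" record per word (composite key = date + word)
map_tweet <- function(date, text) {
  words <- unlist(strsplit(tolower(text), "[^a-z0-9']+"))
  words <- words[words != ""]
  if (length(words) == 0) return(character(0))
  paste0(date, "|", words, "\t1")
}
records <- unlist(Map(map_tweet, tweets$created_date, tweets$text), use.names = FALSE)

# Reduce: add up the values for each composite key
kv <- do.call(rbind, strsplit(records, "\t", fixed = TRUE))
counts <- tapply(as.numeric(kv[, 2]), kv[, 1], sum)
print(counts)
```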
Defining Flow Map/Reduce
• What other aggregations do we need?
• At what point will data fit into memory?
1. Do we need the list of unique words and their overall counts?
2. If you want to correlate the target with unexpected word counts, what sums does that need?
My Example Flow
1. Process messages
Map: split tweet message into <date,word>,1
Reduce: sum counts for <date,word>
2. Re-map
Map: split <date,word>,1 into <date>,1
Reduce: sum counts for <date>
3. Re-map
Map: split <date,word>,1 into <word>,1
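Once the step-1 output is small enough to fit in memory, the re-map steps amount to group-by sums. A sketch on a hypothetical step-1 table: emitting 1 per <date,word> row counts distinct words per date, while summing the step-1 counts instead gives total word occurrences per date. Which of the two the flow intends is our assumption to check against the scripts.

```r
# Hypothetical output of step 1: one row per <date, word> with its summed count
step1 <- data.frame(
  date  = c("Oct 04", "Oct 04", "Oct 04", "Oct 05", "Oct 05"),
  word  = c("vote", "obama", "early", "obama", "debate"),
  count = c(3, 1, 1, 1, 1),
  stringsAsFactors = FALSE
)

# Step 2, re-map to <date>
distinct_words_per_date <- tapply(rep(1, nrow(step1)), step1$date, sum)  # value 1 per row
total_words_per_date    <- tapply(step1$count, step1$date, sum)          # value = count

# Step 3, re-map to <word>: overall count of each word across all dates
total_per_word <- tapply(step1$count, step1$word, sum)

print(distinct_words_per_date); print(total_words_per_date); print(total_per_word)
```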
Example Flow Downstream
• For analysis, the end product might be a date × word data matrix: each row holds the word counts for one date (using the top P words), joined with the approval-rating change (coded +1 or -1 in the first column):
DATE     APPR   Vote   Billion   senator   june   …
Apr 01   -      0      1         0         0      …
Apr 02   +      1      3         2         0      …
Apr 03   +      1      0         0         1      …
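A sketch of building that matrix from <date, word, count> triples in base R, using the toy numbers from the example matrix above (only nonzero counts are listed, as a reducer would emit them). `xtabs()` fills in zeros for pairs that never occurred; the top-P selection and the approval vector are illustrative.

```r
counts <- data.frame(
  date  = c("Apr 01", "Apr 02", "Apr 02", "Apr 02", "Apr 03", "Apr 03"),
  word  = c("Billion", "Vote", "Billion", "senator", "Vote", "june"),
  count = c(1, 1, 3, 2, 1, 1),
  stringsAsFactors = FALSE
)

# Date x word contingency table; missing <date, word> pairs become 0
dw <- xtabs(count ~ date + word, data = counts)
X_all <- matrix(as.numeric(dw), nrow = nrow(dw), dimnames = dimnames(dw))

# Keep the top P words by overall count
P <- 3
top_words <- names(sort(colSums(X_all), decreasing = TRUE))[seq_len(P)]
X <- X_all[, top_words, drop = FALSE]

# Join the daily approval change (+1 / -1) as the first column
approval <- c("Apr 01" = -1, "Apr 02" = 1, "Apr 03" = 1)
data_matrix <- cbind(APPR = approval[rownames(X)], X)
print(data_matrix)
```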
Gordon Access
• Use PuTTY (Windows) or ssh (Mac terminal) to get a Unix shell on Gordon
$ ls                # directory listing
$ cd BootCamp
$ sh QSUBH.txt      # get to a compute node
$ cd BootCamp
$ cd Rhad_Tweets
$ ls
• Some scripts for date-word counting of tweet messages.
• The ‘…process.R’ file produces a data matrix
Note the slave nodes, task trackers, and data nodes
Sample output from reduce steps
Process counts into a data matrix
Map/Reduce Algorithm
• General Conditions
• Operations/data are separable and independent
• Data that doesn't fit into memory
• Data that doesn't need to be read into memory all at once
• General Strategy
If you can divide the problem into parts, then do this:
Map ‘some process’ over parts
Join Multiple Datasets on a Key
• Problem: 2 files in HDFS that should be combined on a key value
• In pseudo-SQL:
SELECT * FROM A, B WHERE A.key = B.key
• Joins can be inner, left outer, or right outer
Join Map/Reduce Strategy
• Problem: Join 2 key,value sets
A = <wd> | <count>
about   5
actor   15
bacon   3
..
B = <wd> | <date>
able    Nov 16
actor   Feb 01
actor   May 03
bacon   Apr 11
..
Want something like A join B = <wd> | <list of values>:
actor   15, Feb 01
actor   15, May 03
bacon   3, Apr 11
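When both tables fit in memory, base R's `merge()` produces this result directly and makes the join types concrete. A sketch using the A and B rows above: the inner join keeps only keys present in both tables, and the outer joins keep unmatched rows with `NA`.

```r
A <- data.frame(wd = c("about", "actor", "bacon"), count = c(5, 15, 3),
                stringsAsFactors = FALSE)
B <- data.frame(wd = c("able", "actor", "actor", "bacon"),
                date = c("Nov 16", "Feb 01", "May 03", "Apr 11"),
                stringsAsFactors = FALSE)

inner <- merge(A, B, by = "wd")                 # actor x2, bacon
left  <- merge(A, B, by = "wd", all.x = TRUE)   # keeps 'about' with date = NA
right <- merge(A, B, by = "wd", all.y = TRUE)   # keeps 'able' with count = NA
print(inner)
```

Map/Reduce is needed only when the tables are too large for this single-machine approach.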
Join Map/Reduce Strategy
• One solution:
• Stream both the A and B tables to map
• The intermediate step shuffles the data so that equal keys are together:
Key     Value
about   A,5
able    B,Nov 16
actor   A,15
actor   B,Feb 01
actor   B,May 03
..
Join Map/Reduce Strategy
• One solution:
A reducer has access to all rows from A and B with the same key value, so it can split those rows back into A or B (how?) and take a cross-product. From the shuffled Key/Value list:
A               B
about  5        able   Nov 16
actor  15       actor  Feb 01
                actor  May 03
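One common answer to "(how?)" is the table tag already visible in the shuffled values ("A,5", "B,Feb 01"). The sketch below simulates a repartition join in one R session under that assumption: map emits tagged values, `split()` stands in for the shuffle, and each reducer call separates its values by tag and takes the cross product. The helper name `reduce_join` is hypothetical.

```r
A <- data.frame(wd = c("about", "actor", "bacon"), count = c(5, 15, 3),
                stringsAsFactors = FALSE)
B <- data.frame(wd = c("able", "actor", "actor", "bacon"),
                date = c("Nov 16", "Feb 01", "May 03", "Apr 11"),
                stringsAsFactors = FALSE)

# Map: emit <key, "tag,value"> from both tables
mapped <- rbind(
  data.frame(key = A$wd, value = paste0("A,", A$count), stringsAsFactors = FALSE),
  data.frame(key = B$wd, value = paste0("B,", B$date),  stringsAsFactors = FALSE)
)

# Shuffle: group all values that share a key
groups <- split(mapped$value, mapped$key)

# Reduce: split values back into A and B by tag, then take the cross product
reduce_join <- function(key, values) {
  tag  <- substr(values, 1, 1)
  rest <- substring(values, 3)                 # drop the "A," / "B," prefix
  a <- rest[tag == "A"]; b <- rest[tag == "B"]
  if (length(a) == 0 || length(b) == 0) return(NULL)   # inner join: drop unmatched keys
  pairs <- expand.grid(a = a, b = b, stringsAsFactors = FALSE)
  data.frame(wd = key, count = pairs$a, date = pairs$b, stringsAsFactors = FALSE)
}

parts  <- Filter(Negate(is.null), Map(reduce_join, names(groups), groups))
joined <- do.call(rbind, unname(parts))
rownames(joined) <- NULL
print(joined)
```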
Join Map/Reduce Strategy
• Size matters:
• If one dataset fits in memory, it can be replicated to every node and the join done with Map only (replicated join)
• If both datasets are large, use full Map/Reduce (repartition join)
• If both datasets are large but one can be filtered down, run one map/reduce pass first (semi-join)
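For the replicated join, a sketch under the assumption that B is the small table: every map task holds a full in-memory lookup of B and joins its own chunk of A directly, so there is no shuffle or reduce step. The chunking into "2 nodes" and the helper name `map_join_chunk` are made up for illustration.

```r
# Small table B, replicated to every map task as a key -> values lookup
small_B <- data.frame(wd = c("able", "actor", "actor", "bacon"),
                      date = c("Nov 16", "Feb 01", "May 03", "Apr 11"),
                      stringsAsFactors = FALSE)
lookup <- split(small_B$date, small_B$wd)

# Map-only join of one chunk of the large table
map_join_chunk <- function(chunk) {
  out <- lapply(seq_len(nrow(chunk)), function(i) {
    dates <- lookup[[chunk$wd[i]]]           # NULL when the key is not in B
    if (is.null(dates)) return(NULL)         # inner join: skip unmatched keys
    data.frame(wd = chunk$wd[i], count = chunk$count[i], date = dates,
               stringsAsFactors = FALSE)
  })
  out <- Filter(Negate(is.null), out)
  if (length(out) == 0) return(NULL)
  do.call(rbind, out)
}

big_A <- data.frame(wd = c("about", "actor", "bacon"), count = c(5, 15, 3),
                    stringsAsFactors = FALSE)
chunks <- split(big_A, c(1, 2, 2))           # pretend the rows live on 2 nodes
parts  <- Filter(Negate(is.null), lapply(chunks, map_join_chunk))
joined <- do.call(rbind, unname(parts))
rownames(joined) <- NULL
print(joined)
```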
Summary of Map/Reduce Design Considerations
• Composite keys and/or values
• Grouping
• Bundle keys into groups
• Replication
• Repeating values across more than one key
Machine Learning
• Most algorithms have some summation step, so Map/Reduce can speed up jobs
• But parameter estimation requires communication between parts
• Some algorithms look at interdependencies across the N×P data matrix
• e.g. linear regression inverts X'X, a P×P matrix; neural nets propagate errors
• Some algorithms use interdependencies between observations
• e.g. SVM kernels
• Some algorithms mostly compute distances and sums
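Linear regression shows both sides of this. The expensive part is a sum over rows, $X^\top X = \sum_{i=1}^{N} x_i x_i^\top$ and $X^\top y = \sum_{i=1}^{N} x_i y_i$, which maps cleanly onto chunks; only the small P×P system needs to be solved in one place. A sketch on simulated data, with 4 chunks standing in for 4 nodes:

```r
set.seed(21)
N <- 10000; P <- 3
X <- cbind(1, matrix(rnorm(N * (P - 1)), N))
y <- drop(X %*% c(1, 2, -3)) + rnorm(N)

chunk_id <- rep(1:4, length.out = N)   # pretend rows are spread over 4 nodes

# Map: each chunk returns its partial X'X (P x P) and X'y (P x 1)
partials <- lapply(split(seq_len(N), chunk_id), function(rows) {
  Xc <- X[rows, , drop = FALSE]
  list(xtx = crossprod(Xc), xty = crossprod(Xc, y[rows]))
})

# Reduce: add the partial sums, then solve the small P x P system on one node
XtX  <- Reduce(`+`, lapply(partials, `[[`, "xtx"))
Xty  <- Reduce(`+`, lapply(partials, `[[`, "xty"))
beta <- drop(solve(XtX, Xty))
print(beta)
```

Only P×P + P numbers per chunk cross the network, independent of N; an iterative model such as logistic regression would need this exchange once per iteration, which is where the communication cost comes from.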
Machine Learning and Map/Reduce
• Mahout for Hadoop is a Java library of machine learning algorithms
• processes data in 'chunks' that fit in memory
• command line or programming interface
• many advanced algorithms
• Spark is a newer alternative to Map/Reduce (from UC Berkeley)
• main idea: keep data in memory; don't write out and shuffle unless needed