Collaborative Filtering
Scalable Data Analysis Algorithms
Outline
1. Retrospection
2. Stratosphere Plans
3. Comparison with Hadoop
4. Evaluation
5. Outlook
Stochastic Gradient Descent
Retrospection
Matrix factorization: Find optimal factors
[Figure: rating matrix V with unknown entries (?) is approximated by the product W x H of the factor matrices W and H]
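A minimal Python sketch of the factorization idea. It assumes a squared-error loss without regularization and a fixed step size `eta`; both are assumptions, not taken from the slides. The factors `W` and `H` are chosen so that their product reproduces the W x H matrix of the example. The observed rating 5.0 used for the SGD step is hypothetical.

```python
from typing import List

Matrix = List[List[float]]


def matmul(w: Matrix, h: Matrix) -> Matrix:
    """Return W x H for W (m x k) and H (k x n) stored as nested lists."""
    k = len(h)
    return [[sum(w[i][r] * h[r][j] for r in range(k)) for j in range(len(h[0]))]
            for i in range(len(w))]


def sgd_step(w: Matrix, h: Matrix, i: int, j: int, v: float, eta: float) -> float:
    """One SGD update for an observed rating V[i][j] = v.

    Minimizes (v - w_i . h_j)^2 without regularization; the factor 2 of the
    gradient is folded into eta. Updates row i of W and column j of H in place
    and returns the error before the update.
    """
    k = len(h)
    err = v - sum(w[i][r] * h[r][j] for r in range(k))
    for r in range(k):
        w_ir, h_rj = w[i][r], h[r][j]  # use the old values for both updates
        w[i][r] = w_ir + eta * err * h_rj
        h[r][j] = h_rj + eta * err * w_ir
    return err


W = [[1.0, 2.0], [2.0, 2.0], [1.0, 1.0]]
H = [[2.0, 1.0, 1.0], [1.0, 2.0, 1.0]]
print(matmul(W, H))  # [[4.0, 5.0, 3.0], [6.0, 6.0, 4.0], [3.0, 3.0, 2.0]]

# Hypothetical observed rating 5.0 for cell (0, 0): the prediction moves from 4.0 toward 5.0
err = sgd_step(W, H, 0, 0, 5.0, eta=0.1)
print(err, matmul(W, H)[0][0])
```

Only the row of W and the column of H that touch the observed cell change. Updates for cells in different rows and columns are therefore independent, and blocks of cells can be processed in parallel.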
Retrospection
Hadoop jobs with judging loss
Result of SGD step
for each starting point:
    while stop file not empty:
        optimize factors
        join triples
        calculate loss (training) and decide
        calculate loss (judging)
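The job sequence above can be sketched as a driver loop. The stage functions are hypothetical stand-ins for the Hadoop jobs. An in-memory string replaces the stop file, where non-empty means "keep iterating". `max_iterations` is an added safety bound. Computing the judging loss in every iteration is also an assumption.

```python
from typing import Callable, Dict, List, Tuple

State = Dict[str, float]


def run_driver(
    starting_points: List[State],
    optimize_factors: Callable[[State], State],
    join_triples: Callable[[State], State],
    training_loss_and_decide: Callable[[State], str],
    judging_loss: Callable[[State], float],
    max_iterations: int = 30,
) -> List[Tuple[float, int]]:
    """Run the job sequence per starting point; return (last judging loss, #iterations)."""
    results = []
    for start in starting_points:
        state = dict(start)        # copy, so the starting point itself is not modified
        stop_file = "continue"     # stand-in for the stop file: non-empty = keep iterating
        judged, iterations = float("nan"), 0
        while stop_file and iterations < max_iterations:
            state = optimize_factors(state)
            state = join_triples(state)
            stop_file = training_loss_and_decide(state)  # "" = stop
            judged = judging_loss(state)
            iterations += 1
        results.append((judged, iterations))
    return results


# Toy stages: each "optimization" halves the loss; stop once it drops below 0.1
def halve(s: State) -> State:
    return {**s, "loss": s["loss"] / 2}


def keep(s: State) -> State:
    return s


def decide(s: State) -> str:
    return "" if s["loss"] < 0.1 else "continue"


def judge(s: State) -> float:
    return s["loss"]


print(run_driver([{"loss": 1.0}, {"loss": 0.5}], halve, keep, decide, judge))
# [(0.0625, 4), (0.0625, 3)]
```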
Stratosphere Plans
Multiple Iterations
for each starting point:
    while loss too high:
        optimize factors
        join triples
        calculate loss (training)
        calculate loss (judging)
Hadoop: MapReduce | MapReduce | MapReduce
Stratosphere: Cross-Match | Map-Reduce-Cross | Map-Map-Reduce | Map-Match-Reduce
Stratosphere Plans
Subplans in One Iteration
[Figure: one iteration built from four subplans. Join triples: training data (3,7,b,-,-) and factors -> triples (3,7,b,w,h); Optimize factors; Calc loss (training) -> losses (0.9; 1.0; 2.4) -> stop? (losses file not empty); Calc loss (judging) on judging data (Saw,Tom,4) -> 0.96]
Stratosphere Plans
Optimize Factors
[Figure: Triples (3,7,b,w,h), (5,7,b,w,h) -> Map: SGD -> Map: filter w -> Reduce: average -> Factor W (3,-,-,w,-), (5,-,-,w,-); Map: filter h -> Reduce: average -> Factor H (-,7,-,-,h). Variant: Map: SGD -> Reduce: average]
Pacts configured to read fields; not materialized
Needs matrix dimensions
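Here is a sketch of the Optimize Factors subplan on plain Python data. It assumes each triple is (row block, column block, cells, w, h), where w and h are dicts from row or column index to factor vector. `map_sgd`, `map_filter_w`, `map_filter_h` and `reduce_average` are hypothetical names that mirror the pacts in the figure. `eta` is an assumed step size.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Vec = List[float]
Factors = Dict[int, Vec]
Triple = Tuple[int, int, List[Tuple[int, int, float]], Factors, Factors]


def map_sgd(triple: Triple, eta: float = 0.05) -> Triple:
    """Map: SGD. One pass over the block's cells, on copies of its w and h."""
    r, c, cells, w, h = triple
    w = {i: list(vec) for i, vec in w.items()}
    h = {j: list(vec) for j, vec in h.items()}
    for i, j, v in cells:
        wi, hj = w[i], h[j]
        err = v - sum(a * b for a, b in zip(wi, hj))
        w[i] = [a + eta * err * b for a, b in zip(wi, hj)]
        h[j] = [b + eta * err * a for a, b in zip(wi, hj)]
    return (r, c, cells, w, h)


def map_filter_w(triple: Triple) -> Tuple[int, Factors]:
    """Map: filter w -> (row, -, -, w, -)."""
    return (triple[0], triple[3])


def map_filter_h(triple: Triple) -> Tuple[int, Factors]:
    """Map: filter h -> (-, col, -, -, h)."""
    return (triple[1], triple[4])


def reduce_average(records: Iterable[Tuple[int, Factors]]) -> Dict[int, Factors]:
    """Reduce: average. Average each factor vector over all blocks sharing a key.

    Assumes all records with the same key cover the same row (or column) indices.
    """
    groups: Dict[int, List[Factors]] = defaultdict(list)
    for key, factors in records:
        groups[key].append(factors)
    averaged = {}
    for key, parts in groups.items():
        averaged[key] = {
            idx: [sum(vals) / len(parts) for vals in zip(*(p[idx] for p in parts))]
            for idx in parts[0]
        }
    return averaged
```

Each block updates its own copy of w and h. Averaging per row block (for W) or per column block (for H) then merges the copies back into one factor per block.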
Stratosphere Plans
Join Triples (1/2)
[Figure: Hadoop: Map replicates factors and keeps training data; Reduce groups factors and training data into triples (3,7,b,w,h). Stratosphere: Cross of Factor W (3,-,-,w,-) and Factor H (-,7,-,-,h) gives (3,7,-,w,h); Match (row,col) with training data (3,7,b,-,-) gives triples (3,7,b,w,h)]
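Both ways of building triples, Cross-Match and the Match-Match alternative, can be sketched with in-memory lists. Records follow the slide notation: factor W as (row, w), factor H as (col, h), training data as (row, col, b). The function names are hypothetical. The cross first builds every (row, col) pair, and the match then discards pairs without training data. Match-Match never creates those pairs.

```python
from itertools import product
from typing import Any, List, Tuple

FactorRecord = Tuple[int, Any]           # (row, w) or (col, h)
TrainingRecord = Tuple[int, int, Any]    # (row, col, b)
Triple = Tuple[int, int, Any, Any, Any]  # (row, col, b, w, h)


def cross_match(factor_w: List[FactorRecord], factor_h: List[FactorRecord],
                training: List[TrainingRecord]) -> List[Triple]:
    """Cross W with H -> (row, col, -, w, h), then Match on (row, col) with training data."""
    crossed = [(r, c, w, h) for (r, w), (c, h) in product(factor_w, factor_h)]
    blocks = {(r, c): b for r, c, b in training}
    return [(r, c, blocks[(r, c)], w, h) for r, c, w, h in crossed if (r, c) in blocks]


def match_match(factor_w: List[FactorRecord], factor_h: List[FactorRecord],
                training: List[TrainingRecord]) -> List[Triple]:
    """Match training data on row with W -> (row, col, b, w, -), then on col with H."""
    w_by_row = dict(factor_w)
    h_by_col = dict(factor_h)
    with_w = [(r, c, b, w_by_row[r]) for r, c, b in training if r in w_by_row]
    return [(r, c, b, w, h_by_col[c]) for r, c, b, w in with_w if c in h_by_col]


W = [(3, "w3"), (5, "w5")]
H = [(7, "h7"), (8, "h8")]
training = [(3, 7, "b37"), (5, 7, "b57")]
print(cross_match(W, H, training))
# [(3, 7, 'b37', 'w3', 'h7'), (5, 7, 'b57', 'w5', 'h7')]
print(match_match(W, H, training) == cross_match(W, H, training))  # True
```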
Stratosphere Plans
Join Triples (2/2)
[Figure: Match-Match: training data (3,7,b,-,-) Match (row) with Factor W (3,-,-,w,-) gives (3,7,b,w,-); Match (col) with Factor H (-,7,-,-,h) gives triples (3,7,b,w,h). Cross-Match: Factor W Cross Factor H gives (3,7,-,w,h); Match (row,col) with training data gives triples (3,7,b,w,h)]
Compiler hints helpful?
Stratosphere Plans
Calculate Loss (Training)
[Figure: Stratosphere: Triples -> Map: local loss -> Reduce: RMSE -> Cross with the loss history of epoch e (0.9; 1.0; 2.4) -> OutputFormat writes the losses of epoch e+1 and decides: stop? Hadoop: Triples -> Map: local loss (dummy, loss, #points) -> Reduce: RMSE]
Driver class knows the loss history and decides on stopping
No MapReduce job; driver class receives the loss directly
Similar to training loss, Map included in Match
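A sketch of the loss computations, assuming each triple yields (actual, predicted) pairs. Every local loss carries the same dummy key, so one reducer can compute the global RMSE. The stop rule in `decide_stop` (relative improvement below `tol`) is a hypothetical choice; the slides only say that the decision is made from the loss history. `judging_local_loss` mirrors the judging subplan, where the Map is included in the Match on (mID, uID). All function names are hypothetical.

```python
import math
from typing import Dict, Iterable, List, Tuple

DUMMY = "dummy"  # one constant key, so a single reduce call sees every record
LossRecord = Tuple[str, float, int]  # (dummy, loss, #points)


def local_loss(cells: Iterable[Tuple[float, float]]) -> LossRecord:
    """Map: local loss. Sum of squared errors over (actual, predicted) pairs."""
    sse, points = 0.0, 0
    for actual, predicted in cells:
        sse += (actual - predicted) ** 2
        points += 1
    return (DUMMY, sse, points)


def reduce_rmse(records: Iterable[LossRecord]) -> float:
    """Reduce: RMSE over all records sharing the dummy key."""
    items = list(records)
    sse = sum(r[1] for r in items)
    points = sum(r[2] for r in items)
    return math.sqrt(sse / points)


def decide_stop(history: List[float], rmse: float, tol: float = 0.01) -> bool:
    """Hypothetical rule: stop once the relative RMSE improvement falls below tol."""
    history.append(rmse)
    if len(history) < 2:
        return False
    previous, current = history[-2], history[-1]
    return (previous - current) / previous < tol


def judging_local_loss(predictions: Dict[Tuple[str, str], float],
                       judging: List[Tuple[str, str, float]]) -> List[LossRecord]:
    """Match (mID, uID): local loss for every judging rating that has a prediction."""
    return [local_loss([(rating, predictions[(m, u)])])
            for m, u, rating in judging if (m, u) in predictions]


records = judging_local_loss({("Saw", "Tom"): 4.8}, [("Saw", "Tom", 4.0)])
print(records, reduce_rmse(records))  # one record of about ('dummy', 0.64, 1); RMSE about 0.8
```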
Stratosphere Plans
Calculate Loss (Judging)
[Figure: Factors / Triples (3,7,b,w,h) -> Map: emit cells (predicted ratings, e.g. Saw,Tom,4.8) -> Match (mID, uID) with judging data from the Netflix judging files (e.g. Saw,Tom,4): local loss (dummy,0.8²,1) -> Reduce: RMSE (0.96)]
Comparison
Jobs vs. Plans
Equal results without random...
Starting points
Training sequence
Files
Factors not materialized
Separate factor files possible
Create new file for each iteration
No efficient serialization between plans (yet)
Either parse a text file
Or use a sequence file (single-threaded)
Comparison
Data Schema
3#7 / TripleStorage
(Storage class is tagged union)
Dummy key / LossStorage
Getter, setter, toString()
3,7,b,w,h
Dummy key, loss, #points
Remember key places
Reuse pacts with configurations for different keys
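A Python sketch of the Hadoop-side schema. It assumes text keys like `3#7` and a value class whose optional fields stand in for the tagged union: which fields are set acts as the tag. `to_string` reproduces the slide notation, such as `3,7,b,w,h`. The `FIELDS` mapping sketches the Stratosphere side, where the key positions in a positional record must be remembered. All names here are hypothetical.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

DUMMY_KEY = "dummy"


def make_key(row: Optional[int], col: Optional[int]) -> str:
    """Composite text key such as '3#7'; '-' marks a missing part (factor records)."""
    return f"{'-' if row is None else row}#{'-' if col is None else col}"


def parse_key(key: str) -> Tuple[Optional[int], Optional[int]]:
    row, col = key.split("#")
    return (None if row == "-" else int(row), None if col == "-" else int(col))


@dataclass
class TripleStorage:
    """Value class for block b, factor w and factor h.

    Each part is optional; which parts are set plays the role of the union's tag,
    so one class carries training blocks, factor records and complete triples.
    """
    block: Optional[List[float]] = None
    w: Optional[List[float]] = None
    h: Optional[List[float]] = None

    def to_string(self, key: str) -> str:
        """Render key and value in the slide notation, e.g. '3,7,b,w,h'."""
        row, col = parse_key(key)
        parts = ["-" if row is None else str(row), "-" if col is None else str(col)]
        for name, value in (("b", self.block), ("w", self.w), ("h", self.h)):
            parts.append("-" if value is None else name)
        return ",".join(parts)


@dataclass
class LossStorage:
    """Value stored under DUMMY_KEY: partial loss and number of points."""
    loss: float
    points: int


# Stratosphere side (sketch): positional records with remembered key positions
FIELDS = {"row": 0, "col": 1, "block": 2, "w": 3, "h": 4}


def key_of(record: tuple, names: Tuple[str, ...]) -> tuple:
    """Extract the key fields of a positional record by name."""
    return tuple(record[FIELDS[n]] for n in names)
```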
Comparison
Preprocessing
Requires:
Line format according to parameters
Copy to HDFS
Serialize factors and blocks
Use Map file to write serialized values to HDFS
Java process to define lines
Shell script to move to HDFS
Stratosphere Preprocessing
Define Line Format
[Figure: Netflix files -> Reduce: group cells to blocks -> blocks.txt, factorW.txt, factorH.txt -> SGD Step Plan (Create triples, Calc loss: training); the Netflix judging files are a further input]
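A sketch of the "group cells to blocks" step. It assumes movie and user IDs have already been mapped to dense 0-based integers and uses the 1000 x 1000 block size from the evaluation. The text layout produced by `block_line` is a hypothetical line format, not the one used for blocks.txt.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

BLOCK_SIZE = 1000  # 1000 movies x 1000 users
Cell = Tuple[int, int, int]  # (movie, user, rating)


def block_key(movie: int, user: int, size: int = BLOCK_SIZE) -> Tuple[int, int]:
    """Block coordinates of a cell, assuming dense 0-based movie and user IDs."""
    return (movie // size, user // size)


def group_cells_to_blocks(cells: List[Cell],
                          size: int = BLOCK_SIZE) -> Dict[Tuple[int, int], List[Cell]]:
    """Reduce: group cells to blocks, keyed by (block row, block column)."""
    blocks: Dict[Tuple[int, int], List[Cell]] = defaultdict(list)
    for movie, user, rating in cells:
        blocks[block_key(movie, user, size)].append((movie, user, rating))
    return dict(blocks)


def block_line(key: Tuple[int, int], cells: List[Cell]) -> str:
    """Hypothetical text line: 'row,col,' then 'movie:user:rating' items joined by ';'."""
    row, col = key
    return f"{row},{col}," + ";".join(f"{m}:{u}:{r}" for m, u, r in cells)


cells = [(1, 2, 5), (999, 1000, 3), (1500, 20, 4), (5, 7, 2)]
for key, members in sorted(group_cells_to_blocks(cells).items()):
    print(block_line(key, members))
# 0,0,1:2:5;5:7:2
# 0,1,999:1000:3
# 1,0,1500:20:4
```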
Suggestions
Global aggregation of loss with dummy key:
Reducer with no key
Reducer with compiler hint nrOfKeys = 1
No sorting needed
Provide toString() in PactRecord
Provide getter, setter in PactRecord to encapsulate field numbers and class types
Keep configuration options (e.g. reduce: average W or H)
Sequence files for sinks and sources
Move log files to master node
factor size: 5
Evaluation
Parameters for Netflix Data
data size | users | movies
1 | 500K | 18K
1/2 | 250K | 9K
1/4 | 125K | ~4K
1/8 | 63K | ~2K
starting points: 1
max. iterations: 1
degree of parallelism: 1, 2, 5, 10
block size: 1000 movies x 1000 users
[Figure: run time (0 h 00 min to 2 h 52 min) vs. data size (1/8, 1/4, 1/2, 1) for Init Blocks, Init Join and SGD Step]
Evaluation
Run Time for Variable Data Size
[Figure: run time (0 h 00 min to 2 h 52 min) vs. data size (1/8, 1/4, 1/2, 1) for Init and SGD Step; panels for block sizes 1000 x 1000 and 50 x 50]
grows by factor 4 (reduce cells)
grows by factor 4 (cross, match)
10 nodes vs. DoP = 8
Usually 10 - 30 iterations
No sequence file between plans
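Here is one way to read the "grows by factor 4" notes. It assumes the data-size fraction scales users and movies alike, which the parameter table suggests. Under that assumption, doubling the fraction doubles both matrix dimensions, so the number of matrix cells grows by a factor of 4. The full-size counts are the rounded values from the parameter table.

```python
from fractions import Fraction

FULL_USERS = 500_000  # rounded value for data size 1
FULL_MOVIES = 18_000  # rounded value for data size 1


def matrix_cells(fraction: Fraction) -> Fraction:
    """Number of matrix cells, assuming the fraction scales users and movies alike."""
    return (FULL_USERS * fraction) * (FULL_MOVIES * fraction)


sizes = [Fraction(1, 8), Fraction(1, 4), Fraction(1, 2), Fraction(1)]
for small, large in zip(sizes, sizes[1:]):
    ratio = matrix_cells(large) / matrix_cells(small)
    print(f"{small!s} -> {large!s}: x{ratio!s}")
# 1/8 -> 1/4: x4
# 1/4 -> 1/2: x4
# 1/2 -> 1: x4
```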
[Figure: run time (0 to 20 min) per subplan (optimize factors, join triples, write triples, calc loss (training), calc loss (judging)) for DoP 1, 2, 5, 10]
Evaluation
Run Time for Variable Degree of Parallelism
Data size: 1/8
[Figure: run time per subplan (Map-Map-Reduce, Cross-Match, Map-Reduce-Cross, Map-Match-Reduce) for each DoP]
performs best, but data is small
faster with higher DoP
Reading and writing always DoP=1
Outlook
Join triples with Cross-Match vs. Match-Match
Degree of parallelism: 10
Inspect judging outcome: RMSE should be equal
Evaluate DoP with bigger data
Summary
References
Anand Rajaraman and Jeff Ullman. Mining of Massive Datasets. Cambridge University Press, 2010.
Rainer Gemulla, Peter J. Haas, Erik Nijkamp, and Yannis Sismanis. Large-Scale Matrix Factorization with Distributed Stochastic Gradient Descent. IBM Research Report RJ10481, March 2011.
[1]November 2011