Data Intensive Computing CSE 486/586
Project Report
BIG-DATA CONTENT RETRIEVAL, STORAGE AND ANALYSIS
FOUNDATIONS OF DATA-INTENSIVE COMPUTING
Masters in Computer Science
University at Buffalo
Website:
http://www.acsu.buffalo.edu/~mjalimin/
Submitted By
Mahesh Jaliminche (mjalimin@buffalo.edu) Mohan Upadhyay (mohanupa@buffalo.edu)
Table of Contents
Abstract
Project Objectives
Project Approach
Installing Hadoop 2.2.0 on your own machine
Data Collection: Use of Twitter Streaming API
Data-Intensive Analysis of Twitter Data using Map Reduce
Finding most trending words, hash Tags, @xyz
Implementing word-co-occurrence algorithm, both “pairs” and “stripes” approaches.
Clustering: using Map Reduce version of K-means.
Applying the MR version of the shortest path algorithm to label the edges of the network/graph.
Web enabled visualization of project
1. Abstract
This project focuses on understanding the Hadoop architecture and implementing Map Reduce on it. The objective is to aggregate data from Twitter using the Twitter API and apply Map Reduce to find the most trending words, hash tags, and most co-occurring hash tags; to understand and implement the word-co-occurrence algorithm in both its “pairs” and “stripes” forms; and to use a Map Reduce version of K-means to cluster tweeters by the number of followers they have, which can further be used for marketing and targeted ads/messages. Finally, we learn and implement the MR version of the shortest path algorithm to label the edges of the network/graph. We also analyze the discovered knowledge using the R language.
During our implementation, we plan to use different metrics such as most trending words, hash tags, @xyz mentions, co-occurring hash tags, and the relative frequency of co-occurring hash tags.
2. Project Objectives
At the end of this project, the following objectives will be covered.
To understand the components and core technologies related to content retrieval, storage and data-intensive computing analysis.
To understand the Twitter API to collect streaming tweets.
To explore designing and implementing data-intensive (Big-Data) computing solutions using the Map Reduce (MR) programming model.
Setup Hadoop 2 for HDFS and MR infrastructure.
Thorough analysis of data using R.
Extract meaningful results.
Come to a conclusion about the dataset in hand.
3. Project Approach
Twitter API: The Streaming API is the real-time sample of the Twitter Firehose. This API is intended for developers with data-intensive needs. The Streaming API allows large sets of keywords to be specified and tracked, geo-tagged tweets to be retrieved from a certain region, or the public statuses of a set of users to be returned. We used the Twitter API to get tweet text, date, username, follower count, and retweet count. We also used the filter query to collect topic-specific tweets.
Data aggregator: We collect the data from Twitter using the Streaming API. Once the data is received, we clean the unwanted details from the tweets and save them in a set of regular files in a designated directory.
Data: Once the data is collected, we put it into Hadoop’s /input directory so that the Map Reduce program can read it from this folder.
Data-Intensive Analysis(MR):
o Setup Hadoop 2 for HDFS and Map Reduce infrastructure.
o Designing and implementing various MR workflows to extract information from the data: (i) simple word count, (ii) trends, (iii) #tag counts, (iv) @xyz counts, etc.
o Implementing word-co-occurrence algorithm, both “pairs” and “stripes” approach.
o Clustering: using Map Reduce version of K-means discussed in class.
o Applying the MR version of shortest path algorithm to label the edges of the network/graph.
Discovery: From the MR implementation we discover knowledge about the most trending words, hash tags, and most co-occurring hash tags. The output files are converted into CSV, and visualization of this data is done in R.
Visualization: We analyze the discovered knowledge in R. We plot various graphs to find the most trending words and hash tags. We analyze the data on a daily/weekly basis to find the current trend. We also upload the results to the website (http://www.acsu.buffalo.edu/~mjalimin/).
4. Installing Hadoop 2.2.0 on your own machine
Installing guide URL:
https://drive.google.com/file/d/0BweVwq32koypbjd6T19QWmhUZlU/edit?usp=sharing
We followed the installation guide to install Hadoop on our machine.
Virtual Machine configuration:
o Ubuntu 12.04.4 x64
o 2 CPUs, 2GB RAM
o 12GB Virtual Hard Disk
We learned about various components of Hadoop, such as:
o Map Reduce
o HDFS
o YARN
Fig: Hadoop Infrastructure
From this setup we understood the underlying infrastructure of Hadoop and its power to solve Big-Data problems.
5. Data Collection: Use of Twitter Streaming API
We used the Twitter Streaming API to collect tweets. We used the tweetCollector project given by the TA and modified it to extract topic-specific tweets. We collected tweet text, date, user name, follower count, and retweet count. We used the filter query provided by the Twitter API to collect topic-specific tweets. Before storing the data in a text file, we cleaned the unwanted data. In the cleaning process we removed stop words, punctuation, and special characters.
Workflow:
Source: Twitter
Data Format: We collected tweet text, date, retweet count, username, and follower count, day-wise and week-wise.
Our data format is TweetText, Date, RetweetCount, UserName, FollowerCount
Eg:
RT @SirJadeja: Srinivasan Meyiappan goes CSK starts lose Bring them back guys #CskvsKXIP #KXIPvsCSK , Fri_Apr_18_21:31:30_EDT_2014 , 0 , Dabbu_k_papa_ji , 60 ;
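The cleaning step described above (removing stop words, punctuation, and special characters) can be sketched in Python. The stop-word list below is a small illustrative placeholder, not the list actually used in the project:

```python
import re

# Hypothetical stop-word list; the list used in the project is not shown in this report.
STOP_WORDS = {"the", "a", "an", "is", "are", "to", "of", "and", "in"}

def clean_tweet(text):
    """Remove punctuation, special characters, and stop words from a tweet."""
    # Keep only word characters, whitespace, '#' and '@' (so tags and mentions survive).
    text = re.sub(r"[^\w\s#@]", "", text)
    tokens = [t for t in text.split() if t.lower() not in STOP_WORDS]
    return " ".join(tokens)

print(clean_tweet("The match is on, vote for #CskvsKXIP & @SirJadeja!"))
```

Keeping `#` and `@` during cleaning is what later lets the word-count job distinguish hash tags and mentions from ordinary words.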
6. Data-Intensive Analysis of Twitter Data using Map Reduce
a. Finding most trending words, hash tags, @xyz
Once the data is collected, we run the Map Reduce word-count program on it to find the most trending words, hash tags, and @xyz mentions.
We implemented a custom partitioner which partitions the data and sends it to a particular reducer.
Algorithm:
CLASS DRIVER
  METHOD MAIN
    1) Run Mapper
    2) Set NO_OF_REDUCERS
    3) Run Partitioner
    4) Run Reducer
CLASS MAPPER
  METHOD MAP (Key k, Line l)
    1) For each token in l: Emit (token, 1)
CLASS PARTITIONER
  METHOD GET_PARTITION
    1) If KEY is HASH_TAG: Assign to Reducer1
       Else if KEY is USER_NAME: Assign to Reducer2
       Else: Assign to Reducer0
CLASS REDUCER
  METHOD REDUCE (Word w, counts <c1, c2, ...>)
    1) SUM <- 0
    2) For count c in <c1, c2, ...> do SUM <- SUM + c
    3) Emit (Word w, SUM)
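Outside Hadoop, the same mapper/partitioner/reducer flow can be simulated in plain Python. The three-way split (hash tags to Reducer1, user names to Reducer2, everything else to Reducer0) follows the pseudocode above; the function names and driver loop are illustrative:

```python
from collections import defaultdict

def mapper(line):
    """Emit (token, 1) for every token in a line, like the MAP method above."""
    return [(token, 1) for token in line.split()]

def get_partition(key):
    """Route hash tags to reducer 1, @-mentions to reducer 2, the rest to reducer 0."""
    if key.startswith("#"):
        return 1
    if key.startswith("@"):
        return 2
    return 0

def run_job(lines, num_reducers=3):
    """Shuffle mapper output into per-reducer buckets, then sum counts per word."""
    buckets = [defaultdict(list) for _ in range(num_reducers)]
    for line in lines:
        for word, one in mapper(line):
            buckets[get_partition(word)][word].append(one)
    # Each reducer sums the list of 1s it received for every word.
    return [{w: sum(c) for w, c in b.items()} for b in buckets]

tweets = ["vote IPL7 #CskvsKXIP @SirJadeja", "#CskvsKXIP IPL7 IPL7"]
print(run_job(tweets))
```

The partitioner guarantees that all hash tags land in one output file and all mentions in another, which is what makes the per-category top-10 lists below easy to produce.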
Visualization:
The output file of the Map Reduce program is converted into CSV so that it can be read into R. The vim substitution command used for the conversion is :%s/\s\+/,/g (replace every run of whitespace with a comma).
R code to find the top 10 trending words on 20 April:

myfile <- read.csv("C:\\Users\\Mahesh Jaliminche\\Desktop\\wordCount\\20apr0.csv")
Flight_query <- sqldf("select * from myfile order by Count desc LIMIT 10")
Flight_query

Word      Count
Maxwell   27152
IPL7      17119
your      16982
APP       15926
Catch     15827
Vote      15479
backing   14660
choice    14515
chance    14320
From this data we can see that the most trending words on 20 April are about the IPL and the election in India.
Word cloud for most trending 100 words
Word cloud for most trending 100 hash tags
Top 10 trending Hashtags:
b. Implementing Word-Co-occurrence Algorithm (Pairs and Stripes)
In this problem we find the co-occurring hash tags within a single tweet. There are two approaches to finding co-occurring hash tags: the pairs approach and the stripes approach. We also found the relative frequency of these co-occurring hash tags.
Co-occurrence Stripes Count Algorithm:
CLASS DRIVER
  METHOD MAIN
    1) Run Mapper
    2) Run Reducer
CLASS MAPPER
  METHOD MAP (docid a, doc d)
    1) for all terms w in doc d do
         H <- new AssociativeArray
    2)   for all terms u in Neighbors(w) do
           H{u} <- H{u} + 1
    3)   Emit(term w, stripe H)
CLASS REDUCER
  METHOD REDUCE (term w, stripes [H1, H2, H3, ...])
    1) Hf <- new AssociativeArray
    2) for all stripes H in [H1, H2, H3, ...] do
         Sum(Hf, H)
    3) for all terms x in stripe Hf do
         Emit(term w_x, x.value)
Co-occurrence Pairs Count Algorithm:
CLASS DRIVER
  METHOD MAIN
    1) Run Mapper
    2) Set NO_OF_REDUCERS
    3) Run Partitioner
    4) Run Reducer
CLASS MAPPER
  METHOD MAP (docid a, doc d)
    1) for all terms w in doc d do
         for all terms u in Neighbors(w) do
           Emit(pair (w, u), count 1)
CLASS PARTITIONER
  METHOD GET_PARTITION
    1) If KEY starts with a-e, A-E: Assign to Reducer0
       Else if KEY starts with f-j, F-J: Assign to Reducer1
       Else if KEY starts with k-p, K-P: Assign to Reducer2
       Else if KEY starts with q-z, Q-Z: Assign to Reducer3
       Else: Assign to Reducer4
CLASS REDUCER
  METHOD REDUCE (pair p, counts [c1, c2, ...])
    1) s <- 0
    2) for all counts c in [c1, c2, ...] do s <- s + c
    3) Emit(pair p, count s)
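As a sanity check, both approaches can be sketched in Python over lists of hash tags per tweet. Neighbors(w) is taken here to mean all other hash tags in the same tweet, matching how co-occurrence within a single tweet is described above; everything else is illustrative:

```python
from collections import Counter, defaultdict

def cooccur_pairs(tweets):
    """'Pairs' approach: emit a count of 1 per ordered (w, u) pair, then sum."""
    counts = Counter()
    for tags in tweets:
        for w in tags:
            for u in tags:
                if u != w:          # neighbours = all other tags in the tweet
                    counts[(w, u)] += 1
    return counts

def cooccur_stripes(tweets):
    """'Stripes' approach: build one associative array (stripe) per tag, then merge."""
    stripes = defaultdict(Counter)
    for tags in tweets:
        for w in tags:
            for u in tags:
                if u != w:
                    stripes[w][u] += 1
    return stripes

tweets = [["#IPL7", "#CSK", "#KXIP"], ["#IPL7", "#CSK"]]
pairs = cooccur_pairs(tweets)
stripes = cooccur_stripes(tweets)
# Both approaches agree on the final counts.
assert pairs[("#IPL7", "#CSK")] == stripes["#IPL7"]["#CSK"] == 2
```

The trade-off is the usual one: pairs emits many small key-value records, while stripes emits fewer but larger associative arrays per term.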
Co-occurrence Stripes Relative Frequency Algorithm:
CLASS DRIVER
  METHOD MAIN
    1) Run Mapper
    2) Run Reducer
CLASS MAPPER
  METHOD MAP (docid a, doc d)
    1) for all terms w in doc d do
         H <- new AssociativeArray
    2)   for all terms u in Neighbors(w) do
           H{u} <- H{u} + 1
    3)   Emit(term w, stripe H)
CLASS REDUCER
  METHOD REDUCE (term w, stripes [H1, H2, H3, ...])
    1) Hf <- new AssociativeArray
    2) count <- 0
    3) for all stripes H in [H1, H2, H3, ...] do
         Sum(Hf, H)
         count <- count + H.value
    4) for all terms x in stripe Hf do
         Emit(term w_x, x.value / count)
Co-occurrence Pairs Relative Frequency Algorithm:
CLASS DRIVER
  METHOD MAIN
    1) Run Mapper1
    2) Run Reducer1
    3) Run Mapper2
    4) Set NO_OF_REDUCERS
    5) Run Partitioner
    6) Run Reducer2
CLASS MAPPER1
  METHOD MAP (docid a, doc d)
    1) for all terms w in doc d do
         for all terms u in Neighbors(w) do
           Emit(pair (w, *), count 1)
CLASS MAPPER2
  METHOD MAP (docid a, doc d)
    1) for all terms w in doc d do
         for all terms u in Neighbors(w) do
           Emit(pair (w, u), count 1)
CLASS PARTITIONER
  METHOD GET_PARTITION
    1) If KEY starts with a-e, A-E: Assign to Reducer0
       Else if KEY starts with f-j, F-J: Assign to Reducer1
       Else if KEY starts with k-p, K-P: Assign to Reducer2
       Else if KEY starts with q-z, Q-Z: Assign to Reducer3
       Else: Assign to Reducer4
CLASS REDUCER1
  METHOD REDUCE (pair p, counts [c1, c2, ...])
    1) s <- 0
    2) for all counts c in [c1, c2, ...] do s <- s + c
    3) Emit(pair p, count s)
CLASS REDUCER2
  METHOD REDUCE (pair p, counts [c1, c2, ...])
    1) s <- 0
    2) Map H <- output of the first reducer (the (w, *) totals)
    3) for all counts c in [c1, c2, ...] do s <- s + c
    4) s <- s / H.get(p[0])
    5) Emit(pair p, count s)
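The two-pass idea above — first count the (w, *) marginals, then divide each pair count by them — can be condensed into a single-process Python sketch. The MR job spreads this over two passes; here the marginal is accumulated inline for brevity:

```python
from collections import Counter

def relative_frequencies(tweets):
    """Compute f(u | w) = count(w, u) / count(w, *), as in the two-pass pair approach."""
    pair_counts = Counter()
    marginals = Counter()   # plays the role of the (w, *) totals from the first pass
    for tags in tweets:
        for w in tags:
            for u in tags:
                if u != w:
                    pair_counts[(w, u)] += 1
                    marginals[w] += 1
    return {p: c / marginals[p[0]] for p, c in pair_counts.items()}

tweets = [["#IPL7", "#CSK", "#KXIP"], ["#IPL7", "#CSK"]]
rf = relative_frequencies(tweets)
print(rf[("#IPL7", "#CSK")])
```

A useful invariant for checking the output: for any fixed w, the relative frequencies over all co-occurring u sum to 1.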
Visualization
We categorized the relative frequencies into five groups and found the number of co-occurring hash tag pairs lying in each group.
group        RF.Length  RF.Minimum  RF.Mean     RF.Maximum
1 (0,0.2]        18872  0.0000156   0.06133144  0.2
2 (0.2,0.4]       3907  0.2006369   0.29454073  0.4
3 (0.4,0.6]       2105  0.4047619   0.49865937  0.6
4 (0.6,0.8]        101  0.6153846   0.69296716  0.8
5 (0.8,1]         1559  0.8076923   0.99750799  1.0
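The binning itself was done in R; a Python sketch of the same grouping, using the bin edges and summary columns from the table above (length, minimum, mean, maximum per group), with illustrative data:

```python
def categorize(rf_values):
    """Bucket relative frequencies into the (0,0.2], (0.2,0.4], ..., (0.8,1] groups
    and summarize each non-empty group as (length, min, mean, max)."""
    bins = [0.2, 0.4, 0.6, 0.8, 1.0]   # upper edge of each group, as in the table
    groups = {b: [] for b in bins}
    for v in rf_values:
        for upper in bins:
            if v <= upper:
                groups[upper].append(v)
                break
    return {b: (len(vs), min(vs), sum(vs) / len(vs), max(vs))
            for b, vs in groups.items() if vs}

print(categorize([0.1, 0.15, 0.3, 0.95]))
```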
Pie chart representation of category:
c. Clustering: using Map Reduce version of K-means
The algorithm checks new centroids against the centroids generated during previous iterations. Counters are used to track whether the previous centroids are the same as the new centroids. If the centroids are the same, convergence is reached and the loop terminates. The mapper takes a user id and follower count as input and finds the centroid nearest to the user. The output of the mapper method is a centroid id and pairs of user id and follower count. The framework aggregates all values emitted for the same centroid id. The reducer takes a centroid id and the aggregated pairs of user id and followers, calculates the new centroid, and updates the counters.
Algorithm:
CLASS DRIVER
  METHOD MAIN
    1) Declare enum CONVERGE {COUNTER, ITERATION}
       Declare INPUT_FILE_PATH and OUTPUT_FILE_PATH
    2) While COUNTER > 0 do
         Run MAPPER
         Run REDUCER
         Fetch COUNTER value
         Update INPUT_FILE_PATH and OUTPUT_FILE_PATH
CLASS MAPPER
  METHOD MAP (UserId id, FollowerCount followers)
    1) Fetch updated CENTROIDS from file
    2) For centroid in CENTROIDS do
         Calculate DISTANCE_CENTROID <- absolute(centroid - followers)
    3) Get CENTROID_ID <- min(DISTANCE_CENTROID)
    4) Emit (CENTROID_ID, [UserId id, FollowerCount followers])
CLASS REDUCER
  CONSTRUCTOR REDUCER
    1) Fetch centroids from previous iterations from file:
       Array PREVIOUS_CENTROIDS <- Fetch from file
  METHOD REDUCE (CentroidId id, <[UserId id1, FollowerCount followers1], [UserId id2, FollowerCount followers2], ...>)
    1) SUM <- 0
       COUNT <- 0
       NEW_CENTROID <- 0
    2) For record R in <[UserId id1, FollowerCount followers1], [UserId id2, FollowerCount followers2], ...> do
         SUM <- SUM + R.Followers
         COUNT <- COUNT + 1
    3) NEW_CENTROID <- SUM / COUNT
    4) Append NEW_CENTROID to file
    5) ITERATION <- VALUE_OF(CONVERGE.ITERATION)
       If mod(ITERATION, 3) == 0 then
         If PREVIOUS_CENTROIDS[ITERATION % 3] != NEW_CENTROID then
           CONVERGE.COUNTER <- CONVERGE.COUNTER + 1
    6) Emit (UserId id, followers + NEW_CENTROID)
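A minimal Python sketch of the same loop, collapsing the mapper (nearest-centroid assignment by absolute distance) and reducer (cluster mean) into one in-memory pass; the counter-based convergence check becomes a simple equality test:

```python
def kmeans_iteration(followers, centroids):
    """One MapReduce-style k-means pass: assign each follower count to the
    nearest centroid (map), then average each cluster (reduce)."""
    clusters = {i: [] for i in range(len(centroids))}
    for f in followers:
        # Mapper: nearest centroid by absolute distance (DISTANCE_CENTROID above).
        nearest = min(range(len(centroids)), key=lambda i: abs(centroids[i] - f))
        clusters[nearest].append(f)
    # Reducer: new centroid = mean of assigned points; keep the old one if empty.
    return [sum(v) / len(v) if v else centroids[i] for i, v in clusters.items()]

def kmeans(followers, centroids, max_iters=100):
    """Iterate until no centroid moves (the counter-based convergence check)."""
    for _ in range(max_iters):
        new = kmeans_iteration(followers, centroids)
        if new == centroids:      # convergence: no centroid changed
            return new
        centroids = new
    return centroids

print(kmeans([10, 20, 1000, 1100], [0, 500]))
```

In the Hadoop version the centroids live in a shared file between iterations and the driver re-submits the job; here that state is just a Python list passed between calls.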
d. Applying the MR version of shortest path algorithm to label the edges of the network/graph
Explanation: The algorithm uses counters to check for convergence. The mapper takes node id and adjacency list as input and updates the distance of the node from adjacent nodes. The mapper emits nodes from adjacency list and updated distance. The framework aggregates all values for a particular node id. The reducer takes node id and aggregated list of distance for that node and updates the adjacency list for that node. Counters are updated and are checked in the driver class for convergence.
Algorithm:
CLASS DRIVER
  METHOD MAIN
    1) Declare enum CONVERGE {COUNTER}
       Declare InputFilePath and OutputFilePath
    2) While COUNTER > 0 do
         Run MAPPER
         Run REDUCER
         Update InputFilePath and OutputFilePath
CLASS MAPPER
  METHOD MAP (NodeId n, Node N)
    1) Read DISTANCE and ADJACENCY_LIST for node n:
       DISTANCE <- N.Distance
       ADJACENCY_LIST <- N.AdjacencyList
    2) Emit (NodeId n, N)
    3) for all nodes m in ADJACENCY_LIST do
         Emit (NodeId m, DISTANCE + 1)
CLASS REDUCER
  Declare FIRST_ITERATION
  CONSTRUCTOR REDUCER
    Initialize FIRST_ITERATION <- TRUE
  METHOD REDUCE (NodeId m, [d1, d2, ...])
    1) Initialize dmin <- 10000
    2) M <- null
    3) for all d in [d1, d2, ...] do
    4)   if IsNode(d) then
    5)     M <- d
           DISTANCE <- d.Distance
    6)   else if d < dmin then
    7)     dmin <- d
    8) M.Distance <- dmin
    9) Emit (NodeId m, Node M)
    10) If FIRST_ITERATION == TRUE then
          Set COUNTER <- 0
          Set FIRST_ITERATION <- FALSE
    11) If DISTANCE != dmin then
          Set COUNTER <- COUNTER + 1
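The map/reduce pass above can be sketched in Python over an in-memory graph; the 10000 sentinel and the counter-based convergence check mirror the pseudocode, while the graph itself is an illustrative example:

```python
INF = 10000  # the dmin sentinel used in the pseudocode above

def bfs_iteration(graph):
    """One MapReduce-style BFS pass over {node: (distance, adjacency_list)}.
    Returns the updated graph and how many distances changed (the counter)."""
    messages = {n: [] for n in graph}
    # Map: every reachable node sends DISTANCE + 1 to each neighbour.
    for n, (dist, adj) in graph.items():
        if dist < INF:
            for m in adj:
                messages[m].append(dist + 1)
    # Reduce: each node keeps the minimum of its old distance and incoming ones.
    changed = 0
    new_graph = {}
    for n, (dist, adj) in graph.items():
        dmin = min([dist] + messages[n])
        if dmin != dist:
            changed += 1
        new_graph[n] = (dmin, adj)
    return new_graph, changed

# Source 'a' at distance 0; everything else starts at INF.
g = {"a": (0, ["b"]), "b": (INF, ["c"]), "c": (INF, [])}
while True:
    g, changed = bfs_iteration(g)
    if changed == 0:    # the driver's COUNTER == 0 convergence test
        break
print({n: d for n, (d, _) in g.items()})
```

Each iteration extends the BFS frontier by one hop, so the number of iterations needed equals the diameter of the reachable part of the graph plus one final pass to confirm convergence.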
7. Web-enabled Visualization of Project
We have designed a website and uploaded all the results to it.