Hadoop: challenge accepted!
Arkadiusz Osiński
ToC
- Hadoop basics
- Gather data
- Process your data
- Learn from your data
- Visualize your data
Big Data
- Petabytes of (un)structured data
- Only 12% of data is analyzed
- A lot of data is not gathered at all
- How to gain knowledge from it?
[Word cloud: Power, Big Data, Data Lake, Scalability, Petabytes, MapReduce, Commodity]
HDFS
- Storage layer
- Distributed file system
- Commodity hardware
- Scalability
- JBOD (just a bunch of disks)
- Access control
- No SPOF (single point of failure), with NameNode high availability enabled
YARN
- Distributed computing layer
- Operations run where the data is stored (data locality)
- MapReduce…
- …and other applications
Let's squeeze our data to get the juice!
Gather data
# Flume agent "flume-twitter": Twitter source writing to the MemChannel channel (credentials elided)
flume-twitter.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
flume-twitter.sources.Twitter.channels = MemChannel
flume-twitter.sources.Twitter.consumerKey = (…)
flume-twitter.sources.Twitter.consumerSecret = (…)
flume-twitter.sources.Twitter.accessToken = (…)
flume-twitter.sources.Twitter.accessTokenSecret = (…)
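Process your data
- Hadoop Streaming!
- No need to write code in Java: the mapper and reducer can be any executable that reads lines from stdin and writes tab-separated key/value lines to stdout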
Process your data
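#!/usr/bin/python
# Mapper: emit "<day>\t1" for every tweet that mentions the keyword
import sys
import json
import datetime as dt

keyword = 'hadoop'
for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    data = json.loads(line)
    if keyword in data.get('text', '').lower():
        # created_at looks like "Thu Apr 24 10:15:00 +0000 2014"; keep only the day
        day = dt.datetime.strptime(data['created_at'], '%a %b %d %H:%M:%S +0000 %Y').strftime('%Y-%m-%d')
        print("{0}\t1".format(day))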
Process your data
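#!/usr/bin/python
# Reducer: count records per day; Hadoop delivers the input sorted by key
import sys

(counter, datekey) = (0, '')
for line in sys.stdin:
    line = line.strip().split("\t")
    if datekey != line[0]:
        if datekey:
            print("{0}\t{1}".format(datekey, counter))
        datekey = line[0]
        counter = 1
    else:
        counter += 1

# flush the last key, which the loop above never prints
if datekey:
    print("{0}\t{1}".format(datekey, counter))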
Process your data
yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
  -files ./map.py,./reduce.py \
  -mapper ./map.py \
  -reducer ./reduce.py \
  -input '/tweets/2014/04/*/*/*' \
  -input '/tweets/2014/05/*/*/*' \
  -output /tweet_keyword
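Before submitting the job, it helps to dry-run it locally: Hadoop Streaming essentially pipes input lines through the mapper, sorts the mapper output by key, and pipes the sorted lines through the reducer (roughly `cat tweets.json | ./map.py | sort | ./reduce.py`). The sketch below mimics that flow in memory for the keyword-per-day count. The function names and sample tweets are hypothetical, and it ignores partitioning across several reducers.

```python
import json
from datetime import datetime
from itertools import groupby

KEYWORD = "hadoop"
TWITTER_TIME = "%a %b %d %H:%M:%S +0000 %Y"  # format of Twitter's created_at field


def map_lines(lines):
    """Mapper logic: yield 'YYYY-MM-DD<TAB>1' for each tweet mentioning KEYWORD."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        tweet = json.loads(line)
        if KEYWORD in tweet.get("text", "").lower():
            day = datetime.strptime(tweet["created_at"], TWITTER_TIME)
            yield day.strftime("%Y-%m-%d") + "\t1"


def reduce_lines(sorted_lines):
    """Reducer logic: count records per key; input must already be sorted by key."""
    keys = (line.split("\t", 1)[0] for line in sorted_lines)
    for day, group in groupby(keys):
        yield "%s\t%d" % (day, sum(1 for _ in group))


def run_locally(lines):
    """map -> sort (stand-in for Hadoop's shuffle/sort) -> reduce, all in memory."""
    return list(reduce_lines(sorted(map_lines(lines))))


if __name__ == "__main__":
    sample = [
        '{"text": "Hadoop on YARN", "created_at": "Thu Apr 24 10:15:00 +0000 2014"}',
        '{"text": "I love HADOOP", "created_at": "Thu Apr 24 18:00:00 +0000 2014"}',
        '{"text": "Spark news", "created_at": "Fri Apr 25 09:00:00 +0000 2014"}',
    ]
    for record in run_locally(sample):
        print(record)
```

A plain `sorted()` is enough here because every record's key is its prefix up to the first tab.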
Process your data
(…)
2014-04-24 864
2014-04-25 1121
2014-04-26 593
2014-04-27 649
2014-04-28 1084
2014-04-29 1575
2014-04-30 1170
2014-05-01 1164
2014-05-02 1175
2014-05-03 779
2014-05-04 471
(…)
Recommendations
Which product will a customer want? We've got the history of user interactions.
Simple Example
Let's just use Mahout, it's easy!
> apt-get install mahout
> cat simple_example.csv
1,101
1,102
1,103
2,101
> hdfs dfs -put simple_example.csv /mahout/input/wikilinks/
> mahout recommenditembased -s SIMILARITY_LOGLIKELIHOOD -b \
    -Dmapred.input.dir=/mahout/input/wikilinks/simple_example.csv \
    -Dmapred.output.dir=/mahout/output/wikilinks/simple_example \
    -Dmapred.job.queue.name=atmosphere_prod
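`SIMILARITY_LOGLIKELIHOOD` scores a pair of items with Dunning's log-likelihood ratio (LLR) over a 2×2 table of user counts: users who have both items, only one of them, or neither. The sketch below computes that ratio along the lines of Mahout's LLR helper and maps it to a similarity with `1 - 1/(1 + LLR)`, which is our understanding of how Mahout's `LogLikelihoodSimilarity` does it. The scoring in `recommend` (summing similarities to items the user already has) is a deliberate simplification, not Mahout's preference estimator, and all function names are hypothetical.

```python
import math
from collections import defaultdict
from itertools import combinations


def x_log_x(x):
    return 0.0 if x == 0 else x * math.log(x)


def entropy(*counts):
    """Unnormalised entropy as used by the log-likelihood ratio."""
    return x_log_x(sum(counts)) - sum(x_log_x(c) for c in counts)


def log_likelihood_ratio(k11, k12, k21, k22):
    """k11: users with both items, k12/k21: users with only one, k22: neither."""
    row = entropy(k11 + k12, k21 + k22)
    col = entropy(k11 + k21, k12 + k22)
    mat = entropy(k11, k12, k21, k22)
    return max(0.0, 2.0 * (row + col - mat))


def item_similarities(pairs):
    """pairs: (user, item) boolean preferences, as with the -b flag."""
    pairs = list(pairs)
    users_of = defaultdict(set)
    for user, item in pairs:
        users_of[item].add(user)
    n_users = len({user for user, _ in pairs})
    sims = {}
    for a, b in combinations(sorted(users_of), 2):
        both = len(users_of[a] & users_of[b])
        only_a = len(users_of[a]) - both
        only_b = len(users_of[b]) - both
        neither = n_users - both - only_a - only_b
        llr = log_likelihood_ratio(both, only_a, only_b, neither)
        sims[a, b] = sims[b, a] = 1.0 - 1.0 / (1.0 + llr)
    return sims


def recommend(pairs, user, how_many=2):
    """Simplified scoring: sum of similarities to the items the user already has."""
    pairs = list(pairs)
    sims = item_similarities(pairs)
    seen = {item for u, item in pairs if u == user}
    candidates = {item for _, item in pairs} - seen
    scores = {c: sum(sims.get((c, s), 0.0) for s in seen) for c in candidates}
    return sorted(scores.items(), key=lambda kv: (-kv[1], kv[0]))[:how_many]
```

Note that LLR flags any surprising co-occurrence pattern, including items that are almost never bought together, which is one reason real recommenders add further filtering.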
Simple Example
Tadadam!
> hdfs dfs -text /mahout/output/wikilinks/simple_example/part-r-00000.snappy
1 [105:1.0,104:1.0]
2 [106:1.0,105:1.0]
3 [103:1.0,102:1.0]
4 [105:1.0,102:1.0]
5 [107:1.0,106:1.0]
Wiki Case
We've got links between Wikipedia articles and want to propose new links between them.
"Wikipedia (/ˌwɪkɨˈpiːdiə/ or /ˌwɪkiˈpiːdiə/ WIK-i-PEE-dee-ə) is a collaboratively edited, multilingual, free Internet encyclopedia that is supported by the non-profit Wikimedia Foundation. Volunteers worldwide collaboratively write Wikipedia's 30 million articles in 287 languages, including over 4.5 million in the English Wikipedia. Anyone who can access"
Wiki Case
http://users.on.net/%7Ehenry/pagerank/links-simple-sorted.zip
#!/usr/bin/awk -f
# Split each "from: to1 to2 ..." line into one "from,to" line per link
BEGIN { OFS = "," }
{
    gsub(":", "", $1)
    for (i = 2; i <= NF; i++) { print $1, $i }
}
Wiki Case
yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
  -Dmapreduce.job.max.split.locations=24 \
  -Dmapreduce.job.queuename=hadoop_prod \
  -Dmapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -Dmapred.text.key.comparator.options=-n \
  -Dmapred.output.compress=false \
  -files ./mahout/mapper.awk \
  -mapper ./mapper.awk \
  -input /mahout/input/wikilinks/links-simple-sorted.txt \
  -output /mahout/output/wikilinks/fixedinput
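The awk mapper only reshapes the adjacency list: each input line `from: to1 to2 ...` becomes one `from,to` line per outgoing link, the `user,item` shape that `recommenditembased` reads (here articles play both roles). A Python equivalent, sketched with hypothetical function names, makes the transformation easy to unit-test before running it on the cluster:

```python
import sys


def adjacency_to_pairs(line):
    """Turn 'from: to1 to2 ...' into ['from,to1', 'from,to2', ...]."""
    fields = line.split()
    if not fields:
        return []
    source = fields[0].replace(":", "")  # same effect as gsub(":", "", $1) in awk
    return ["%s,%s" % (source, target) for target in fields[1:]]


def run_mapper(lines):
    """Apply the transformation to every input line, keeping the output order."""
    return [pair for line in lines for pair in adjacency_to_pairs(line)]


def main():
    # Streaming entry point: adjacency lines on stdin, "from,to" pairs on stdout
    for pair in run_mapper(sys.stdin):
        print(pair)
```

To use it as the `-mapper`, call `main()` under an `if __name__ == "__main__":` guard in the shipped script.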
Wiki Case
Mahout computed the item similarity matrix and produced recommendations for 824 articles.
Importantly, we didn't gather any knowledge a priori; we just ran the algorithms out of the box.
Wiki Case
Acadèmia_Valenciana_de_la_Llengua
- FIFA: Valencia
- October_1: calendar
- Prehistoric_Iberia: link appeared recently
- Ceuta: city on the north coast of Africa
- Roussillon: part of France on the border with Spain
- Sweden :)
- Turís: municipality in the Valencian Community
- Vulgar_Latin: language article
- Western_Italo-Western_languages: language article
Tweets
Let's find groups of:
• tags
Tweets
• Our data is not random
• We’ve picked specific keywords
• We’ll do analysis in two
Tweets
{
  "filter_level":"medium",
  "contributors":null,
  "text":"PROMOCIÓN MES DE MAYO. con ...",
  "geo":null,
  "retweeted":false,
  "lang":"es",
  "entities":{
    "urls":[
      {
        "expanded_url":"http://www.agmuriel.com",
        "indices":[69, 91],
        "display_url":"agmuriel.com/#!-/c1gz",
        "url":"http://t.co/APpPjRRTXn"
      }
    ]
  }
  (…)
Tweets
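#!/usr/bin/python
# Mapper: emit "<hashtag>\t<tweet text>" for every hashtag of an English tweet
import json
import sys

for line in sys.stdin:
    line = line.strip()
    if '"lang":"en"' in line:  # cheap pre-filter before parsing the JSON
        tweet = json.loads(line)
        try:
            # collapse tabs/newlines so the text cannot break the key\tvalue format
            text = " ".join(tweet['text'].lower().split())
            if text:
                for tag in tweet['entities']['hashtags']:
                    print(tag['text'] + "\t" + text)
        except KeyError:
            continue

#!/usr/bin/python
# Reducer: concatenate all tweet texts sharing a hashtag (input sorted by key)
import sys

(lastKey, text) = (None, "")
for line in sys.stdin:
    (key, value) = line.rstrip("\n").split("\t", 1)
    if lastKey and lastKey != key:
        print(lastKey + "\t" + text)
        (lastKey, text) = (key, value)
    else:
        (lastKey, text) = (key, text + " " + value)

# flush the last hashtag
if lastKey:
    print(lastKey + "\t" + text)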
Tweets
yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
  -Dmapreduce.job.queuename=atmosphere_time \
  -Dmapred.output.compress=false \
  -Dmapreduce.job.max.split.locations=24 \
  -Dmapred.reduce.tasks=20 \
  -files $HOME/mahout/twitter_map.py,$HOME/mahout/twitter_reduce.py \
  -mapper ./twitter_map.py \
  -reducer ./twitter_reduce.py \
  -input '/project/atmosphere/tweets/2014/04/*/*' \
  -output /project/atmosphere/tweets/output \
  -outputformat org.apache.hadoop.mapred.SequenceFileOutputFormat
Tweets
mahout seq2sparse \
-i /project/atmosphere/tweets/output \
-o /project/atmosphere/tweets/vectorized -ow \
-chunk 200 -wt tfidf -s 5 -md 5 -x 90 -ng 2 -ml 50 -seq -n 2
This calculates a TF-IDF vector representation for each text:
{10:0.6292275202550768,14:0.7772211575566166} {10:0.6292275202550768,14:0.7772211575566166}
{3:0.37796447439954967,14:0.37796447439954967,19:0.654653676423271,22:0.534522474858859} {17:1.0}
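With `-wt tfidf -n 2`, each hashtag document becomes a sparse TF-IDF vector scaled to unit Euclidean length, which is why the squared weights of every sample vector above sum to about 1 (0.6292² + 0.7772² ≈ 1). The sketch below reproduces the idea with the textbook weighting tf × log(N/df). As far as we know, Mahout's own TF-IDF weighting follows Lucene's formula and differs in detail; the pruning options (`-s`, `-md`, `-x`, `-ng`, `-ml`) are left out, and `tfidf_vectors` is a hypothetical name.

```python
import math
from collections import Counter


def tfidf_vectors(docs):
    """docs: list of token lists. Returns one {term: weight} dict per document,
    weighted tf * log(N / df) and then L2-normalised (the -n 2 step)."""
    n_docs = len(docs)
    df = Counter(term for doc in docs for term in set(doc))
    vectors = []
    for doc in docs:
        tf = Counter(doc)
        weights = {t: c * math.log(n_docs / df[t]) for t, c in tf.items()}
        # a term present in every document gets weight 0 and carries no information
        weights = {t: w for t, w in weights.items() if w > 0}
        norm = math.sqrt(sum(w * w for w in weights.values()))
        vectors.append({t: w / norm for t, w in weights.items()} if norm else {})
    return vectors
```

Normalising to unit length keeps long hashtag documents (popular tags with many tweets) from dominating the distance computations in the clustering step.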
Tweets
It's time to start clustering. Let's find 100 clusters:
mahout kmeans \
  -i /tweets_5/vectorized/tfidf-vectors \
  -c /tweets_5/kmeans/initial-clusters \
  -o /tweets_5/kmeans/output-clusters \
  -cd 1.0 -k 100 -x 10 -cl -ow \
  -dm org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure
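`mahout kmeans` runs Lloyd's algorithm: assign every vector to its nearest centroid (here by squared Euclidean distance, `-dm`), recompute each centroid as the mean of its members, and stop after `-x` iterations or once no centroid moves more than the convergence delta `-cd`. The in-memory sketch below shows that loop on sparse dict vectors. Seeding with the first k points is an assumption made for determinism (as we understand it, passing `-k` makes Mahout sample random seeds into the `-c` path instead), and the function names are hypothetical.

```python
def sq_dist(a, b):
    """Squared Euclidean distance between sparse vectors stored as dicts."""
    keys = set(a) | set(b)
    return sum((a.get(k, 0.0) - b.get(k, 0.0)) ** 2 for k in keys)


def mean(vectors):
    """Component-wise mean of a non-empty list of sparse vectors."""
    total = {}
    for v in vectors:
        for k, w in v.items():
            total[k] = total.get(k, 0.0) + w
    return {k: w / len(vectors) for k, w in total.items()}


def kmeans(points, k, max_iter=10, delta=1.0):
    """Lloyd's k-means; returns (centroids, cluster index of each point)."""
    centroids = [dict(p) for p in points[:k]]  # assumption: first k points as seeds
    assignment = [0] * len(points)
    for _ in range(max_iter):
        # assignment step: nearest centroid for every point
        assignment = [min(range(k), key=lambda c: sq_dist(p, centroids[c]))
                      for p in points]
        # update step: mean of the members (an empty cluster keeps its centroid)
        new_centroids = []
        for c in range(k):
            members = [p for p, a in zip(points, assignment) if a == c]
            new_centroids.append(mean(members) if members else centroids[c])
        moved = max(sq_dist(old, new) for old, new in zip(centroids, new_centroids))
        centroids = new_centroids
        if moved <= delta:  # converged: no centroid moved more than delta
            break
    return centroids, assignment
```

Because the TF-IDF vectors are unit length, squared Euclidean distance between them is 2 minus twice their cosine similarity, so this distance ranks neighbours the same way cosine similarity would.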
Tweets
Glance at the results:
- BURN, FAT, WEIGHTLOSS, FITNESS, ZUMBA
- OPEN, SOFTWARE, LINUX, UBUNTU, OPENSUSE
- LEATHER, WALLET, MAN
Tweets
It was easy because tags are
Tweets
Bigger challenge: user clustering
LINUX UBUNTU WINDOWS OS PATCH MAC HACKED MICROSOFT
FREE CSRRACING WON RACEYOURFRIENDS ANDROID CSRCLASSIC
Tweets
Bigger challenge: user clustering
• The results show that the dataset is strongly skewed towards mobile and games
• The dataset wasn't random: we subscribed to specific keywords
Tweets
HADOOP WORLD
run predictive machine learning algorithms on hadoop without even knowing mapreduce.: data scientists are very... http://t.co/gdmqm5g1ar
rt @mapr: google cloud storage connector for #hadoop: quick start guide now avail http://t.co/17hxtvdlir
Tweets
HADOOP WORLD
Cloudera wants to do big data in Real Time.
Visualize data
add jar hive-serdes-1.0-SNAPSHOT.jar;
CREATE TABLE tw_data_201404
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\012'
STORED AS TEXTFILE
LOCATION '/tweets/tw_data_201404'
AS SELECT
  v_date,
  LOWER(hashtags.text) AS tag,
  lang,
  COUNT(*) AS total_count
FROM logs.tweets
LATERAL VIEW EXPLODE(entities.hashtags) t1 AS hashtags
WHERE v_date LIKE '2014-04-%'
GROUP BY v_date, LOWER(hashtags.text), lang;
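`LATERAL VIEW EXPLODE(entities.hashtags)` emits one row per hashtag of a tweet, and counting those exploded rows per (date, lower-cased tag, language) gives `total_count`. The same computation in plain Python, over hypothetical in-memory rows shaped like the `logs.tweets` columns the query uses, is a handy way to sanity-check the numbers on a small sample:

```python
from collections import Counter


def daily_tag_counts(tweets, month_prefix="2014-04-"):
    """tweets: dicts with 'v_date', 'lang' and entities -> hashtags -> [{'text': ...}].
    Returns a Counter keyed by (v_date, lower-cased tag, lang)."""
    counts = Counter()
    for t in tweets:
        if not t["v_date"].startswith(month_prefix):  # WHERE v_date LIKE '2014-04-%'
            continue
        for h in t["entities"]["hashtags"]:  # LATERAL VIEW EXPLODE: one row per hashtag
            counts[(t["v_date"], h["text"].lower(), t["lang"])] += 1
    return counts
```

Lower-casing before counting merges spellings such as `#Hadoop` and `#hadoop` into a single tag.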
Visualize data
add jar elasticsearch-hadoop-hive-2.0.0.RC1.jar;
CREATE EXTERNAL TABLE es_export (
  v_date string,
  tag string,
  lang string,
  total_count int,
  info string
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES (
  'es.resource' = 'trends/log',
  'es.index.auto.create' = 'true'
);
Visualize data
INSERT overwrite TABLE es_export
SELECT distinct may.v_date,may.tag,may.lang,may.total_count,'nt'
FROM tw_data_201405 may
LEFT outer JOIN tw_data_201404 april
ON april.tag = may.tag
WHERE april.tag is null
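The `LEFT OUTER JOIN ... WHERE april.tag IS NULL` pattern is an anti-join: it keeps the May rows whose tag never appeared in April, i.e. hashtags that are new this month, and tags them with the constant `'nt'`. A Python sketch of the same logic, over hypothetical `(v_date, tag, lang, total_count)` tuples, makes the semantics explicit:

```python
def new_tags(may_rows, april_rows):
    """May rows whose tag does not occur in april_rows (an anti-join on tag).
    Rows are (v_date, tag, lang, total_count) tuples; duplicates are dropped like DISTINCT."""
    april_tags = {row[1] for row in april_rows}
    seen, result = set(), []
    for row in may_rows:
        if row[1] not in april_tags and row not in seen:
            seen.add(row)
            result.append(row + ("nt",))  # constant info column, as in the query
    return result
```

Since the join condition compares only `tag`, a hashtag used in April in any language is excluded from May in every language.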
Visualize data
Thank you!
Questions?