Introduction to Big Data,
mostly from
www.cs.kent.edu/~jin/BigData
by
Ruoming Jin
What’s Big Data?
No single definition; here is from Wikipedia:
•
Big data is the term for a collection of data sets so large and
complex that it becomes difficult to process using on-hand
database management tools or traditional data processing
applications.
•
The challenges include
capture, curation, storage, search,
sharing, transfer, analysis, and visualization
.
•
The trend to larger data sets is due to the additional
information derivable from analysis of a single large set of
related data, as compared to separate smaller sets with the
same total amount of data, allowing correlations to be found
to "
spot business trends, determine quality of research,
prevent diseases, link legal citations, combat crime, and
determine real-time roadway traffic conditions
.”
2
Big Data: 3V’s
3 12+ TBs of tweet data every day 25+ TBs of log data every day ? TB s of d at a ev er y d ay 2+ billion people on the Web by end 2011 30 billionRFID tags today (1.3B in 2005) 4.6 billion camera phones world wide 100s of millions of GPS enabled devices sold annually76 millionsmart meters in 2009… 200M by 2014
Maximilien Brice, © CERN CERN’s Large Hydron Collider (LHC) generates 15 PB a year
Variety (Complexity)
• Relational Data (Tables/Transaction/Legacy Data)
• Text Data (Web)
• Semi-structured Data (XML)
• Graph Data
– Social Network, Semantic Web (RDF), … • Streaming Data
– You can only scan the data once
• A single application can be generating/collecting many types of data
• Big Public Data (online, weather, finance, etc)
6 To extract knowledgeall these types of
Velocity (Speed)
•
Data is begin generated fast and need to be
processed fast
•
Online Data Analytics
•
Late decisions
missing opportunities
•
Examples
–E-Promotions: Based on your current location, your purchase history, what you like send promotions right now for store next to you
–Healthcare monitoring: sensors monitoring your activities and body
any abnormal measurements require immediate reaction
7
Real-time/Fast Data
Social media and networks
(all of us are generating data) Scientific instruments(collecting all sorts of data)
Mobile devices
(tracking all objects all the time)
Sensor technology and networks
(measuring all kinds of data)
• The progress and innovation is no longer hindered by the ability to collect data
• But, by the ability to manage, analyze, summarize, visualize, and discover knowledge from the collected data in a timely manner and in a scalable fashion
8
Real-Time Analytics/Decision Requirement
Customer
Influence Behavior
Product Recommendations
that are Relevant & Compelling Friend Invitations to join a Game or Activity that expands business Preventing Fraud as it is Occurring & preventing more proactively
Learning why Customers Switch to competitors
and their offers; in time to Counter Improving the Marketing Effectiveness of a Promotion while it is still in Play
Harnessing Big Data
• OLTP: Online Transaction Processing (DBMSs)
• OLAP: Online Analytical Processing (Data Warehousing)
• RTAP: Real-Time Analytics Processing (Big Data Architecture & technology)
10
The Model Has Changed…
•
The Model of Generating/Consuming Data has Changed
Old Model: Few companies are generating data, all others are consuming data
New Model: all of us are generating data, and all of us are consuming data
Big Data:
Batch Processing &
Distributed Data Store
Hadoop/Spark; HBase/Cassandra BI Reporting
OLAP & Data warehouse Business Objects, SAS, Informatica, Cognos other SQL
Reporting Tools
Interactive Business
Intelligence &
In-memory RDBMS
QliqView, Tableau, HANA
Big Data:
Real Time &
Single View
Graph Databases
THE EVOLUTION OF BUSINESS INTELLIGENCE
1990’s
2000’s
2010’s
SpeedScale
Scale
Big Data Analytics
• Big data is more real-time in nature than traditional DW applications
• Traditional DW architectures (e.g. Exadata, Teradata) are not well-suited for big data apps
• Shared nothing, massively parallel processing, scale out architectures are well-suited for big data apps
13
Big Data Technology
15
Cloud Computing
•
IT resources provided as a service
–
Compute, storage, databases, queues
•
Clouds leverage economies of scale of
commodity hardware
–
Cheap storage, high bandwidth networks &
multicore processors
–
Geographically distributed data centers
•
Offerings from Microsoft, Amazon, Google, …
Topic 2: Hadoop/MapReduce
Programming & Data Processing
• Architecture of Hadoop, HDFS, and Yarn
• Programming on Hadoop
• Basic Data Processing: Sort and Join
• Information Retrieval using Hadoop
• Data Mining using Hadoop (Kmeans+Histograms)
• Machine Learning on Hadoop (EM)
• Hive/Pig
• Spark vs. MapReduce on HDFS
• HBase and Cassandra
References
•
References:
• Hadoop: The Definitive Guide, Tom White, O’Reilly
• Hadoop In Action, Chuck Lam, Manning
• Doing Data Science, Rachel Schutt and Cathy O’Neil, O’Reilly
• Data-Intensive Text Processing with MapReduce, Jimmy Lin and Chris Dyer ( www.umiacs.umd.edu/~jimmylin/MapReduce-book-final.pdf)
•
Good tutorial presentation & examples at:
•
http://research.google.com/pubs/pub36249.html
• The definitive original paper:
http://research.google.com/archive/mapreduce.html
Cloud Resources
•
Hadoop on your local machine
•
Hadoop in a virtual machine on your local
machine (Pseudo-Distributed on Ubuntu)
•
Hadoop in the clouds with Amazon EC2
Introduction to
MapReduce/Hadoop
From Ruoming Jin’s Slides,
themselves adapted from Jimmy Lin’s
slides (at UMD)
©2011 Cloudera, Inc. All Rights Reserved. 2
Storage Only Grid (original raw data)
Instrumentation Collection RDBMS (aggregated data) BI Reports + Interactive Apps
Mostly Append ETL Compute Grid
Moving Data To Compute Doesn’t Scale
Can’t Explore Original High Fidelity Raw Data
Archiving = Premature Data Death
Limitations of Existing Data Analytics Architecture
Slides from Dr. Amr Awadallah’s Hadoop talk at Stanford, CTO & VPE from Cloudera
Key Ideas
•
Scale “out”, not “up”
–
Limits of SMP and large shared-memory machines
•
Move processing to the data
–
Cluster may have limited bandwidth
•
Process data sequentially, avoid random access
–
Seeks are expensive, disk throughput is reasonable
•
Seamless scalability
–
From the mythical man-month to the tradable
machine-hour
The datacenter is the computer!
Apache Hadoop
• Scalable fault-tolerant distributed system for Big Data:
–Data Storage
–Data Processing
–A virtual Big Data machine
–Borrowed concepts/Ideas from Google; Open source under the Apache license
• Core Hadoop has two main systems:
–Hadoop/MapReduce: distributed big data processing infrastructure
(abstract/paradigm, fault-tolerant, schedule, execution)
–HDFS (Hadoop Distributed File System): fault-tolerant,
high-bandwidth, high availability distributed storage
• More recently (since 2014): Apache Spark on Hadoop/HDFS and ...
–Apache Spark is now the most active open source project in big data with more than 600 contributors within the past year.
MapReduce: Big Data Processing Abstraction
Example: word counts
Millions of documents in Word counts out:
brown, 2 fox, 2 how, 1 now, 1 the, 3 …
In practice, before MapReduce and related technologies: The first 10 computers are easy;
The first 100 computers are hard; The first 1000 computers are impossible;
But now with MapReduce, engineers at Google often use 10000 computers!
What’s wrong with 1000 computers?
Some will crash while you’re working…
If probability of crash = .001
Then probability of all up = (1-.001)
1000= 0.37
MapReduce expects crashes, tracks partial work, keeps going
Typical Large-Data Problem
•
Iterate over a large number of records
•
Extract something of interest from each
•
Shuffle and sort intermediate results
•
Aggregate intermediate results
•
Generate final output
Key idea: provide a functional abstraction for these two
operations
(Dean and Ghemawat, OSDI 2004)
MapReduce
•
Programmers specify two functions:
map
(k, v) → [(k’, v’)]
reduce
(k’, [v’]) → [(k’, v’’)] or simpler
–
All values with the same key (k’) are sent to the
same reducer, in k’ order for each reducer
–
Here [] means a sequence
•
The execution framework handles everything
else…
•
Spark: has map, reduce as operations, plus
others.
“Hello World”: Word Count
Map(String docid, String text):
for each word w in text: Emit(w, 1);
Reduce(String term, Iterator<Int> values):
int sum = 0; for each v in values:
sum += v; Emit(term, sum);
MapReduce “Runtime”
•
Handles scheduling
–
Assigns workers to map and reduce tasks
•
Handles “data distribution”
–
Moves processes to data
•
Handles synchronization
–
Gathers, sorts, and shuffles intermediate data
•
Handles errors and faults
–
Detects worker failures and restarts
•
Everything happens on top of a distributed FS (later)
•
This description also valid for Spark, but it uses
memory more, so can run faster in many cases.
MapReduce
•
Programmers specify two functions:
map
(k, v) → [(k’, v’)]
reduce
(k’, [v’]) → [(k’, v’’)]
–
All values with the same key are reduced together
•
The execution framework handles everything else…
•
Not quite…usually, programmers also specify:
partition
(k’, number of partitions) → partition for k’
–
Often a simple hash of the key, e.g., hash(k’) mod n
–
Divides up key space for parallel reduce operations
•and eventual delivery of results to certain partitions
combine
(k’, [v’]) → [(k’, v’’)]
–
Mini-reducers that run in memory after the map phase
–
Used as an optimization to reduce network traffic
combine
combine combine combine
b
a1 2 c9 a5 c 2 b7 c8
partition partition partition partition
map
map map map
k1v1 k2v2 k3v3 k4v4 k5v5 k6v6
b
a1 2 c 3 c 6 a5 c 2 b7 c8
Shuffle and Sort: aggregate values by keys
reduce reduce reduce
a 1 5 b 2 7 c 2 9 8
r1s1 r2s2 r3s3
c 2 3 6 8
Word Count Execution
the quick brown fox
the fox ate the mouse how now brown cow Map Map Map Reduce Reduce brown, 2 fox, 2 how, 1 now, 1 the, 3 ate, 1 cow, 1 mouse, 1 quick, 1 the, 1 brown, 1 fox, 1 quick, 1 the, 1 fox, 1 the, 1 how, 1 now, 1 brown, 1 ate, 1 mouse, 1 cow, 1
Input Map Shuffle & Sort Reduce Output
brown: 1,1 fox: 1,1 how:1 now:1 the:1,1,1 ate: 1 cow: 1 mouse: 1 quick: 1
MapReduce Implementations
•
Google has a proprietary implementation in C++
–
Bindings in Java, Python
•
Hadoop is an open-source implementation in Java
–
Development led by Yahoo, used in production
–
Now an Apache project
–
Rapidly expanding software ecosystem
•
Lots of custom research implementations
–
For GPUs, cell processors, etc.
Hadoop History
• Dec 2004
–
Google GFS paper published• July 2005
–
Nutch uses MapReduce• Apr 2007
–
Yahoo! on 1000-node cluster• Jan 2008
–
An Apache Top Level Project• Jul 2008
–
A 4000 node test cluster• Sept 2008 – Hive becomes a Hadoop subproject
• Feb 2009 – The Yahoo! Search Webmap is a Hadoop application that runs
on more than 10,000 core Linux cluster and produces data that is now used in every Yahoo! Web search query.
• June 2009 – On June 10, 2009, Yahoo! made available the source code to
the version of Hadoop it runs in production.
• In 2010 Facebook claimed that they have the largest Hadoop cluster in the
world with 21 PB of storage. On July 27, 2011 they announced the data has grown to 30 PB.
• Feb 2014: Apache™ Spark™ starts as Top-Level Project. Originally
developed (2011+) at the University of California, Berkeley'sAMPLab, the Spark codebase was at this point donated to Apache (open source).
Who uses Hadoop?
•
Amazon/A9
•
•
•
IBM
•
Joost
•
Last.fm
•
New York Times
•
PowerSet
•
Veoh
•
Yahoo!
Example Word Count (Map)
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text();
public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word,one); } } }
Example Word Count (Reduce)
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context
) throws IOException, InterruptedException { int sum = 0;
for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
Example Word Count (Driver)
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>"); System.exit(2);
}
Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }
Word Count Execution
the quick brown fox
the fox ate the mouse how now brown cow Map Map Map Reduce Reduce brown, 2 fox, 2 how, 1 now, 1 the, 3 ate, 1 cow, 1 mouse, 1 quick, 1 the, 1 brown, 1 fox, 1 quick, 1 the, 1 fox, 1 the, 1 how, 1 now, 1 brown, 1 ate, 1 mouse, 1 cow, 1
Input Map Shuffle & Sort Reduce Output
brown: 1,1 fox: 1,1 how:1 now:1 the:1,1,1 ate: 1 cow: 1 mouse: 1 quick: 1
An Optimization: The Combiner
def combiner(key, values):
output(key, sum(values))
• A combiner is a local aggregation function
for repeated keys produced by same map
• For associative ops. like sum, count, max
• Decreases size of intermediate data
• Example: local counting for Word Count:
Word Count with Combiner
Input Map & Combine Shuffle & Sort Reduce Output
the quick brown fox
the fox ate the mouse how now brown cow Map Map Map Reduce Reduce brown, 2 fox, 2 how, 1 now, 1 the, 3 ate, 1 cow, 1 mouse, 1 quick, 1 the, 1 brown, 1 fox, 1 quick, 1 the, 2 fox, 1 how, 1 now, 1 brown, 1 ate, 1 mouse, 1 cow, 1 split 0 split 1 split 2 split 3 split 4 worker worker worker worker worker Master User Program output file 0 output file 1 (1) submit
(2) schedule map (2) schedule reduce
(3) read
(4) local write
(5) remote read (6) write
Input files
Map phase
Intermediate files (on local disk)
Reduce phase
Output files
Adapted from (Dean and Ghemawat, OSDI 2004)
Distributed File System
•
Don’t move data to workers… move workers to the
data!
–
Store data on the local disks of nodes in the cluster
–
Start up the workers on the node that has the data local
•
Why?
–
Not enough RAM to hold all the data in memory
–
Disk access is slow, but disk throughput is reasonable (i.e.
sequential reading of disk for stream processing)
•
A distributed file system is the answer
–
GFS (Google File System) for Google’s MapReduce
–
HDFS (Hadoop Distributed File System) for Hadoop
Another example of MapReduce
•
Clickstream-like data: for each ad viewing,
user info and whether they clicked on the ad:
•
{userid, ip, zip, adnum, clicked}
•
Want unique users who saw, clicked, by zip
First Try
•
First try key as zip:
•
Map can emit {90210, {0,1}} if saw and failed
to click, {90210, {1,1}} if saw and clicked
•
Reduce receives, say:
•
{90210, [{1,1}, {0,1}, {0,1}]}
•
This shows three visits, one click, but we don’t
know if these visits were by different users, so
we don’t know the number of unique users
Second try
•
We need to preserve user identity longer
•
Use {zip, userid} as key
•
Value: again {0,1} or {1,1} if saw and clicked
•
Map emits {{90210,user123}, {0,1}}, etc.
•
Reducer gets info on one user, one zip:
•
{{90210,user123}, [{0,1}, {1,1}]}
•
Reducer can process list, emit {90210,user123},
{1,2}}
Second MapReduce pass
•
Reducer (pass 1) emits {90210,user123}, {1,2}}
•
Second Map reads this, emits its contribution to
zip’s stats (one user saw and clicked): {90210, {1,
1}}
•
Second Reduce receives 2210 user reports for this
zip:
{90210, {{1,1}, {0,1}, {0,1}, …}}
•
And counts up unique users and their clicks:
emits {90210, {56, 2210}} for 2210 unique users
in zip 90210 viewed ads, 56 of them clicked.
Compare to SQL
•
Table T of {userid, ip, zip, adnum, clicked}
•
Using a trick, we can do this in one select:
select zip, count (distinct userid), count (distinct clicked*userid) from T
group by zip
•
Assumes clicked = 0 or 1 in T row
•
Note that DB2, Oracle, and mysql can do
count (distinct expr),
though entry SQL92 only requires
count(distinct column)
Compare to SQL
•
Table T of {userid, ip, zip, adnum, clicked}
Closer to MapReduce processing
select zip, userid, count (clicked) cc from T group by zip, userid
•
Put results into table T1 (zip, userid, cc)
select zip, count(*), sum(sign(cc)) from T1group by zip