Dr. Dennis Pfisterer
Institut für Telematik, Universität zu Lübeck http://www.itm.uni-luebeck.de/people/pfisterer
Future Internet Technologies
Big (?) Data Processing
FIT Until Now…
[Figure: topics covered so far (Networking, Architectures, Design Principles): IPv4, IPv6, Frame Relay, ATM, MPLS, GSM, Mobile IP, 6LoWPAN, LISP, HIP, SCTP, DTN, REST, HTTP, CoAP, SPDY, Client-Server, P2P; this lecture: Big Data Processing]
• Client/Server
– Remote Procedure Call (RPC)
– REST (HTTP, CoAP, SPDY)
– Replication and Load Balancing
• Distributed Applications
– N-Tier Architectures
– Service-Oriented Architecture (SOA)
• Peer-to-Peer
– Distributed Hash Tables (DHT), ...
Traditional Application-Level Architectures
• No server-initiated data transfer
– Often due to statelessness of server
• Results in unnecessary polling to transfer data from
server to client
– Bandwidth, processing, and memory overhead
Client/Server Interaction Model
[Figure: the client sends a request to the server]
3- and 4-Tier-Architectures: 3-Tier
[Figure: clients (Tier 1: Presentation) use a server (Tier 2: Business Logic), which uses DB servers (Tier 3: Data)]
3- and 4-Tier-Architectures: 4-Tier
[Figure: clients (Tier 1: Presentation) use a web server (Tier 2: Web Server), which uses app servers 1 and 2 (Tier 3: Application Server), which use DB servers (Tier 4: Data)]
• Application and Data are replicated on many servers
– Load balancer distributes load across, e.g., LAMP hosts (Linux, Apache, MySQL, PHP)
• Bottleneck: Database
– May be clustered (i.e., replicated and/or partitioned) to increase performance
– Problem: Consistency requirements (ACID) of SQL databases limit scalability
Replication and Load Balancing
[Figure: clients reach servers #1 … #n, each running an app instance, through a load balancer; the servers use a common state]
• Distribute application (including state) amongst different servers
– In contrast to multiple instances on multiple servers
– Typically, the application state is partitioned (e.g., Google BigTable)
• Better scalability for a certain class of applications
– More scalable if less strict consistency requirements are imposed (NoSQL)
– E.g., Facebook, Twitter, ...
Distributed Applications
[Figure: servers #1 … #n, each running an app instance and holding a part of the state]
• For Internet-scale applications, state is distributed globally
• Data are typically moved to where usage is expected
– E.g., German videos are moved to Germany
Data Locality
• Today's applications are required to perform efficiently at Internet scale
• Scalability w/o changing the application
Exemplary Application
[Figure: clients use an application composed of Users, Friendship Graph, User Behavior, Web GUI, Data Mining, Statistics, Log Analysis, and Ad Delivery]
• It's not about servers anymore
– Data and data processing services are important
– Issue: Efficient distribution, processing and storage of data
• Move away from fixed (n-tier) layering models
– Dynamic composition of processing services (at runtime, continuous deployment)
– Communication using messages
• Required: concepts for „loosely-coupled“ distributed applications
Internet-Scale Applications
Message Queuing & Publish/Subscribe
• Concept of „message
queues“
• Data processing services
communicate
asynchronously using
message queues
• Decouples data producers
and consumers
Message Queuing
[Figure: data processing services on hosts #1 … #n exchange messages M1, M2 through a message queue on host #q]
• Mode of operation
– Producers dispatch messages into a queue
– Consumers take messages from a queue
• Multiple queues for different purposes can be created
– E.g., one for dispatching computational tasks, one for logging, ...
• Producers, consumers, and queues may run on different systems
Message Queuing
[Figure: a producer puts messages M1, M2 into a message queue; a consumer takes them out]
• Distributed realization of the producer/consumer problem
– I.e., thread synchronization
• Producer and consumer are only aware of the queue
– Association can be changed (dynamically) w/o changing the application
• Producer and consumer don't need to be available at the same time
– Queue provides time-decoupling
– Allows asynchronous operation
Message Queuing: Properties
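The time-decoupling property can be illustrated with a minimal in-process sketch. It assumes a single JVM and uses `java.util.concurrent.LinkedBlockingQueue` as a stand-in for a real (networked) message queue; the class and method names (`QueueDemo`, `produce`, `consume`, `run`) are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueDemo {
    // Producer: only knows the queue, never the consumer.
    static void produce(BlockingQueue<String> queue, List<String> messages) {
        for (String m : messages) {
            queue.add(m); // the queue is unbounded here, so add() does not block
        }
    }

    // Consumer: takes `count` messages and blocks while the queue is empty.
    static List<String> consume(BlockingQueue<String> queue, int count) {
        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(queue.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    // The producer thread has already terminated when the consumer starts:
    // the queue buffers the messages in between (time-decoupling).
    static List<String> run() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> produce(queue, Arrays.asList("M1", "M2")));
        producer.start();
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consume(queue, 2);
    }
}
```

Calling `QueueDemo.run()` returns the messages in the order in which they were produced, although producer and consumer were never active at the same time.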
• Queues can have multiple consumers
• E.g., to distribute messages (e.g., tasks) to a set of consumers
– Sometimes called „work queues“
– Different message dispatching strategies possible (e.g., round robin, fairness, load-based, ...)
Message Queuing: Properties
[Figure: a producer puts tasks T1 … T4 into a message queue; workers #1 … #n take them out]
• Best-effort
– Messages are lost if no consumer is listening
• Guaranteed (acknowledged) delivery
– On application layer (cf. DTN custody transfer)
• Persistent
– Messages are persisted until consumed
• Transient (not persistent)
– Messages are lost if the queue crashes
Delivery Semantics
• RPC mapped to request and response messages
– Correlation of request and response based on IDs
• Inherits delivery semantics from message queuing
Message-Based RPC
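The ID-based correlation of request and response can be sketched as follows. This is an illustration only: two in-process `LinkedBlockingQueue`s stand in for the request and response queues, and the names `RpcDemo`, `Message`, `serveOne`, and `call` as well as the upper-casing "remote procedure" are assumptions of this sketch.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class RpcDemo {
    // A message carries the correlation ID plus a payload.
    static final class Message {
        final int id;
        final String body;
        Message(int id, String body) { this.id = id; this.body = body; }
    }

    // Server: takes one request and answers on the response queue with the same ID.
    static void serveOne(BlockingQueue<Message> requests, BlockingQueue<Message> responses) {
        try {
            Message request = requests.take();
            responses.put(new Message(request.id, request.body.toUpperCase()));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Client: sends a request, then waits for the response carrying the same ID.
    static String call(int id, String body,
                       BlockingQueue<Message> requests, BlockingQueue<Message> responses) {
        try {
            requests.put(new Message(id, body));
            while (true) {
                Message response = responses.take();
                if (response.id == id) {
                    return response.body;
                }
                // Response to another call: this sketch drops it, a real client
                // would hand it over to the caller waiting for that ID.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    static String demo() {
        BlockingQueue<Message> requests = new LinkedBlockingQueue<>();
        BlockingQueue<Message> responses = new LinkedBlockingQueue<>();
        new Thread(() -> serveOne(requests, responses)).start();
        return call(123, "ping", requests, responses);
    }
}
```

Because both directions go through queues, the call inherits the delivery semantics of the underlying queues; with best-effort queues, for example, `call` could wait forever for a lost response.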
[Figure: the client sends a request with id = 123 through the RequestQueue to the server; the server returns a response with id = 123 through the ResponseQueue]
• Builds on top of individual message queues
– Publishers publish messages to an exchange
– Consumers subscribe to an exchange
– “Exchanges” route messages to queues based on subscriptions
– A “broker” manages a set of exchanges and queues
• Distribution of published messages to subscribers
– Broadcast (see below), topic-based, or content-based (next slides)
Publish/Subscribe Pattern (Pub/Sub)
[Figure: a producer publishes to an exchange inside a broker; the exchange routes messages M1, M2 into one queue per consumer #1 … #n]
• Consumers subscribe to an exchange with a certain “topic”
• Messages are annotated with a „topic“
– Topics are arbitrary strings
• Exchange matches message topics against subscriptions and routes each message to the matching queues
Topic-Based Publish/Subscribe
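A minimal sketch of topic-based routing, assuming exact string matching between subscription and message topic and plain in-memory lists as queues. The class `TopicExchange` and its methods are hypothetical and not the API of any of the systems named in this lecture.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TopicExchange {
    // For every subscribed topic: the queues (here: lists) bound to it.
    private final Map<String, List<List<String>>> queuesByTopic = new LinkedHashMap<>();

    // Subscribing creates a new queue bound to the topic and returns it to the consumer.
    List<String> subscribe(String topic) {
        List<String> queue = new ArrayList<>();
        queuesByTopic.computeIfAbsent(topic, t -> new ArrayList<>()).add(queue);
        return queue;
    }

    // Routes the message to every queue subscribed to exactly this topic.
    // Returns the number of queues the message was delivered to.
    int publish(String topic, String message) {
        List<List<String>> queues = queuesByTopic.getOrDefault(topic, new ArrayList<>());
        for (List<String> queue : queues) {
            queue.add(message);
        }
        return queues.size();
    }
}
```

With subscriptions to “news.germany” and “news.greece”, a message published with topic “news.germany” ends up only in the first consumer's queue; a message with an unsubscribed topic is delivered to nobody.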
[Figure: the producer publishes M1 with topic “news.germany” and M2 with topic “news.greece”; the exchange routes each message only to the queues of consumers subscribed to its topic]
• Consumers subscribe to an exchange with content-based filters
• Exchange routes messages based on a message's contents
– Typically structured data (e.g., JSON, XML)
• Most production systems use topic-based pub/sub
Content-Based Publish/Subscribe
[Figure: the producer publishes M1: { company: apple, price: 123$ }. One consumer subscribes with the filter company = „apple“ && 110 < price < 120, another with the filter company = „apple“ && price > 120; M1 matches only the second filter]
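The two filters from the figure can be written as predicates over the message content. The sketch below assumes a fixed message type `Quote` with the fields `company` and `price`; the class `ContentExchange` and its methods are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class ContentExchange {
    // Structured message content (a stand-in for JSON or XML data).
    static final class Quote {
        final String company;
        final double price;
        Quote(String company, double price) { this.company = company; this.price = price; }
    }

    // filters.get(i) decides which messages go into queues.get(i).
    private final List<Predicate<Quote>> filters = new ArrayList<>();
    private final List<List<Quote>> queues = new ArrayList<>();

    // A consumer subscribes with a content-based filter and gets its own queue.
    List<Quote> subscribe(Predicate<Quote> filter) {
        List<Quote> queue = new ArrayList<>();
        filters.add(filter);
        queues.add(queue);
        return queue;
    }

    // The exchange evaluates every filter against the message content.
    void publish(Quote message) {
        for (int i = 0; i < filters.size(); i++) {
            if (filters.get(i).test(message)) {
                queues.get(i).add(message);
            }
        }
    }
}
```

A subscription such as `ex.subscribe(q -> q.company.equals("apple") && q.price > 120)` receives the message { company: apple, price: 123 }, while a subscription with `110 < price < 120` does not. Note that the exchange has to evaluate every filter for every message, which is one reason why topic-based routing is cheaper.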
• Centralized brokers are a single point of failure and bottleneck for scalable applications
• Brokers can be clustered
– To increase availability and throughput
– Requires inter-broker message routing and a communication protocol
– Creates distributed (replicated, partitioned) subscription state and all associated issues
Internet-Scale Applications and Pub/Sub
[Figure: a single broker with an exchange and queues compared to clusters of brokers]
• Producers and consumers may attach to any broker of the cluster
• How are messages routed?
– Important goal: efficiency (throughput, bandwidth, latency, consistency, processing cost)
Broker Clustering
Security - 04 Cryptology #24 Broker Broker Broker Broker Producer Producer Consumer Consumer Consumer?
• Brokers compute a routing tree
– Based on upstream consumer subscriptions
• Subscription filters are aggregated along the path
Topic Based Pub/Sub and Clustering
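Upstream aggregation of subscription filters can be sketched as follows. The sketch assumes dot-separated topics of equal depth and a trailing `*` wildcard that matches any remaining topic segments; both the wildcard semantics and the names `FilterAggregation`, `aggregate`, and `matches` are assumptions of this illustration, not taken from a specific broker.

```java
import java.util.List;

class FilterAggregation {
    // Aggregates the (non-empty) list of downstream subscriptions into one upstream filter:
    // identical topics stay as they are, otherwise the common prefix plus ".*" is used.
    static String aggregate(List<String> topics) {
        String first = topics.get(0);
        String[] prefix = first.split("\\.");
        int common = prefix.length;
        boolean identical = true;
        for (String topic : topics) {
            if (!topic.equals(first)) {
                identical = false;
            }
            String[] parts = topic.split("\\.");
            int i = 0;
            while (i < common && i < parts.length && parts[i].equals(prefix[i])) {
                i++;
            }
            common = i; // length of the prefix shared by all topics seen so far
        }
        if (identical) {
            return first;
        }
        StringBuilder filter = new StringBuilder();
        for (int i = 0; i < common; i++) {
            filter.append(prefix[i]).append('.');
        }
        return filter.append('*').toString();
    }

    // Checks whether a message topic passes an (aggregated) filter.
    static boolean matches(String filter, String topic) {
        if (filter.equals("*")) {
            return true;
        }
        if (filter.endsWith(".*")) {
            return topic.startsWith(filter.substring(0, filter.length() - 1));
        }
        return filter.equals(topic);
    }
}
```

A broker with downstream subscriptions news.eu.germany and news.eu.greece would announce news.eu.* upstream. The aggregated filter is coarser than the original subscriptions, so the downstream broker still has to apply the exact filters before delivering to consumers.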
[Figure: a tree of brokers between a producer and consumers #1 … #n. The consumers subscribe to news.eu.germany and news.eu.greece; the subscription filters are aggregated upstream into news.eu.*]
• Pub/Sub Implementations
– RabbitMQ, http://www.rabbitmq.com
– Apache ActiveMQ, http://activemq.apache.org
– Amazon Simple Queue Service (Amazon SQS), http://aws.amazon.com/sqs
– Apache Qpid, http://qpid.apache.org
– IBM WebSphere MQ, http://www-01.ibm.com/software/integration/wmq
– Apache Kafka, http://incubator.apache.org/kafka
– Kestrel, http://robey.github.com/kestrel
• Protocols
– Advanced Message Queuing Protocol (AMQP), http://www.amqp.org
– ZeroMQ Message Transport Protocol (ZMTP), http://www.zeromq.org
– Streaming Text Oriented Messaging Protocol (STOMP), http://stomp.github.com
– XMPP publish-subscribe extension, http://xmpp.org/extensions/xep-0060.html
• Others
– Java Message Service (JMS), http://java.sun.com/developer/technicalArticles/Ecommerce/jms
• Advantages
– Scalability / Clustering / Federation
– Loose coupling
• Publisher and subscriber unaware of each other
• Flexibility and extensibility (add new components at runtime, …)
• Disconnected operation
• Disadvantages
– Loose coupling: hard to give guarantees
• What happens if a message is stuck in a queue forever?
• E.g., because there is no consumer
– Brokers are a centralized bottleneck
• What about efficient and Internet-scale distributed brokers?
Pub/Sub: Conclusion
Map / Reduce
• Goal: distributed processing of big data sets
– Typically too large to be processed on a single machine
– Moving all data would be too costly (time/bandwidth)
• Map / Reduce: Framework for processing
“embarrassingly parallel problems” on a large number of computers
• Initially developed by Google (2004)
– Well-known open-source implementation:
Apache Hadoop, http://hadoop.apache.org
Map / Reduce
• Programming paradigm to
– Split up a program into several tasks
– Process these tasks on different machines in parallel
– Requires that the algorithm is parallelizable
• Realization in Google's Map/Reduce
– Program is distributed onto n machines
– 1 master
• Splits up the full task into subtasks
• Assigns subtasks to workers
• Receives results from workers
– n-1 workers
• Receive jobs and data from master
• Return results to master
• Area of square: $A_s = (2r)^2 = 4r^2$
• Area of circle: $A_c = \pi r^2$
• Hence $\frac{A_c}{A_s} = \frac{\pi}{4} \Leftrightarrow \pi = \frac{4 A_c}{A_s}$
Approximation steps
1. Randomly generate $k_s$ points in the square
2. Count the number $k_c$ of points located in the circle
3. Approximate $\pi \approx \frac{4 k_c}{k_s}$
[Figure: circle of radius r inscribed in a square with side length 2r]
Example: Approximating π (1)
• Accuracy increases with larger $k_s$
• Step 2 (check whether a point is in the circle) is parallelizable
• Execution
– Master generates $k_s$ random points in the square
– Each of the n workers $w_i$ ($1 \le i \le n$) is assigned a list of points
– Each worker counts the number of points in the received list which are located in the circle ($k_{i,c}$ for worker $w_i$) and gives the result to the master
– Master computes $\pi \approx \frac{4}{k_s} \sum_{i=1}^{n} k_{i,c}$
Example: Approximating π (2)
• It is an unsolved problem whether all algorithms solving problems in the classes P or NP are parallelizable
• There are many such sequential algorithms without a known parallel equivalent
• Many scientists assume that there is no general parallelizability property
Parallelizing tasks (1)
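As a concrete instance of a parallelizable task, the master/worker scheme of the π approximation example can be sketched as follows. The sketch runs on a single machine: a parallel stream stands in for the n workers, the radius is fixed to r = 1, and the names `PiEstimate`, `countInCircle`, and `estimate` as well as the `seed` parameter are assumptions of this illustration.

```java
import java.util.Random;
import java.util.stream.IntStream;

class PiEstimate {
    // Worker: counts the points in points[from..to) that lie inside the unit circle.
    static long countInCircle(double[][] points, int from, int to) {
        long inside = 0;
        for (int j = from; j < to; j++) {
            double x = points[j][0];
            double y = points[j][1];
            if (x * x + y * y <= 1.0) {
                inside++;
            }
        }
        return inside;
    }

    // Master: generates ks random points in the square [-1, 1] x [-1, 1],
    // assigns one contiguous chunk to each of the n workers and sums up
    // the per-worker counts k_{i,c}.
    static double estimate(int ks, int n, long seed) {
        Random rnd = new Random(seed);
        double[][] points = new double[ks][2];
        for (double[] p : points) {
            p[0] = 2 * rnd.nextDouble() - 1;
            p[1] = 2 * rnd.nextDouble() - 1;
        }
        long kc = IntStream.range(0, n).parallel()
                .mapToLong(i -> countInCircle(points,
                        (int) ((long) i * ks / n),
                        (int) ((long) (i + 1) * ks / n)))
                .sum();
        return 4.0 * kc / ks; // pi is approximately 4 * k_c / k_s
    }
}
```

For a fixed seed the result does not depend on the number of workers n, because the workers only partition the counting step; the accuracy depends on the number of points `ks` alone.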
• Obviously parallelizable tasks
– Brute force attack on encrypted documents
– Count word occurrence in several documents
– …
• Not (obviously) parallelizable tasks
– Compute the Fibonacci function: $f_{k+2} = f_{k+1} + f_k$
– In general: tasks where the next step depends on the results of the previous step(s)
– …
• Two or three steps
– Map
– Combine (optional)
– Reduce
• Mode of operation
– Input data is either distributed (e.g., server log files) or split into multiple parts (e.g., single file)
– Map preprocesses input data to an intermediate format
– Framework dispatches intermediate results to reducers (see next slide)
– Reduce combines intermediate results to final results
Map / Reduce
Figure source: http://wikis.gm.fh-koeln.de/wiki_db/uploads/Datenbanken/MapReduce/mapreduce.png
[Figure: the input data is divided into Split 1 … Split 3. Each Map task writes its intermediate results into partitions 1 … n. The Dispatch step sends partition i of every Map task to reducer Red i. The reducers deliver their results via file, callback, ...]
Example input: „The lecturer is the best“ (taken from „The lecturer is the best. And this is written by research assistants. Yada, yada, yada, …“)
Map output: [ (the, 1), (lecturer, 1), (is, 1), (the, 1), (best, 1) ]
Dispatched to a reducer: [ (the, [1, 1]) ]
Reduce output: [ (the, [2]) ]
Final result: [ (the, 2), (lecturer, 1), (is, 1), (best, 1) ]
- Map: K × V → (L × W)*
  - K and L: sets containing keys
  - V and W: sets containing values
  - All elements in a set are of the same data type (e.g., String)
  - Maps a key/value pair to a list of key/value pairs
- Intermediate Results
  - Partitioned into n partitions
  - E.g., partition = hash(key) mod n
  - Transforms (L × W)* → L × W*
  - E.g., (the, 1), (the, 1) → (the, [1,1])
  - Handled by the framework
- Reduce: L × W* → W*
  - E.g., (the, [1,1]) → 2
  - Framework saves result as (the, 2)
• Input document: „The lecturer is the best“
Example Map and Reduce Functions
/* EmitIntermediate and Emit are provided by the framework */
/* key: document name, value: document content */
map(String key, String value)
{
  // Lowercase first so that "The" and "the" count as the same word
  for (String w : value.toLowerCase().split(" ")) {
    EmitIntermediate(w, 1);
  }
}
/* key: a word, values: list of intermediate results for this word */
reduce(String key, Iterable<Integer> values)
{
  int result = 0;
  for (int value : values) {
    result += value;
  }
  Emit(result);
}
Map [(the, 1), (lecturer, 1), (is, 1), (the, 1), (best, 1)]
Intermediate [(the, [1,1]), (lecturer, [1]), (is, [1]), (best, [1])]
Reduce [(the, [2]), (lecturer, [1]), (is, [1]), (best, [1])]
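To make the division of labor between user code and framework tangible, the following sketch runs map, partitioning, grouping, and reduce inside one JVM. It is not Hadoop or Google's implementation: `LocalMapReduce` and `run` are made-up names, `String.hashCode()` is assumed as the hash function, and everything happens sequentially.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class LocalMapReduce {
    // Map: (document name, document content) -> list of (word, 1)
    static List<Map.Entry<String, Integer>> map(String key, String value) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String w : value.toLowerCase().trim().split("\\s+")) {
            if (!w.isEmpty()) {
                out.add(new AbstractMap.SimpleEntry<>(w, 1));
            }
        }
        return out;
    }

    // Reduce: (word, [1, 1, ...]) -> sum of the values
    static int reduce(String key, List<Integer> values) {
        int result = 0;
        for (int value : values) {
            result += value;
        }
        return result;
    }

    // "Framework": partitions the intermediate pairs by hash(key) mod n,
    // groups the values per key, and calls reduce once per key.
    static Map<String, Integer> run(Map<String, String> documents, int n) {
        List<Map<String, List<Integer>>> partitions = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            partitions.add(new TreeMap<>());
        }
        for (Map.Entry<String, String> doc : documents.entrySet()) {
            for (Map.Entry<String, Integer> kv : map(doc.getKey(), doc.getValue())) {
                int p = Math.floorMod(kv.getKey().hashCode(), n);
                partitions.get(p)
                        .computeIfAbsent(kv.getKey(), k -> new ArrayList<>())
                        .add(kv.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map<String, List<Integer>> partition : partitions) {
            for (Map.Entry<String, List<Integer>> e : partition.entrySet()) {
                result.put(e.getKey(), reduce(e.getKey(), e.getValue()));
            }
        }
        return result;
    }
}
```

Because all pairs with the same key land in the same partition, every reducer sees the complete value list for its keys, and the number of partitions n does not change the result.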
Distributed Stream Processing
• Pub/Sub
– Moving messages between data processing services efficiently
• Map/Reduce
– Move data processing services to the data
– Limitation: Batch processing, only for sufficiently parallelizable jobs
• Distributed Real-Time Stream Processing
– Mixture of Pub/Sub and Map/Reduce
– Guaranteed data processing
– No intermediate message brokers
– Goal: create graphs of data sources and data processors
Distributed Stream Processing
[Figure: data processing services connected by continuous data streams]
• Continuous real-time processing of data streams
– In contrast: batch processing in map/reduce
• Eventually emits a final result
– Here: continuous calculation of results
• Runs forever or until stopped
• Produces new results over time
• Scalability / Parallelism
– Applications consist of parallelizable data processing services
– No centralized component (brokers), components directly exchange data
– Applications can be distributed globally (Internet-scale)
Distributed Stream Processing
• Implementation of a Distributed Stream Processing framework
– Open-source project from Twitter
– Used by many companies (e.g., Twitter, Groupon) to run their business logic
• Set of data sources and data
processing services
– Connections defined by a graph in which streams are edges
Example: Storm Framework [storm]
[Figure: two data sources feed a graph of data processing services]
• Stream: Unbounded sequence of tuples
• Tuple: Named list of values
Storm Streams and Tuples
[Figure: a stream drawn as a sequence of tuples]
• A spout is a source of a stream
• E.g., a spout can read data and emit it as tuples from a
– File
– Message queue
– Web service
– RSS feed
Storm Spouts
[Figure: a spout emitting two streams of tuples]
• Bolts process input streams and produce new streams
• Bolts can implement arbitrary logic, e.g.,
– Transformation functions
– Filters
– Aggregation functions
– Joins
– Database access
Storm Bolt
[Figure: a bolt consuming streams of tuples and emitting new streams of tuples]
Figure source: http://storm-project.net/images/topology.png
• A “topology” is a network of
spouts and bolts
• Spouts and bolts are
implemented in a parallelizable
manner
– I.e., multiple instances can be spawned
• The framework executes them in
parallel on a Storm cluster as
“tasks”
– Tasks communicate using message queues
Storm Topology
[Figure: a topology executed as a set of parallel tasks]
• Task
– Count words in a continuous stream of data and emit new word count for each word as soon as it changes
• Topology
– A spout reads sentences from some data source and emits them
– One bolt splits sentences into individual words
– Another bolt keeps track of the number of occurrences of individual words and emits the word and count on change
Storm Example: Word Count
Figure source: https://github.com/nathanmarz/storm/wiki/Tutorial
[Figure: word count topology. The “sentences” spout reads messages M1 … M3 from a source (e.g., a message queue) and emits (sentence, “The lecturer is the ...“). The “split” bolt emits (word, “the”), (word, “lecturer”), (word, “is”), (word, “the”). The “count” bolt emits (the, 1), (lecturer, 1), (is, 1), (the, 2), …]
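The data flow of this topology can be sketched in plain Java. The sketch deliberately does not use the Storm API: a finite list of sentences stands in for the spout, and the names `WordCountTopology`, `split`, `CountBolt`, and `run` are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class WordCountTopology {
    // "split" bolt: one sentence tuple in, one word tuple per word out.
    static List<String> split(String sentence) {
        return Arrays.asList(sentence.toLowerCase().trim().split("\\s+"));
    }

    // "count" bolt: keeps the counts as local state and emits the
    // updated (word, count) tuple every time a count changes.
    static final class CountBolt {
        private final Map<String, Integer> counts = new HashMap<>();

        String execute(String word) {
            int count = counts.merge(word, 1, Integer::sum);
            return "(" + word + ", " + count + ")";
        }
    }

    // Wires spout -> split -> count and collects everything the count bolt emits.
    static List<String> run(List<String> sentences) {
        CountBolt countBolt = new CountBolt();
        List<String> emitted = new ArrayList<>();
        for (String sentence : sentences) {
            for (String word : split(sentence)) {
                emitted.add(countBolt.execute(word));
            }
        }
        return emitted;
    }
}
```

For the sentence “The lecturer is the” the count bolt emits (the, 1), (lecturer, 1), (is, 1), (the, 2). In contrast to the map/reduce word count, there is no final result: a new tuple is emitted whenever a count changes.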
• Framework runs multiple instances of spout and bolt implementations in
parallel
• To guarantee unambiguous results, “groupings” can be defined
• Groupings
– “Shuffle” randomly picks a task
– “Field” makes sure one individual word always goes to the same task
Storm Example: Word Count
Figure source: https://github.com/nathanmarz/storm/wiki/Tutorial
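[Figure: the spout reads messages M1 … M3. Two parallel tasks of the “split” bolt emit (word, “lecturer”), (word, “the”) and (word, “car”), (word, ”drives”). With field grouping, one task of the “count” bolt emits (the, 1), (lecturer, 1), (the, 2), … and the other emits (is, 1), (car, 1), (drives, 1), …]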
• Stream groupings define to which
task a tuple is sent when it is
emitted
• Examples
– Shuffle & Field
(previous slide)
– All: replicates to all tasks
– Global: task with lowest ID
– None: don’t care
– Direct: emitter defines receiver
Storm Stream Groupings
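How a grouping selects the receiving task can be sketched as a function from a tuple to a task ID. The sketch assumes task IDs 0 … numTasks-1 and hashing with `String.hashCode()`; Storm's actual implementation may differ, and the class `Groupings` with its methods is hypothetical.

```java
import java.util.Random;
import java.util.stream.IntStream;

class Groupings {
    // Shuffle grouping: a randomly picked task.
    static int shuffle(Random rnd, int numTasks) {
        return rnd.nextInt(numTasks);
    }

    // Field grouping: the same field value always goes to the same task.
    static int field(String fieldValue, int numTasks) {
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    // All grouping: the tuple is replicated to all tasks.
    static int[] all(int numTasks) {
        return IntStream.range(0, numTasks).toArray();
    }

    // Global grouping: the task with the lowest ID.
    static int global(int numTasks) {
        return 0;
    }
}
```

In the word count example, field grouping on the word is what keeps the counts correct: every occurrence of “the” reaches the same counting task, whereas shuffle grouping would spread the occurrences over several tasks that each hold only a partial count.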
• Humans create exponentially growing amounts of data
– “Big data challenge”
• Expressing data processing algorithms using functional primitives allows efficient scaling and distribution of computations on large (distributed) data sets
• Precondition: algorithm must be sufficiently parallelizable
• New paradigm to design applications
– Not only for big but also for small data
– Scale from start-up to global player
• [mapreduce] Jeffrey Dean and Sanjay Ghemawat, “MapReduce: Simplified Data Processing on Large Clusters”, 2004.
http://research.google.com/archive/mapreduce.html
• [storm] Storm is a free and open source distributed realtime
computation system. Storm makes it easy to reliably process
unbounded streams of data, doing for realtime processing
what Hadoop did for batch processing.
http://storm-project.net
• [akka] Akka is a toolkit and runtime for building highly
concurrent, distributed, and fault tolerant event-driven
applications on the JVM.
http://akka.io
Literature