• No results found

Streaming items through a cluster with Spark Streaming

N/A
N/A
Protected

Academic year: 2021

Share "Streaming items through a cluster with Spark Streaming"

Copied!
41
0
0

Loading.... (view fulltext now)

Full text

(1)

Streaming items through a

cluster with Spark Streaming

Tathagata “TD” Das

@tathadas

CME 323: Distributed Algorithms and Optimization Stanford, May 6, 2015

(2)

Who am I?

> Project Management Committee (PMC) member of Apache Spark

> Lead developer of Spark Streaming

> Formerly in AMPLab, UC Berkeley

> Software developer at Databricks

> Databricks was started by creators of Spark to provide Spark-as-a-service in the cloud

(3)

Big Data

(4)

Big Streaming Data

(5)

Why process Big Streaming Data?

Fraud detection in bank transactions

Anomalies in sensor data

Cat videos in tweets

(6)

How to Process Big Streaming Data

> Ingest – Receive and buffer the streaming data

> Process – Clean, extract, transform the data

> Store – Store transformed data for consumption

Ingest

data Process

data Store

results

Raw Tweets

(7)

How to Process Big Streaming Data

> For big streams, every step requires a cluster

> Every step requires a system that is designed for it

Ingest

data Process

data Store

results

Raw Tweets

(8)

Stream Ingestion Systems

> Kafka – popular distributed pub-sub system

> Kinesis – Amazon managed distributed pub-sub

> Flume – like a distributed data pipe

Ingest

data Process

data Store

results

Raw Tweets

Amazon Kinesis

(9)

Stream Ingestion Systems

> Spark Streaming – most demanded

> Storm – most widely deployed (as of now ;) )

> Samza – gaining popularity in certain scenarios

Ingest

data Process

data Store

results

Raw Tweets

(10)

Stream Ingestion Systems

> File systems – HDFS, Amazon S3, etc.

> Key-value stores – HBase, Cassandra, etc.

> Databases – MongoDB, MemSQL, etc.

Ingest

data Process

data Store

results

Raw Tweets

(11)
(12)

Kafka Cluster

Producers and Consumers

> Producers publish data tagged by “topic”

> Consumers subscribe to data of a particular “topic”

Producer 1

Producer 2

(topicX, data1)

(topicY, data2)

(topicX, data3)

Topic X Consumer

Topic Y Consumer

(topicX, data1)

(topicX, data3)

(topicY, data2)

(13)

> Topic = category of message, divided into partitions

> Partition = ordered, numbered stream of messages

> Producer decides which (topic, partition) to put each

message in

Topics and Partitions

(14)

> Topic = category of message, divided into partitions

> Partition = ordered, numbered stream of messages

> Producer decides which (topic, partition) to put each

message in

> Consumer decides which (topic, partition) to pull messages from

-  High-level consumer – handles fault-recovery with Zookeeper

-  Simple consumer – low-level API for greater control

Topics and Partitions

(15)

How to process Kafka messages?

Ingest

data Process

data Store

results

Raw Tweets

> Incoming tweets received in distributed manner

and buffered in Kafka

> How to process them?

(16)

treaming

(17)

What is Spark Streaming?

Scalable, fault-tolerant stream processing system

File systems Databases Dashboards Flume

HDFS Kinesis

Kafka

Twitter

High-level API

joins, windows, … often 5x less code

Fault-tolerant

Exactly-once semantics, even for stateful ops

Integration

Integrate with MLlib, SQL, DataFrames, GraphX

(18)

How does Spark Streaming work?

>  Receivers chop up data streams into batches of few seconds

>  Spark processing engine processes each batch

and pushes out the results to external data stores

data streams

Receivers

batches as RDDs

results as RDDs

(19)

Spark Programming Model

> Resilient distributed datasets (RDDs)

-  Distributed, partitioned collection of objects -  Manipulated through parallel transformations

(map, filter, reduceByKey, …)

-  All transformations are lazy, execution forced by actions (count, reduce, take, …)

-  Can be cached in memory across cluster -  Automatically rebuilt on failure

(20)

Spark Streaming Programming Model

> Discretized Stream (DStream)

-  Represents a stream of data

-  Implemented as a infinite sequence of RDDs

> DStreams API very similar to RDD API

-  Functional APIs in

-  Create input DStreams from Kafka, Flume, Kinesis, HDFS, … -  Apply transformations

(21)

Example – Get hashtags from Twitter

val  ssc  =  new  StreamingContext(conf,  Seconds(1))  

StreamingContext  is  the  star)ng  

point  of  all  streaming  func)onality   Batch  interval,  by  which   streams  will  be  chopped  up  

(22)

Example – Get hashtags from Twitter

val  ssc  =  new  StreamingContext(conf,  Seconds(1))   val  tweets  =  TwitterUtils.createStream(ssc,  auth)  

Input  DStream  

batch  @  t+1  

batch  @  t   batch  @  t+2  

tweets  DStream  

replicated  and  stored  in   memory  as  RDDs   TwiCer  Streaming  API  

(23)

Example – Get hashtags from Twitter

val  tweets  =  TwitterUtils.createStream(ssc,  None)  

val  hashTags  =  tweets.flatMap(status  =>  getTags(status))  

flatMap   flatMap   flatMap  

transforma0on:  modify  data  in  one   DStream  to  create  another  DStream     transformed  

DStream  

new  RDDs  created   for  every  batch    

batch  @  t+1  

batch  @  t   batch  @  t+2  

tweets  DStream  

hashTags  Dstream  

[#cat,  #dog,  …  ]  

(24)

Example – Get hashtags from Twitter

val  tweets  =  TwitterUtils.createStream(ssc,  None)  

val  hashTags  =  tweets.flatMap(status  =>  getTags(status))   hashTags.saveAsTextFiles("hdfs://...")  

output  opera0on:  to  push  data  to  external  storage  

flatMap flatMap flatMap

save save save

batch  @  t+1  

batch  @  t   batch  @  t+2  

tweets  DStream  

hashTags  DStream  

every  batch   saved  to  HDFS  

(25)

Example – Get hashtags from Twitter

val  tweets  =  TwitterUtils.createStream(ssc,  None)  

val  hashTags  =  tweets.flatMap(status  =>  getTags(status))   hashTags.foreachRDD(hashTagRDD  =>  {  ...  })  

foreachRDD:  do  whatever  you  want  with  the  processed  data  

flatMap flatMap flatMap

foreach foreach foreach

batch  @  t+1  

batch  @  t   batch  @  t+2  

tweets  DStream  

hashTags  DStream  

Write  to  a  database,  update  analy)cs   UI,  do  whatever  you  want  

(26)

Example – Get hashtags from Twitter

val  tweets  =  TwitterUtils.createStream(ssc,  None)  

val  hashTags  =  tweets.flatMap(status  =>  getTags(status))   hashTags.foreachRDD(hashTagRDD  =>  {  ...  })  

         

ssc.start()    

all  of  this  was  just  setup  for  what  to  do  when   streaming  data  is  receiver  

this  actually  starts  the  receiving  and  processing  

(27)

What’s going on inside?

> Receiver buffers tweets in Executors’ memory

> Spark Streaming Driver launches tasks to

process tweets

Driver running DStreams Executors

launch tasks to process tweets

Raw Tweets

Twitter Receiver Buffered

Tweets

Buffered Tweets

Spark Cluster

(28)

What’s going on inside?

Driver running DStreams Executors

launch tasks to process data

Receiver

Kafka Cluster

Receiver Receiver

receive data in parallel

Spark Cluster

(29)

Performance

Can process 60M records/sec (6 GB/sec) on 100 nodes at sub-second latency

0   0.5   1   1.5   2   2.5   3   3.5  

0   50   100  

Cluster  Throughput  (GB/s)  

#  Nodes  in  Cluster  

WordCount  

1  sec   2  sec   0  

1   2   3   4   5   6   7  

0   50   100  

Cluster  Thhroughput  (GB/s)  

#  Nodes  in  Cluster  

Grep  

1  sec   2  sec  

(30)

DStream  of  data  

Window-based Transformations

val  tweets  =  TwitterUtils.createStream(ssc,  auth)  

val  hashTags  =  tweets.flatMap(status  =>  getTags(status))  

val  tagCounts  =  hashTags.window(Minutes(1),  Seconds(5)).countByValue()  

sliding  window  

opera)on   window  length   sliding  interval  

window  length  

sliding  interval  

(31)

Arbitrary Stateful Computations

Specify function to generate new state based on previous state and new data  

-  Example: Maintain per-user mood as state, and update it with their tweets

   def  updateMood(newTweets,  lastMood)  =>  newMood  

 

   val  moods  =  tweetsByUser.updateStateByKey(updateMood  _)  

(32)

Integrates with Spark Ecosystem

Spark Core Spark

Streaming

Spark SQL MLlib GraphX

(33)

Combine batch and streaming processing

>  Join data streams with static data sets

//  Create  data  set  from  Hadoop  file  

val  dataset  =  sparkContext.hadoopFile(“file”)            

//  Join  each  batch  in  stream  with  dataset  

kafkaStream.transform  {  batchRDD  =>    

           batchRDD.join(dataset)filter(...)   }  

Spark Core Spark

Streaming

Spark SQL MLlib GraphX

(34)

Combine machine learning with streaming

>  Learn models offline, apply them online

//  Learn  model  offline  

val  model  =  KMeans.train(dataset,  ...)    

//  Apply  model  online  on  stream   kafkaStream.map  {  event  =>    

       model.predict(event.feature)     }  

 

Spark Core Spark

Streaming

Spark SQL MLlib GraphX

(35)

Combine SQL with streaming

>  Interactively query streaming data with SQL

//  Register  each  batch  in  stream  as  table  

kafkaStream.map  {  batchRDD  =>    

   batchRDD.registerTempTable("latestEvents")   }  

 

//  Interactively  query  table  

sqlContext.sql("select  *  from  latestEvents")  

Spark Core Spark

Streaming

Spark SQL MLlib GraphX

(36)

100+ known industry deployments

(37)

Why are they adopting Spark Streaming?

Easy, high-level API

Unified API across batch and streaming

Integration with Spark SQL and MLlib

Ease of operations

37

(38)

Neuroscience @ Freeman Lab, Janelia Farm

Spark Streaming and MLlib to analyze neural activities

Laser microscope scans Zebrafish brainà Spark Streaming à interactive visualization à

laser ZAP to kill neurons!

http://www.jeremyfreeman.net/share/talks/spark-summit-2014/

(39)

Neuroscience @ Freeman Lab, Janelia Farm

Streaming machine learning algorithms on time series

data of every neuron

Upto 2TB/hour and

increasing with brain size

Upto 80 HPC nodes

http://www.jeremyfreeman.net/share/talks/spark-summit-2014/

(40)

Streaming Machine Learning Algos

> Streaming Linear Regression

> Streaming Logistic Regression

> Streaming KMeans

http://www.jeremyfreeman.net/share/talks/spark-summit-east-2015/#/algorithms-repeat

(41)

Okay okay, how do I start off?

> Online Streaming

Programming Guide

http://spark.apache.org/docs/latest/streaming-programming-guide.html

> Streaming examples

https://github.com/apache/spark/tree/master/examples/src/main/scala/

org/apache/spark/examples/streaming

References

Related documents

NAI Global is the premier network of independent commercial real estate firms and one of the largest commercial real estate service providers worldwide.

Study QIM Hard/Soft Dimensions Innovation Measures Principal empirical results: QIM-Innovation relationship 1 Benner and Tushman (2002) ISO9000 Hard QIM dimensions only

Nmap sends a raw IP packet without any additional protocol header (see a good TCP/IP book for information about IP packets), to each protocol on the target machine. Receipt of an

This reaction is known to take place during the processing of milk and milk products.1 The deproteinized fraction of one of these milk products was examined by paper chromatography,

Polarization studies revealed that these compounds behave as mixed type inhibitors and inhibit corrosion by parallel adsorption on the surface of stainless steel due to

Compared with CPU-OpenMP (parallel computing with multiple.. CPU cores) based analysis, GPU based FLIM analysis has significantly sped up lifetime calculations. b) A

The results show that microbe-stimulated dendritic cells and macrophages differentially produce cytokines in response to various bacterial strains which may modulate the formation