• No results found

Building Real-Time Analytics Into Big Data Applications

N/A
N/A
Protected

Academic year: 2021

Share "Building Real-Time Analytics Into Big Data Applications"

Copied!
42
0
0

Loading.... (view fulltext now)

Full text

(1)

©2015, Amazon Web Services, Inc. or its affiliates. All rights reserved

Building Real-Time Analytics Into

Big Data Applications

Shawn Gandhi, Solutions Architect

(2)

{ "payerId": "Joe", "productCode": "AmazonS3", "clientProductCode": "AmazonS3", "usageType": "Bandwidth", "operation": "PUT", "value": "22490", "timestamp": "1216674828" }

Metering Record

127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326

Common Log Entry

<165>1 2003-10-11T22:14:15.003Z

mymachine.example.com evntslog - ID47 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"][examplePriority@32473 class="high"]

Syslog Entry

“SeattlePublicWater/Kinesis/123/Realtime” – 412309129140

MQTT Record

<R,AMZN ,T,G,R1>
(3)

Generated data

Available for analysis

Data volume

Gartner: User Survey Analysis: Key Trends Shaping the Future of Data Center Infrastructure Through 2011 IDC: Worldwide Business Analytics Software 2012–2016 Forecast and 2011 Vendor Shares

(4)
(5)
(6)
(7)

Big data: best served fresh

Big data

• Hourly server logs:

were your systems misbehaving one hour ago

• Weekly / Monthly Bill:

what you spent this billing cycle

• Daily customer-preferences report from your website’s click stream:

what deal or ad to try next time

• Daily fraud reports:

was there fraud yesterday

Real-time big data

• Amazon CloudWatch metrics: what went wrong now

• Real-time spending alerts/caps: prevent overspendingnow

• Real-time analysis:

what to offer the current customer now

• Real-time detection:

(8)

Talk outline

• A tour of Amazon Kinesis concepts in the

context of a Twitter Trends service

• Implementing the Twitter Trends service

• Amazon Kinesis in the broader context of a big

data ecosystem

(9)

Sending and reading data from Amazon Kinesis

streams

HTTP Post AWS SDK LOG4J Flume Fluentd Get* APIs Kinesis Client Library + Connector Library Apache Storm Amazon Elastic MapReduce

Sending

Reading

(10)

A Tour of Amazon Kinesis Concepts in the

Context of a Twitter Trends Service

(11)

Demo

(12)

twitter-trends.com

Elastic Beanstalk

twitter-trends.com

twitter-trends.com website

(13)

twitter-trends.com

Too big to handle on one box

(14)

twitter-trends.com

The solution: streaming map/reduce

My top-10

My top-10

My top-10

(15)

twitter-trends.com

Core concepts

My top-10

My top-10

My top-10

Global top-10

Data record

Stream

Partition key

Shard

Worker

Shard:

1 4 1 7 1 8 2 1 2 3

Data record

Sequence number

(16)

twitter-trends.com

How this relates to Amazon Kinesis

(17)

Core concepts recapped

Data record

~ tweet

Stream

~ all tweets (the Twitter Firehose)

Partition key

~ Twitter topic (every tweet record belongs to

exactly one of these)

Shard

~ all the data records belonging to a set of Twitter

topics that will get grouped together

Sequence number

~ each data record gets one assigned

when first ingested

Worker

~ processes the records of a shard in sequence

number order

(18)

twitter-trends.com

Using the Amazon Kinesis API directly

K

I

N

E

S

I

S

iterator = getShardIterator(shardId, LATEST); while (true) { [records, iterator] = getNextRecords(iterator, maxRecsToReturn); process(records); } process(records): {

for (record in records) {

updateLocalTop10(record); } if (timeToDoOutput()) { writeLocalTop10ToDDB(); } } while (true) { localTop10Lists = scanDDBTable(); updateGlobalTop10List( localTop10Lists); sleep(10); }

(19)

K

I

N

E

S

I

S

twitter-trends.com

Challenges with using the Amazon Kinesis API

directly

Kinesis

application

Manual creation of workers and assignment to shards

How many workers per EC2 instance?

(20)

K

I

N

E

S

I

S

twitter-trends.com

Using the Amazon Kinesis Client Library

Kinesis

application

Shard mgmt table

(21)

K

I

N

E

S

I

S

twitter-trends.com

Elastic scale and load balancing

Shard mgmt table Auto scaling Group

Amazon

CloudWatch

(22)

K

I

N

E

S

I

S

twitter-trends.com

Shard management

Shard mgmt table Auto scaling Group

Amazon

CloudWatch

Amazon

SNS

Console

AWS

(23)

K

I

N

E

S

I

S

twitter-trends.com

The challenges of fault tolerance

X

X

X

Shard mgmt
(24)

K

I

N

E

S

I

S

twitter-trends.com

Fault tolerance support in KCL

Shard mgmt table

X

Availability Zone 1 Availability Zone 3
(25)

Checkpoint, replay design pattern

Amazon Kinesis

14 17 18 21 23

Shard-i

2 3 5 8 10

Shard

ID

Lock

Seq

num

Shard-i

Host A Host B Shard ID

Local top-10

Shard-i

0

10

18

X

2 3 5 8 10 14 17 18 21 23

0

3

10

Host A

Host B

{#Obama: 10235, #Seahawks: 9835, …} {#Obama: 10235, #Congress: 9910, …}

10

23

14 17 18 21 23
(26)

Catching up from delayed processing

Amazon Kinesis

14 17 18 21 23 Shard-i 2 3 5 8 10 Host A Host B

X

2 3 5 2 3 5 8 10 14 17 18 21 23

How long until

we’ve caught up?

Shard throughput SLA:

-

1 MBps in

-

2 MBps out

=>

Catch up time ~ down time

Unless your application

can’t process 2 MBps on a

single host

=>

(27)

Concurrent processing applications

Amazon Kinesis

14 17 18 21 23 Shard-i 2 3 5 8 10 twitter-trends.com spot-the-revolution.org 0 18 2 3 5 8 10 14 17 18 21 23 3 10 0 23 14 17 18 21 23 23 2 3 5 8 10 10
(28)

Why not combine applications?

Amazon Kinesis

twitter-trends.com

twitter-stats.com

Output state and checkpoint every 10 seconds

Output state and checkpoint every hour

(29)
(30)
(31)

How many shards?

Additional considerations:

- How much processing is needed per data record/data byte?

- How quickly do you need to catch up if one or more of your workers stalls or fails?

You can always dynamically reshard

to adjust to changing workloads

(32)

Twitter Trends shard processing code

Class TwitterTrendsShardProcessor implements IRecordProcessor {

public TwitterTrendsShardProcessor() { … }

@Override

public void initialize(String shardId) { … }

@Override

public void processRecords(List<Record> records,

IRecordProcessorCheckpointer checkpointer) { … }

@Override

public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { … }

(33)

Twitter Trends shard processing code

Class TwitterTrendsShardProcessor implements IRecordProcessor {

private Map<String, AtomicInteger> hashCount = new HashMap<>();

private long tweetsProcessed = 0;

@Override

public void processRecords(List<Record> records,

IRecordProcessorCheckpointer checkpointer) { computeLocalTop10(records); if ((tweetsProcessed++) >= 2000) { emitToDynamoDB(checkpointer); } } }

(34)

Twitter Trends shard processing code

private void computeLocalTop10(List<Record> records) { for (Record r : records) {

String tweet = new String(r.getData().array()); String[] words = tweet.split(" \t");

for (String word : words) { if (word.startsWith("#")) {

if (!hashCount.containsKey(word)) hashCount.put(word, new AtomicInteger(0)); hashCount.get(word).incrementAndGet();

} } }

private void emitToDynamoDB(IRecordProcessorCheckpointer checkpointer) { persistMapToDynamoDB(hashCount);

try {

checkpointer.checkpoint();

} catch (IOException | KinesisClientLibDependencyException | InvalidStateException | ThrottlingException | ShutdownException e) {

// Error handling }

hashCount = new HashMap<>(); tweetsProcessed = 0;

(35)

Amazon Kinesis as a gateway into the big

data ecosystem

(36)

Big data has a lifecycle

twitter-trends.com Twitter trends processing application Twitter trends statistics Twitter trends anonymized archive
(37)

Connector framework

public interface IKinesisConnectorPipeline<T> {

IEmitter<T> getEmitter(KinesisConnectorConfiguration configuration);

IBuffer<T> getBuffer(KinesisConnectorConfiguration configuration);

ITransformer<T> getTransformer(

KinesisConnectorConfiguration configuration);

IFilter<T> getFilter(KinesisConnectorConfiguration configuration); }

(38)

Transformer and Filter interfaces

public interface ITransformer<T> {

public T toClass(Record record) throws IOException;

public byte[] fromClass(T record) throws IOException; }

public interface IFilter<T> {

public boolean keepRecord(T record); }

(39)

Tweet transformer for S3 archival

public class TweetTransformer implements ITransformer<TwitterRecord> {

private final JsonFactory fact =

JsonActivityFeedProcessor.getObjectMapper().getJsonFactory();

@Override

public TwitterRecord toClass(Record record) { try {

return fact.createJsonParser(

record.getData().array()).readValueAs(TwitterRecord.class); } catch (IOException e) {

throw new RuntimeException(e); }

(40)

Example Amazon Redshift connector

K

I

N

E

S

I

S

Amazon Redshift Amazon S3
(41)

Example Amazon Redshift connector v2

K

I

N

E

S

I

S

Amazon Redshift Amazon S3 K I N E S I S
(42)

Obrigado!

References

Related documents

The scope of services covered in this report includes AWS CloudHSM, AWS Direct Connect, Amazon DynamoDB, Amazon Elastic Block Store (EBS), Amazon Elastic Cloud Compute (EC2),

HIVE + AMAZON EMR + S3 = ELASTIC BIG DATA SQL ANALYTICS PROCESSING IN THE CLOUD.. A REAL WORLD

For an example event, see Step 2: Create a Lambda Function and Invoke it Manually Using Sample Event Data (Amazon Kinesis | Node.js) (p.

Amazon Kinesis also offloads a lot of developer burden in building a real-time, streaming data ingestion platform, and enables Supercell to focus on delivering games that

-  Complex real-time distributed logic -  Operational intelligence. Ÿ 

Collect real-time click-stream data: Amazon Kinesis Mobile

• Typically one-off grants of AWS resources like EC2 (compute) or S3 and EBS (storage) or more exotic like Amazon Kinesis and Twitter feeds. • Frequently result

Managed Service for Real Time Big Data Processing Create Streams to Produce &amp; Consume Data. Elastically Add and Remove Shards for Performance Use Kinesis Worker Library to