
Academic year: 2021


Data Stream Management

Yanlei Diao


11/6/13 2 Yanlei Diao, University of Massachusetts Amherst

Data Stream Management

Two driving forces:

•  A collection of applications where data streams naturally exist but DBMS doesn’t help much
•  Advances of sensor technologies


Financial Applications

•  Financial services
   −  Data feeds: stock tickers, foreign exchange transactions
   −  Data rate: 10’s or 100’s of thousands of messages per second
   −  Applications: routing trade requests, automating trade strategies, market trend analysis…
   −  Stream systems: e.g., http://www.streambase.com


Network and System Monitoring

•  Network monitoring
   −  Packet traces, network performance measurements
   −  Data rate: gigabits per second
   −  Applications: traffic analysis, performance monitoring, router configuration, intrusion detection…
   −  Stream systems: e.g., Gigascope at AT&T
•  System/Application monitoring
   −  Data: system log, measurements
   −  Stream systems: e.g.,


Wireless Sensor Networks

•  Wireless sensor networks
   −  Sensor devices: temperature, light, pressure, acceleration, humidity, magnetic field, …
   −  A set of sensor devices auto-configure themselves into a communication network
   −  Applications: environment monitoring, habitat monitoring, structural monitoring


Radio Frequency Identification

•  RFID Technology
   −  Tags: small devices, transmitting a tag id when brought close to an RFID reader
   −  Readers: interrogate tags with radio signal; read constantly, over a range, without line-of-sight
   −  Applications: supply chain management, healthcare, pharmaceuticals, postal services…
   −  Stream systems: e.g., http://www.truviso.com

[Figure: sample RFID readings with schema (reader_id, tag_id, timestamp); example tag ids 01.01298.6EF.0A, 01.01267.60D.01, 04.0768E.001.F0]


Execution Models: A Comparison

Traditional DBMS
•  Data at rest
•  One-time or periodic queries
•  Query-driven execution

Data Stream System
•  Data in motion, unending
•  Continuous, long-running queries
•  Data-driven execution
•  Results returned continuously in real-time

[Figure: in a traditional DBMS, a Query runs against stored Data (Attr1, Attr2, Attr3) and returns Results; in a data stream system, Data flows through standing Queries, Rules, Event Specs, Subscriptions, which produce Results]


Latency

[Figure: value of data to decision-making plotted against time. Value levels: Preventive/Predictive, Actionable, Reactive, Historical. Time scale: Real-Time, Seconds, Minutes, Hours, Days, Months. Annotations: Time-critical Decisions; Information Half-Life In Decision-Making; Traditional “Batch” Business Intelligence]


History of Stream Processing

•  Early: Active DBs, ECA rules, Triggers, Data Broadcast
•  Field took off around 2002
   −  ~10 SIGMOD/VLDB/ICDE stream papers thru 2001
   −  275+ since then (holding steady at 40-50/yr)
•  Lots of Research Systems-Building Projects
   −  AT&T – Gigascope
   −  Berkeley – TelegraphCQ->HIFI
   −  Brandeis, Brown, MIT – Aurora->Borealis
   −  Purdue – Nile
   −  Stanford – STREAM
   −  Wisconsin, Portland State – NiagaraCQ


Main Issues of Stream Management

•  Query languages: sliding windows
•  Single query processing: non-blocking, windowed operations
•  Multi-query optimization: sharing
•  Adaptive query processing: adaptivity
•  Real-time processing: quality of service
•  Approximate answers: confidence, sketching, sampling, wavelets...
•  Out of order processing: punctuation, order-agnostic


Data Stream Model

•  A New Data Model: an infinite sequence of elements
   −  An element has a system time and an application time
   −  Elements arrive in order of system time (arrival order)
   −  If an application wants data in application time, need buffering & sorting
   −  If data comes out of order or very late, more work is needed
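The buffering-and-sorting step can be illustrated with a small reorder buffer. This is only a sketch under assumptions that are not in the slides: each element is an `(app_time, payload)` pair, and `slack` is a hypothetical bound on how far application time may lag behind the largest application time seen so far. Elements that arrive later than that bound are set aside rather than emitted out of order.

```python
import heapq
from itertools import count


def reorder_by_app_time(arrivals, slack):
    """Return (ordered, late) for elements given in arrival (system-time) order.

    `slack` is an assumed lateness bound; it is not part of the slides.
    """
    heap, ordered, late = [], [], []
    tie = count()                  # tie-breaker so payloads are never compared
    max_seen = float("-inf")       # largest application time seen so far
    released = float("-inf")       # largest application time already emitted
    for app_time, payload in arrivals:
        if app_time < released:
            # Too late: emitting it now would break application-time order.
            late.append((app_time, payload))
            continue
        heapq.heappush(heap, (app_time, next(tie), payload))
        max_seen = max(max_seen, app_time)
        # Release everything that can no longer be preceded by a valid arrival.
        while heap and heap[0][0] <= max_seen - slack:
            t, _, p = heapq.heappop(heap)
            ordered.append((t, p))
            released = t
    while heap:                    # end of input: drain the buffer
        t, _, p = heapq.heappop(heap)
        ordered.append((t, p))
    return ordered, late


arrivals = [(1, "a"), (3, "b"), (2, "c"), (6, "d"), (4, "e"), (1, "f"), (7, "g")]
ordered, late = reorder_by_app_time(arrivals, slack=2)
```

A larger `slack` tolerates more disorder at the price of more buffering and higher output latency, which is the trade-off behind "more work is needed".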


Stream Query Languages

•  Extensions of SQL: SEQUIN [SLR’96], CQL [ABW03], TCQ [CCD+03], GSQL [CJS+03]
•  Continuous Query Language (CQL)
   −  FROM can address both streams and relations
   −  PARTITION BY creates sub-streams
   −  Window: extracts a set of tuples from an infinite stream and applies relational processing
      −  ROW, physical window in number of tuples;
      −  RANGE, logical window in terms of seconds, minutes, or hours (more generally, positions in an ordered domain)


Examples of CQL

Sliding window join:

Select *
From S1 [Rows 5], S2 [Rows 10]
Where S1.a = S2.a

Streaming aggregates:

Select Count(*)
From S [Range 1 Minute]

Select Count(*)


Example Windows

•  Tumbling: non-overlapping windows, e.g., 5 second windows <Range 5 seconds Slide 5 seconds>
•  Sliding: overlapping windows, e.g., 5 second windows move every 2 seconds <Range 5 seconds Slide 2 seconds>
•  Landmark: growing windows, e.g., extend window every 2 seconds <LANDMARK ADVANCE 2 sec> [Truviso Language]

[Figure: three data stream timelines, one per window type]
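To make the three window types concrete, the sketch below computes which windows contain a given timestamp. It rests on assumptions the slide does not state: windows are half-open intervals `[start, end)`, window starts are aligned to multiples of the slide beginning at time 0, and the landmark window grows from a fixed `landmark` in steps of `advance`. The function names are hypothetical, and the exact boundary semantics of CQL or the Truviso language may differ.

```python
def windows_containing(t, range_, slide):
    """Return all [start, end) windows that contain timestamp t.

    Tumbling windows are the special case slide == range_.
    """
    windows = []
    start = (t // slide) * slide            # latest window start not after t
    while start > t - range_ and start >= 0:
        windows.append((start, start + range_))
        start -= slide
    return sorted(windows)


def landmark_window(t, advance, landmark=0):
    """Return the smallest growing window [landmark, end) that contains t."""
    steps = (t - landmark) // advance + 1
    return (landmark, landmark + steps * advance)


tumbling = windows_containing(7, range_=5, slide=5)   # one window
sliding = windows_containing(7, range_=5, slide=2)    # several overlapping windows
growing = landmark_window(7, advance=2)
```

With these assumptions, timestamp 7 falls into exactly one tumbling window but into two sliding windows, which is why sliding windows require a tuple to be accounted for more than once.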


Example Windows

•  Partitioned: value-based sub-streams, e.g., <Partition By attr Rows 2 Slide 2> partitioned tumbling windows

[Figure: a data stream of tuples 1 to 15 is split by value into Red, Blue, and Green sub-streams]


Formal Semantics

•  At time t, there is a window R_t; run the query to obtain Q(R_t), t = 1, 2, 3…
•  Collapse the results at adjacent time points based on identical values
•  Use Istream and Dstream to ensure that the output also conforms to the stream model
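A small sketch may help show how Istream and Dstream turn a sequence of query results back into streams. It assumes the usual CQL reading, where Istream emits the tuples present in Q(R_t) but not in Q(R_{t-1}), and Dstream emits the tuples present in Q(R_{t-1}) but not in Q(R_t). As a simplification, results are modeled here as Python sets rather than multisets, and the empty result is assumed before t = 1.

```python
def istream(snapshots):
    """Emit (t, row) for every row that is new in Q(R_t) compared with Q(R_{t-1})."""
    out, prev = [], set()
    for t, current in enumerate(snapshots, start=1):
        current = set(current)
        out.extend((t, row) for row in sorted(current - prev))
        prev = current
    return out


def dstream(snapshots):
    """Emit (t, row) for every row that was in Q(R_{t-1}) but is gone from Q(R_t)."""
    out, prev = [], set()
    for t, current in enumerate(snapshots, start=1):
        current = set(current)
        out.extend((t, row) for row in sorted(prev - current))
        prev = current
    return out


results = [{"a"}, {"a", "b"}, {"b"}]      # Q(R_1), Q(R_2), Q(R_3)
inserted = istream(results)
deleted = dstream(results)
```

Row "a" stays in the result from t = 1 to t = 2, so it is reported once by Istream at t = 1 and once by Dstream at t = 3; nothing is re-emitted while the result is unchanged.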


Main Issues of Stream Management

•  Query languages: sliding windows
•  Single query processing: non-blocking, windowed operations
•  Multi-query optimization: sharing
•  Adaptive query processing: adaptivity
•  Real-time processing: quality of service
•  Approximate answers: confidence, sketching, sampling, wavelets...
•  Out of order processing: punctuation, order-agnostic


Single Query Processing

•  Blocking query operator
   −  “A query operator that is unable to produce the first tuple of its output until it has seen its entire input.” [BBD+02]
   −  Sorting?
   −  Sort-merge join?
   −  Nested-loops join?
   −  Hash join?
•  Non-blocking query operator
   −  “The operator does not stage data (either in memory or on disk) without producing results for a long time.” [UF01]
   −  produces the initial result early


(1) Join: First A Blocking Operator

Assume first that each stream is finite and fits in memory.

Traditional Hash Join blocks when one input stalls.

[Figure: traditional hash join. Source A builds Hash Table A; Source B probes it]


Non-Blocking Join Operator

[Figure: symmetric hash join. Source A and Source B each build into their own hash table (Hash Table A, Hash Table B) and probe the other one]

Symmetric Hash Join processes tuples as they arrive.

Produces all tuples in the join and no duplicates.
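A minimal in-memory sketch of a symmetric hash join follows. The class and method names are hypothetical, and tuples are reduced to a join key plus a value. Each arriving tuple is first built into its own side's hash table and then probes the other side's table, so every matching pair is produced exactly once, at the moment the later of the two tuples arrives.

```python
from collections import defaultdict


class SymmetricHashJoin:
    """Equi-join of two inputs, "A" and "B", processed one tuple at a time."""

    def __init__(self):
        self.tables = {"A": defaultdict(list), "B": defaultdict(list)}

    def insert(self, source, key, value):
        """Add one tuple and return the join results it produces as (a, b) pairs."""
        other = "B" if source == "A" else "A"
        self.tables[source][key].append(value)         # build
        matches = self.tables[other].get(key, [])      # probe
        if source == "A":
            return [(value, m) for m in matches]
        return [(m, value) for m in matches]


join = SymmetricHashJoin()
r1 = join.insert("A", 1, "a1")   # nothing to match yet
r2 = join.insert("B", 1, "b1")   # matches a1
r3 = join.insert("A", 1, "a2")   # matches b1
r4 = join.insert("B", 2, "b2")   # no partner with key 2
```

Neither input can block the other here: a stall on one source only delays the results that depend on its missing tuples.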


Non Blocking Join with Limited Memory

•  SHJ requires both inputs to be memory resident.
   −  Unrealistic in stream processing
•  XJoin extends SHJ to work for long (but finite) streams with limited memory [UF00].
   −  Partition each input using a hash function.
   −  When allocated memory is used up, flush a partition to disk.
   −  At each point, a partition = disk-resident data (older) + memory-resident data (newer).
   −  Join processing continues on memory-resident data.
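The partition-and-flush bookkeeping described above can be sketched as follows. This is not the XJoin implementation: the class name is hypothetical, a Python list stands in for each disk-resident partition, memory is counted in tuples, and choosing the largest in-memory partition as the flush victim is an assumption made for the example.

```python
class PartitionedInput:
    """One join input, hash-partitioned, with a bounded number of in-memory tuples."""

    def __init__(self, num_partitions, memory_budget):
        self.memory = [[] for _ in range(num_partitions)]  # newer tuples
        self.disk = [[] for _ in range(num_partitions)]    # older tuples (stand-in for files)
        self.budget = memory_budget
        self.in_memory = 0

    def add(self, key, value):
        """Store a tuple in its partition, flushing first if memory is full."""
        if self.in_memory >= self.budget:
            self.flush_largest()
        p = hash(key) % len(self.memory)
        self.memory[p].append((key, value))
        self.in_memory += 1
        return p

    def flush_largest(self):
        """Move the largest memory-resident partition to its disk-resident part."""
        victim = max(range(len(self.memory)), key=lambda i: len(self.memory[i]))
        self.disk[victim].extend(self.memory[victim])
        self.in_memory -= len(self.memory[victim])
        self.memory[victim] = []
        return victim


src = PartitionedInput(num_partitions=2, memory_budget=3)
for key, value in [(0, "a"), (2, "b"), (4, "c"), (1, "d"), (6, "e")]:
    src.add(key, value)
```

After the run, partition 0 holds its three oldest tuples on "disk" and only the newest one in memory, which matches the "disk-resident (older) + memory-resident (newer)" picture on the slide.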


XJoin: Handling the Partitions

[Figure: XJoin partition handling. Tuples from SOURCE-A and SOURCE-B are hashed (e.g., hash(Tuple A) = 1, hash(Tuple B) = n) into memory-resident partitions 1 … n of their source; a memory-resident partition (e.g., k) is flushed to the corresponding disk-resident partition]


Stage 1: Memory-to-Memory Joins

[Figure: Stage 1, all in memory. A tuple from SOURCE-A with hash(record A) = i is built into partition i of source A and probes partition i of source B; a tuple from SOURCE-B with hash(record B) = j is built into partition j of source B and probes partition j of source A]


Stage 2: Disk-to-Memory Joins

[Figure: Stage 2. A disk-resident partition i of one source is joined with the memory-resident partition i of the other source, producing Output]


Three Stages of XJoin

•  Stage 1 - Symmetric hash join (memory-to-memory) with partitions
•  Stage 2 - Disk-to-memory
   −  Separate thread - runs when stage 1 blocks (both inputs stall).
   −  Stage 1 and 2 interleave until all tuples have been received.
•  Stage 3 - Clean up stage
   −  Stage 1 misses pairs that were not in memory concurrently.
   −  Stage 2 misses pairs when both are on disk, and may not get to run to completion.
   −  Stage 3 joins all the partitions (memory-resident and


Windowed Joins

•  Windowed joins
   −  Use windows to obtain a finite set from an infinite stream
   −  Basis is non-blocking symmetric hash join
   −  Add support for windows
      −  Window size: in number of tuples or logical time
      −  Sliding of window: smooth movement or hopping
      −  Maintain right state (tuples residing in the window) for the join; need to prune expired tuples from hash tables!
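The sketch below adds a time-based window and pruning to a symmetric hash join. It assumes things the slide leaves open: a single window length shared by both inputs, timestamps that never decrease across the merged inputs, and a tuple counting as live while its timestamp is greater than `now - window`. The class and method names are hypothetical.

```python
from collections import defaultdict, deque


class WindowedJoin:
    """Symmetric hash join over time-based sliding windows on inputs "A" and "B"."""

    def __init__(self, window):
        self.window = window
        self.order = {"A": deque(), "B": deque()}                  # (ts, key), oldest first
        self.table = {"A": defaultdict(deque), "B": defaultdict(deque)}

    def _prune(self, side, now):
        """Drop tuples that have fallen out of the window from one side's state."""
        order, table = self.order[side], self.table[side]
        while order and order[0][0] <= now - self.window:
            _, key = order.popleft()
            table[key].popleft()        # the oldest tuple with this key expires first
            if not table[key]:
                del table[key]

    def insert(self, side, ts, key, value):
        """Add one tuple and return the (a, b) pairs it joins with inside the window."""
        other = "B" if side == "A" else "A"
        self._prune("A", ts)
        self._prune("B", ts)
        matches = [v for _, v in self.table[other].get(key, ())]   # probe
        self.order[side].append((ts, key))                         # build
        self.table[side][key].append((ts, value))
        if side == "A":
            return [(value, m) for m in matches]
        return [(m, value) for m in matches]


wj = WindowedJoin(window=5)
w1 = wj.insert("A", 0, 1, "a1")
w2 = wj.insert("B", 3, 1, "b1")   # a1 is still in the window
w3 = wj.insert("B", 6, 1, "b2")   # a1 has expired by time 6
w4 = wj.insert("A", 7, 1, "a2")   # b1 and b2 are both live
```

Keeping a per-side arrival-order queue next to the hash table makes pruning proportional to the number of expired tuples instead of the size of the table.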


(2) Windowed Aggregates

•  Windowed aggregates:
   −  Use windows to obtain a finite set from an infinite stream
   −  Apply MIN, MAX, SUM, COUNT, or AVG to each window
   −  Avoid scanning all tuples in each window for time efficiency
   −  Avoid storing all tuples in each window for space efficiency
   −  What is the time/space complexity for each aggregate operator?
•  Windowed SUM/COUNT
   −  Symmetry between expired tuples and new tuples [SLR96]
•  Windowed MIN/MAX/TOP-K
   −  A naïve solution may have to store all tuples in each window
   −  How can we do better?
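Two small sketches illustrate the contrast between these aggregates for a row-based window of the last `n` values. `SlidingSumCount` uses the symmetry mentioned above: add the new value, subtract the expired one. `SlidingMax` uses a monotonic deque, a standard technique that is swapped in here for illustration and is not the predictability-based approach the slides go on to describe. Both class names are hypothetical.

```python
from collections import deque


class SlidingSumCount:
    """SUM and COUNT over the last n values with O(1) work per arrival."""

    def __init__(self, n):
        self.n = n
        self.values = deque()     # expired values must be known to subtract them
        self.total = 0

    def add(self, x):
        self.values.append(x)
        self.total += x                              # new tuple enters
        if len(self.values) > self.n:
            self.total -= self.values.popleft()      # expired tuple leaves
        return self.total, len(self.values)


class SlidingMax:
    """MAX over the last n values, storing only values that can still become the max."""

    def __init__(self, n):
        self.n = n
        self.seen = 0
        self.candidates = deque()   # (position, value), values strictly decreasing

    def add(self, x):
        # A stored value that is older and not larger than x can never be the max again.
        while self.candidates and self.candidates[-1][1] <= x:
            self.candidates.pop()
        self.candidates.append((self.seen, x))
        self.seen += 1
        # Drop the front candidate once it has left the window.
        if self.candidates[0][0] < self.seen - self.n:
            self.candidates.popleft()
        return self.candidates[0][1]


stream = [5, 1, 4, 2, 3, 0]
sums = SlidingSumCount(3)
maxes = SlidingMax(3)
sum_results = [sums.add(x) for x in stream]
max_results = [maxes.add(x) for x in stream]
```

The SUM/COUNT state is updated in constant time but still remembers the window contents, whereas the MAX sketch discards any value that a newer, larger value has made irrelevant, which hints at why MIN/MAX/TOP-K need a different idea from SUM/COUNT.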


“Predictability” Property of Windows

•  When a tuple arrives, can predict
   1) to which future windows the tuple will belong
   2) whether the tuple belongs to the partial result (e.g., top-k) of each window
   These insights lead to a “predicted” top-k result for each of the future windows
•  When a new tuple arrives,
   −  it belongs to its own set of current and future windows
   −  it is also used to update the previously “predicted” top-k result for each relevant window


handling the impact of a new object o_new on the current top-k results is straightforward. We can simply compare F(o_new) with F(o_min_topk), where o_min_topk is the object in the current top-k set with the smallest F score. If F(o_new) < F(o_min_topk), this new object will not change the top-k result; otherwise we include it in the top-k result set and remove o_min_topk from the top-k set.

However, handling the impact of expired objects on the top-k results is more expensive in terms of resource utilization. Specifically, when existing top-k objects are expired from the window, the incoming new objects may fail to re-fill all the “openings” in the top-k set. In such cases, the state-of-the-art technique [15] needs to recompute the qualified substitutes based on all objects in the window. To do this, two alternative strategies could be adopted: 1) Keep the whole window content sorted by F, and continuously maintain this sorted list with each insertion and expiration of all stream objects. 2) Recompute the top-k from scratch, while using an index structure to relieve the computational intensity of recomputation. The state-of-the-art technique [15] adopts the second strategy. No matter which implementation strategy is chosen, this top-k recomputation process clearly constitutes the key performance bottleneck in terms of both CPU and memory resource utilization.

Computationally, when searching for the substitutional top-k candidates, it requires us to re-consider potentially huge numbers of objects. More precisely, all objects in the window need to be re-examined if no index structure has been deployed, or expensive index maintenance costs are needed to load and purge every stream object into and from the index. Alternatively, one could maintain a completely sorted list upon the whole window. As windows could potentially be huge in size, this would further increase the cost of the update process.

Memory-wise, as all objects in the window may later be re-considered by the recomputation processes, the recomputation process requires complete storage of the whole window. Such memory consumption for queries with large window sizes (millions of objects or even more) can be huge, even when a query only asks for a small k, say 10 or 100. To the best of our knowledge, this recomputation bottleneck remains an open research problem, as no existing technique is able to completely eliminate it from the top-k monitoring process.

3. THEORETICAL FOUNDATIONS AND AN INITIAL APPROACH

3.1 Minimal Top-k Candidate Set

To solve the recomputation problem, we study the predictability property of sliding window semantics. In the sliding window scenarios, query windows tend to partially overlap (Q.slide < Q.win) [1, 15, 17]. For example, an analyst may monitor the top 10 significant transactions within the last one hour (Q.win = 3600(s)) with a result refresh rate of every minute (Q.slide = 60(s)). Therefore, an object that falls into the window W_i will also participate in the sequence of the future windows W_{i+0}, W_{i+1} ... W_{i+n} until the end of its life span. Based on our knowledge about objects in the current window and the slide size, we can pre-determine the specific subset of the current objects that will participate (be alive) in each of the future windows. This enables us to pre-generate the partial query results for future windows, namely the “predicted” top-k result for each of the future windows. They would be based on the objects in the current window but have already taken the expiration of these objects in future windows into consideration. While this overlap property of window semantics [...] queries, here we apply it for top-k computation.

Figure 1 shows an example of a top-k result for the current window and predicted top-k results for the next three future windows (Q.win = 16, Q.slide = 4). Each object is depicted as a circle labeled with its object id. The position of an object on the Y-axis indicates its F score, while the position on the X-axis indicates the time it arrived at the system.

Figure 1: (Predicted) Top-k results of four consecutive windows at time of W_0 (slide size = 4 objects)

Based on the objects in W_0, we not only calculate the top-k (k=3) result in W_0, but we also pre-determine the potential top-k results for the next three future windows, namely W_1, W_2 and W_3, until the end of life spans of all objects in W_0. In particular, the top-k results for the current window W_0 are generated based on all 16 objects in W_0, namely objects o_1 to o_16. The predicted top-k results for future windows, in contrast, are calculated based on smaller and smaller subsets of objects in W_0: the predicted top-k results for W_1, W_2 and W_3 are calculated based on objects o_5 to o_16, o_9 to o_16 and o_13 to o_16, respectively. All other objects belonging to W_0 but determined to not fall into the (predicted) top-k results for any of these (future) windows (from W_0 to W_3 in this case) can be discarded immediately.

In the example shown in Figure 1, only the 7 objects within the predicted top-k results for the current and the three future windows (depicted using circles with solid lines) are kept in our system, while the other 9 objects (depicted using circles with dashed lines) are immediately discarded. The latter are guaranteed to have no chance to become part of the top-k result throughout their remaining life spans. On the left of Figure 3, we list the predicted top-k results maintained for these four windows. Although these pre-generated top-k results are only “predictions” based on our current knowledge of the objects that have already arrived so far, meaning that they may need to be adjusted (updated) when new objects come in, they guarantee an important property as described below.

Theorem 3.1. At any time, the objects in the predicted top-k result constitute the “Minimal Top-K candidate set” (MTK), namely the minimal object set that is both necessary and sufficient for accurate top-k monitoring.

The proof for Theorem 3.1 as well as all subsequent Lemmas and Theorems can be found in the appendix.


Yang et al. An Optimal Strategy for Monitoring Top-K Queries in Streaming Windows. EDBT 2011.

We now introduce the first step toward solving this problem, which is based on incremental maintenance of MTK. We call this approach PreTopk. When the window slides, the following two steps update the predicted top-k results. At step 1, we simply purge the view of the expired window. For example, as shown in Figure 3, the top-k result of W_0 in Figure 2 is removed, and W_1 becomes the new current view. This simple operation is sufficient for handling the object expiration. At step 2, we create an empty new predicted top-k result for the newest future window to cover the whole life span of the incoming objects. Using our example in Figure 3, the newest future window in this case is W_4. Therefore, each new object will fall into the current window and all future windows that we are currently maintaining. We thus update these predicted top-k results by simply applying the addition of each new object.

In particular, when a new object o_new comes in, we attempt to insert it into the predicted top-k result of each window. If the predicted top-k result of a window has not reached the size of k yet, we simply insert o_new into it. Otherwise, once o_new is inserted, we also remove the existing top-k object with the smallest F score. If the insertion fails for every window, namely the predicted top-k result sets of all future windows maintained have reached size k and F(o_new) is no larger than the F score of any object in them, o_new will be discarded immediately.

Again, such computation is straightforward. Figure 2 shows the updated predicted top-k results of our running example (Figure 1) after the insertion of four new objects.
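The two update steps and the per-object insertion can be sketched in a few lines. This is an illustrative reading of the description above, not the authors' implementation. It assumes count-based windows, with window W_j covering arrival positions j*slide to j*slide + win - 1, one min-heap per maintained window, and lazy creation of the newest future window; the class and method names are hypothetical.

```python
import heapq


class PreTopkSketch:
    """Predicted top-k results for the current window and all maintained future windows."""

    def __init__(self, k, win, slide):
        self.k, self.win, self.slide = k, win, slide
        self.seen = 0
        self.predicted = {}      # window number -> min-heap of (score, object id)

    def add(self, obj_id, score):
        """Process one arrival; return (window number, top-k) when a window completes."""
        i = self.seen
        self.seen += 1
        first = max(0, (i - self.win) // self.slide + 1)   # oldest window containing i
        last = i // self.slide                             # newest window containing i
        # Step 1: purge the views of windows that have expired.
        for j in [w for w in self.predicted if w < first]:
            del self.predicted[j]
        # Step 2: try to insert the object into every window it belongs to.
        for j in range(first, last + 1):
            heap = self.predicted.setdefault(j, [])
            if len(heap) < self.k:
                heapq.heappush(heap, (score, obj_id))
            elif score > heap[0][0]:
                heapq.heapreplace(heap, (score, obj_id))   # evict the smallest score
        if i == first * self.slide + self.win - 1:         # window `first` is now complete
            return first, sorted(self.predicted[first], reverse=True)
        return None

    def stored_objects(self):
        """Ids of all objects still referenced by some predicted top-k result."""
        return {obj for heap in self.predicted.values() for _, obj in heap}


monitor = PreTopkSketch(k=2, win=4, slide=2)
scores = [5, 1, 9, 3, 7, 2]
outputs = [monitor.add(f"o{n + 1}", s) for n, s in enumerate(scores)]
```

In this run the result of each window is read directly from its heap when the window completes, without looking at any object that was discarded on arrival, which is the point of avoiding recomputation.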

Figure 2: Updated predicted top-k results of four consecutive windows at time of W_1 (slide size = 4 objects)

Figure 3: Update process of predicted top-k sets from time of W_0 to W_1

3.3 Cost Analysis and Limitations of PreTopk

The predicted top-k result for each future window can be organized using different data structures, such as a sorted list supported by a tree-based index structure or a min-heap organized on the F score of the objects. No matter which data structure is chosen, the best possible CPU cost for inserting a new object into a top-k object set and keeping the size of the top-k object set unchanged has complexity O(log(k)). More precisely, log(k) for positioning the new object in the top-k object set, and log(k) for removing the previous top-k object with the lowest F score. Thus the overall processing cost for handling all new objects for each window slide is O(N_new * C_ave_topk * log(k)), with N_new the number of new objects coming to the system at this slide, and C_ave_topk (footnote 1) the average number of windows each object is predicted to make top-k when it arrives at the system. As the object expiration process is trivial, this constitutes the total cost for updating the top-k result at each window slide.

Memory-wise, PreTopk maintains predicted top-k results for C_nw (footnote 2) windows. The memory consumption of PreTopk is composed of two parts: first, the number of distinct objects stored in memory; second, the number of references to the objects in the predicted top-k results of all future windows. An object may appear in the predicted top-k results for multiple windows and thus needs multiple references. In the example shown in Figure 4, object 14 is predicted to be part of the top-k results in four windows, and thus four references to it are needed.

For the first part, PreTopk achieves the minimal number of objects to maintain for continuous top-k monitoring (Theorem 3.1). The size for this minimal set in the average case is analyzed below.

Theorem 3.2. In the average case (footnote 3), the number of distinct objects in predicted results for all future windows is 2k.

For the second part, the number of references stored by PreTopk is simply C_nw * k, as there are C_nw windows and k objects in each of them. The size of an object reference (Refsize) is typically significantly smaller than the size of the actual object (Objsize), especially when the object contains a large number of attributes. In summary, the average memory cost for PreTopk is 2k * Objsize + C_nw * k * Refsize.
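As a quick arithmetic check of this formula, the sketch below evaluates it for the running example (k = 3, Q.win = 16, Q.slide = 4). The object and reference sizes of 100 and 8 bytes are made-up values chosen only for illustration, and the function name is hypothetical.

```python
def pretopk_avg_memory(k, win, slide, obj_size, ref_size):
    """Average memory cost 2k * Objsize + C_nw * k * Refsize, with C_nw = ceil(win / slide)."""
    c_nw = -(-win // slide)          # integer ceiling of win / slide
    return 2 * k * obj_size + c_nw * k * ref_size


# Running example: k = 3, Q.win = 16, Q.slide = 4, so C_nw = 4 windows.
# Hypothetical sizes: 100 bytes per object, 8 bytes per reference.
cost = pretopk_avg_memory(k=3, win=16, slide=4, obj_size=100, ref_size=8)
# 2*3*100 + 4*3*8 = 600 + 96 = 696 bytes

# Growing the window with the slide fixed only increases the reference term.
bigger = pretopk_avg_memory(k=3, win=1600, slide=4, obj_size=100, ref_size=8)
# 600 + 400*3*8 = 10200 bytes
```

The object term stays at 2k * Objsize regardless of the window size, while the reference term grows with C_nw, which is the dependence the limitations discussion picks up.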

Conclusion. PreTopk solves the recomputation bottleneck suffered by the state-of-the-art solutions [15]. Memory-wise, it only keeps the minimal number of objects necessary for top-k query monitoring, which is shown to be independent of the potentially very large window size (in Theorem 3.2). Computation-wise, the processing costs for generating the top-k result in each window are no longer related to the window size. This is a significant improvement over the state-of-the-art solution [15], because both the processing and memory costs of any solution that involves recomputation are related to the window size, which is usually an overwhelming factor compared to k or the slide size.

However, the limitations of PreTopk are obvious. The above cost analysis reveals that the performance of the PreTopk algorithm is affected by a constant factor, namely C_nw, the number of predicted top-k results to be maintained. More precisely, since PreTopk maintains the predicted top-k result for each window independently, both its CPU and memory costs increase linearly with

1. When data is uniformly distributed, C_ave_topk = 3slide2k. We omit the derivation process due to the page limitations.

2. C_nw = ⌈Q_i.win / Q_i.slide⌉, which is equal to the maximum number of windows in which a new object can be alive.

3. Data is uniformly distributed on F(o), indicating that the objects with different F scores have equal opportunity to expire after the
