• No results found

to the given statistics. When the average stream arrival rates and the total window sizes are given, the upper bound can be set to ensure even dis- tribution of the tuples in the states. Since the upper bound is calculated using the average arrival rates, it can “smooth” the workload during short term load burst. The correct time-based semantics of the sliding window join continue to be ensured by the tail node since it still uses the time-based state purging.

With count-based workload smoothing, the state slice table in Figure 12.6 has one more column C, denoting the pre-determined count based upper bound for each processing node. Accordingly, the ∆W column is updated at runtime.

Such count-based workload smoothing is effective when the window constraint is large. For a small window to be close to the statistic sampling intervals, the statistics may be imprecise.

14.2 State Relocation

Adding/removing of nodes is needed when system is overloaded or the ring length is not optimal for response time. Two approaches for adaptive optimization are proposed: passive adjustment of the window range and

aggressive adjustment by state relocation.

Passive State Adjustment. When long term load burst happens, pas- sive adjustment aims to relocate the state by setting the window ranges. Consider an example of adding one node to a ring composed of 3 nodes. We assume the states are sliced equally among the processing nodes N1to

14.2. STATE RELOCATION 167 N3(N4finally). That is, the states in each processing node will be changed

from W/3 to W/4, with W denoting the window constraint. The state slices in the original processing nodes N1 to N3 are step-by-step replaced and

shrunk. Finally the new state allocation with one additional processing node is achieved. Similarly, node removal can be conducted.

The graceful state adjustment induces no extra migration cost. How- ever a long adjustment latency may occur for large window size.

Head 2ndNode W/(N+M) W/N … 3rdNode ≥W/(N+M)

New Inserted Node 1

<W/(N+M)

Tail

New Inserted Node M

Figure 14.1: Aggressive State Relocation.

Aggressive State Relocation. To reduce the adaptation latency, aggres- sive state slice adjustment migrates some part of the states along the PSP-D ring. Such state relocation needs to suspend the execution and resume af- terward.

To maintain the ring structure, the state slice movement happens only between adjacent processing nodes. Intuitively, a new processing node should be inserted into the ring at the appropriate position so that the shifted state slice can fill directly the new node. That is, let us assume that the ring has N nodes originally and that another M nodes need to be added into the ring, the i−th processing node from the head node need to move ∆Si = MNi − bMNic state tuples to the next nodes in the direction of the ring towards the tail node, as illustrated below.

14.2. STATE RELOCATION 168 ∆Si = (WN N + MW ) Wi N +M − b(W N W N + M) i W N +M c = M Ni − b M Nic (14.1)

The new processing node Nj, 1 ≤ j ≤ M , needs to be inserted after the processing node Nk, such that k is the minimal number with MNk > j. Figure 14.1 illustrates the addition of a new processing nodes. Similarly, the removal of processing node can be conducted.

The aggressive state relocation involves execution breaks and state mi- gration during the adjustment. Frequent aggressive adjustment should be avoided.

169

Chapter 15

Discussion

15.1 State Replication Based Distribution

To the best of our knowledge, the only work on distributed processing of generic multi-way stream joins is [GYW07]. [GYW07] proposed two state

replication based distributions for generic MJ operators: aligned tuple rout-

ing (ATR) and coordinated tuple routing (CTR). We briefly review these two approaches and then compare them with PSP below.

ATR picks one input stream as the master stream and partitions the master stream among the processing nodes. All the other slave input streams are distributed to the processing nodes with some overlaps of the states, to ensure the semantics of the window constraints. CTR is a multi-hop seman- tics preserving tuple routing where intermediate join results are transferred among nodes during each hop. A weighted minimum set covering is uti- lized to identify the best routing for each tuple to “find” all relevant states. Details of these two approaches can be refereed from [GYW07].

15.1. STATE REPLICATION BASED DISTRIBUTION 170 Memory Cost.

The distribution strategies of both ATR and CTR are based on state (par- tial) duplication among the processing nodes. Compared to them, our pro- posed PSP approach does not have any duplicated states at any time.

In ATR the segment length T is an important parameter for the load diffusion. However, the ATR approach works under the condition that the window constraint W  T . When W is comparable with T , the memory waste and redundant computation can be significant, as illustrated below.

ATR duplicates the states of the slave input streams thus it may use extra memory and CPU to process them. For ease of illustration, we now assume that all input streams have the same arrival rate, and the master stream is not switched during the cost estimation. Besides the notations in Table 13.1, we now introduce T to denote the stream segment length.

For the M − 1 slave streams, each segment is set to be of size T + 2W length, with W as the window size. Assume each segment is assigned to one processing node. Then the total memory consumption of the ATR ap- proach is:

M EMAT R= λT N + λ(T + 2W )(M − 1)N In other words, the duplicated (wasted) state memory is:

M EMAT R− λT N M λT N M = 2 W T M − 1 M

From above equation, we can see that the memory waste is proportional to

W/T . When W  T is not the case, the memory waste can be significant.

15.1. STATE REPLICATION BASED DISTRIBUTION 171

set covering at runtime. CTR faces the following dilemma. The more re- dundant states, the smaller set covering of less nodes may exist. Then the incoming tuples will be stored in fewer states, which may make future set covering large. A more serious concern is that the states in CTR may con- verge to one (or a small subset of) node if sometimes only one copy of the input tuples is stored in a certain node, since future set covering will di- rect all later tuples to that node. Then no distribution is achieved. Unless an optimal insertion algorithm is employed (missing in [GYW07]), which predicts future workload diffusion, the CTR is incontrollable and ad hoc.

The CTR scheme makes L copies (L = 2 in [GYW07]) of the input tuples and allocates them to multiple processing nodes. Thus L copies of each in- put tuple are stored in L different states of the processing nodes. Obviously the memory consumption is also L times of the input stream size.

Synchronization.

ATR results in a set of independent join operators that no synchroniza- tion is needed. However, CTR does need synchronization among nodes in different hops for maintenance of the states and processing of intermediate results. The synchronization is missed in [GYW07].

CPU Cost.

CPU cost comparison is summarized in Figure 15.1. Here we list only the main factors affecting CPU cost.

CTR employs a complex routing algorithm to determine the optimal routes for each segment of input streams. Such routing cost is per segment cost and may be significant with fine-grained segments. On the contrary, ATR and PSP do not require routing by employing one hop computation

15.1. STATE REPLICATION BASED DISTRIBUTION 172

Item ATR CTR PSP

Routing Cost Low High Low

Per Segment Metadata No Yes No

Duplication Removal No Yes No

Load Balancing Granularity Large Small Small

State Management Cost High High Low

Adaptation Cost Unknown Unknown Low

Network Transmission Low Middle High

Figure 15.1: CPU Consumption Comparison

and fixed routing respectively. The routing information and other meta- data must be attached to each segment to ensure the correctness in CTR, while no such requirement exists for ATR and PSP-D. Further CTR needs extra work to avoid generating duplicated results while the other two will not generate duplication in the first place. The ATR and CTR approaches both duplicate states and thus the state management costs are much higher than for PSP. At runtime, each segment of the input stream is processed by only one processing node for ATR, several nodes for CTR and the optimal number of processing nodes for PSP. Thus the processing latency using PSP is expected to be the lowest.

The disadvantage of PSP is that the network transmission cost may be larger than that for ATR and CTR, since all input tuples and intermediate results need to be send along the ring. We limit the usage of the PSP scheme to a cluster with local high speed network only.