Communication Cost in Big Data Processing
Dan Suciu
University of Washington
Joint work with Paul Beame and Paris Koutris
and the Database Group at the UW
Queries on Big Data
Big Data processing on distributed clusters
• General programs: MapReduce, Spark
• SQL: PigLatin, BigQuery, Shark, Myria…
Traditional Data Processing
• Main cost = disk I/O
Big Data Processing
• Main cost = communication
This Talk
• How much communication is needed to solve a “problem” on p servers?
• “Problem”:
  – In this talk = Full Conjunctive Query, CQ (≈ SQL)
  – Future work = extend to linear algebra, ML, etc.
• Some techniques discussed in this talk are implemented in Myria (Bill Howe’s talk)
Models of Communication
• Combined communication/computation cost:
  – PRAM → MRC [Karloff’2010]
  – BSP [Valiant] → LogP Model [Culler]
• Explicit communication cost:
  – [Hong&Kung’81] red/blue pebble games, for I/O communication complexity → [Ballard’2012] extension to parallel algorithms
  – MapReduce model [DasSarma’2013]
  – MPC [Beame’2013, Beame’2014]: this talk
Outline
• The MPC Model
• Communication Cost for Triangles
• Communication Cost for CQ’s
• Discussion
Massively Parallel Communication Model (MPC)
Extends BSP [Valiant]
• Input data = size M bits
• Number of servers = p
• One round = compute & communicate
• Algorithm = several rounds
• Max communication load per server = L
  – Ideal: $L = M/p$
  – This talk: $L = (M/p) \cdot p^{\varepsilon} = M/p^{1-\varepsilon}$, where $\varepsilon \in [0,1]$ is called the space exponent
  – Degenerate: $L = M$ (send everything to one server)
[Figure: the input (size M) is spread over Server 1 … Server p, O(M/p) each; the servers then proceed through Round 1, Round 2, Round 3, …, each server receiving at most L bits per round]
Example: Q(x,y,z) = R(x,y) ⋈ S(y,z)
Input:
• R, S uniformly partitioned on p servers (M/p per server)
• size(R) + size(S) = M
Round 1: each server
• Sends record R(x,y) to server h(y) mod p
• Sends record S(y,z) to server h(y) mod p
Output:
• Each server computes and outputs the local join R(x,y) ⋈ S(y,z)
Assuming no skew: $\forall y$, $\text{degree}_R(y), \text{degree}_S(y) \le M/p$
• Rounds = 1
• Load $L = O(M/p) = O((M/p) \cdot p^{0})$
• Space exponent = 0
SQL is embarrassingly parallel!
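The one-round join above can be simulated in a few lines. This is a minimal sketch rather than the talk's implementation: the function name `hash_join_one_round`, the use of Python's built-in `hash` as h, and measuring load in tuples instead of bits are all assumptions made for illustration.

```python
from collections import defaultdict

def hash_join_one_round(R, S, p):
    """Simulate Q(x,y,z) = R(x,y) JOIN S(y,z) on p servers in one round.

    Returns (answers, load), where load is the largest number of tuples
    received by any single server (an assumed stand-in for bits).
    """
    inbox_R = defaultdict(list)  # server id -> R tuples received
    inbox_S = defaultdict(list)  # server id -> S tuples received
    # Round 1: route every tuple by the hash of its join attribute y.
    for (x, y) in R:
        inbox_R[hash(y) % p].append((x, y))
    for (y, z) in S:
        inbox_S[hash(y) % p].append((y, z))
    load = max(len(inbox_R[v]) + len(inbox_S[v]) for v in range(p))
    # Output: each server joins what it received, independently of the others.
    answers = []
    for v in range(p):
        z_by_y = defaultdict(list)
        for (y, z) in inbox_S[v]:
            z_by_y[y].append(z)
        for (x, y) in inbox_R[v]:
            answers.extend((x, y, z) for z in z_by_y.get(y, []))
    return answers, load

# Skew-free toy input: every y value occurs once in R and once in S.
R = [(i, i) for i in range(16)]
S = [(i, i + 100) for i in range(16)]
answers, load = hash_join_one_round(R, S, p=4)
```

On this toy input the 32 tuples are spread over 4 servers, so the busiest server receives 8 tuples, which is M/p when M is counted in tuples.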
Questions
Fix a query Q and a number of servers p
• What is the communication load L needed to compute Q in one round? This talk, based on [Beame’2013, Beame’2014]
• Fix a maximum load L (space exponent ε). How many rounds are needed for Q? Preliminary results in [Beame’2013]
The Triangle Query
• Input: three tables R(X, Y), S(Y, Z), T(Z, X), with size(R) + size(S) + size(T) = M
• Output: compute Q(x,y,z) = R(x,y) ⋈ S(y,z) ⋈ T(z,x)

R:  X      Y         S:  Y      Z         T:  Z      X
    Fred   Alice         Fred   Alice         Fred   Alice
    Jack   Jim           Jack   Jim           Jack   Jim
    Fred   Jim           Fred   Jim           Fred   Jim
    Carol  Alice         Carol  Alice         Carol  Alice
    …                    …                    …
Triangles in Two Rounds
• Round 1: compute temp(X,Y,Z) = R(X,Y) ⋈ S(Y,Z)
• Round 2: compute Q(X,Y,Z) = temp(X,Y,Z) ⋈ T(Z,X)
Load L depends on the size of temp, which can be much larger than M/p
Triangles in One Round
[Ganguli’92, Afrati&Ullman’10, Suri’11, Beame’13]
• Factorize $p = p^{1/3} \times p^{1/3} \times p^{1/3}$
• Each server is identified by (i, j, k)
• Place the servers in a cube with side $p^{1/3}$: Server (i,j,k)
Round 1:
• Send R(x,y) to all servers (h(x), h(y), *)
• Send S(y,z) to all servers (*, h(y), h(z))
• Send T(z,x) to all servers (h(x), *, h(z))
Output:
• Compute locally R(x,y) ⋈ S(y,z) ⋈ T(z,x)
[Figure: the cube of servers with the example tables R, S, T; the tuple (Fred, Jim) of R is replicated to every server (i, j, *) with i = h(Fred), j = h(Jim)]
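The routing rules of the one-round algorithm can be sketched as a small simulation. Everything named here is an assumption for illustration: the function `hypercube_triangles`, the cube side `c` (standing in for $p^{1/3}$), Python's built-in `hash` reduced modulo `c` as h, and load measured in tuples rather than bits.

```python
from itertools import product

def hypercube_triangles(R, S, T, c):
    """One-round triangle query on p = c**3 servers arranged as a c x c x c cube.

    Returns (answers, load); load counts tuples received by the busiest server.
    """
    h = lambda value: hash(value) % c  # assumed hash function into [0, c)
    inbox = {v: ([], [], []) for v in product(range(c), repeat=3)}
    for (x, y) in R:  # R(x,y) -> servers (h(x), h(y), *)
        for k in range(c):
            inbox[(h(x), h(y), k)][0].append((x, y))
    for (y, z) in S:  # S(y,z) -> servers (*, h(y), h(z))
        for i in range(c):
            inbox[(i, h(y), h(z))][1].append((y, z))
    for (z, x) in T:  # T(z,x) -> servers (h(x), *, h(z))
        for j in range(c):
            inbox[(h(x), j, h(z))][2].append((z, x))
    load = max(len(r) + len(s) + len(t) for r, s, t in inbox.values())
    answers = set()
    for r, s, t in inbox.values():  # local three-way join at each server
        t_set = set(t)
        for (x, y) in r:
            for (y2, z) in s:
                if y2 == y and (z, x) in t_set:
                    answers.add((x, y, z))
    return answers, load

# Toy input on 8 nodes: the triangles are (x, x+1, x+2) modulo 8.
R = [(i, (i + 1) % 8) for i in range(8)]
S = [(i, (i + 1) % 8) for i in range(8)]
T = [(i, (i + 6) % 8) for i in range(8)]
answers, load = hypercube_triangles(R, S, T, c=2)
```

Each triangle (x, y, z) is found at exactly one server, (h(x), h(y), h(z)), since that is the only server that receives all three of its tuples. Every input tuple is replicated `c` times, which is where the extra factor $p^{1/3}$ in the load comes from.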
Upper Bound is $L_{\text{algo}} = O(M/p^{2/3})$
Q(X,Y,Z) = R(X,Y) ⋈ S(Y,Z) ⋈ T(Z,X)
M = size(R) + size(S) + size(T) (in bits)
Theorem. Assume the data has no skew: all degrees $\le O(M/p^{1/3})$. Then the algorithm computes Q in one round, with communication load $L_{\text{algo}} = O(M/p^{2/3})$.
Compare to two rounds:
• No more large intermediate result
• Skew-free up to degrees = $O(M/p^{1/3})$ (from $O(M/p)$)
• BUT: space exponent increased to ε = ⅓ (from ε = 0)
Lower Bound is $L_{\text{lower}} = M/p^{2/3}$
Q(X,Y,Z) = R(X,Y) ⋈ S(Y,Z) ⋈ T(Z,X)
M = size(R) + size(S) + size(T) (in bits)
Assume that the three input relations R, S, T are stored on disjoint servers #
Theorem * Any one-round deterministic algorithm A for Q requires $L_{\text{lower}}$ bits of communication per server
We prove a stronger claim, about any algorithm communicating fewer than $L_{\text{lower}}$ bits:
Let L be the algorithm’s max communication load per server. We prove: if $L < L_{\text{lower}}$ then, over random permutations R, S, T, the algorithm returns at most a fraction $(L/L_{\text{lower}})^{3/2}$ of the answers to Q
# Without this assumption, need to add a multiplicative factor 3
* Beame, Koutris, Suciu, Communication Steps for Parallel Query Processing, PODS 2013
Lower Bound is $L_{\text{lower}} = M/p^{2/3}$
Discussion:
• An algorithm with space exponent ε < ⅓ has load $L = M/p^{1-\varepsilon}$ and reports only a $(L/L_{\text{lower}})^{3/2} = 1/p^{(1-3\varepsilon)/2}$ fraction of all answers. Fewer, as p increases!
• The lower bound holds only for random inputs: it cannot hold for a fixed input R, S, T, since the algorithm may use a short bit encoding to signal this input to all servers
• By Yao’s lemma, the result also holds for randomized algorithms on some fixed input
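The fraction in the first bullet follows by substituting the two loads; the numeric values of p below are illustrative choices, not figures from the talk.

```latex
\[
\left(\frac{L}{L_{\text{lower}}}\right)^{3/2}
= \left(\frac{M/p^{1-\varepsilon}}{M/p^{2/3}}\right)^{3/2}
= \left(p^{\varepsilon - 1/3}\right)^{3/2}
= \frac{1}{p^{(1-3\varepsilon)/2}}
\]
% Illustrative values with \varepsilon = 0 (load M/p):
\[
p = 64:\ \frac{1}{64^{1/2}} = \frac{1}{8}, \qquad
p = 1024:\ \frac{1}{1024^{1/2}} = \frac{1}{32}
\]
```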
Lower Bound is $L_{\text{lower}} = M/p^{2/3}$
Q(X,Y,Z) = R(X,Y) ⋈ S(Y,Z) ⋈ T(Z,X)
[Ballard’2012] consider Strassen’s matrix multiplication algorithm and prove the following theorem for the CDAG model. If each server has $M \ge c \cdot n^2/p^{2/\lg 7}$ memory, then the communication load per server is:
$IO(n, p, M) = \Omega\left(\left(\frac{n}{\sqrt{M}}\right)^{\lg 7} \cdot \frac{M}{p}\right)$
Thus, if $M = n^2/p^{2/\lg 7}$, then $IO(n, p) = \Omega\left(n^2/p^{2/\lg 7}\right)$
(M in the MPC bound plays the same role as $n^2$ here)
• Stronger than MPC: no restriction on rounds
• Weaker than MPC:
  – Applies only to an algorithm, not to a problem
  – CDAG model of computation vs. bit model
Lower Bound is $L_{\text{lower}} = M/p^{2/3}$
Q(X,Y,Z) = R(X,Y) ⋈ S(Y,Z) ⋈ T(Z,X)
Proof
Input: size $M = 3 \log n!$
• R, S, T random permutations on [n]: $\Pr[(i,j) \in R] = 1/n$, same for S, T
• $E[\#\text{answers to } Q] = \sum_{i,j,k} \Pr[(i,j,k) \in R \bowtie S \bowtie T] = n^3 \cdot (1/n)^3 = 1$
One server $v \in [p]$ receives $L(v) = L_R(v) + L_S(v) + L_T(v)$ bits
• Denote $a_{ij} = \Pr[(i,j) \in R \text{ and } v \text{ knows it}]$. Then:
  (1) $a_{ij} \le 1/n$ (obvious)
  (2) $\sum_{i,j} a_{ij} \le L_R(v)/\log n$ (formal proof: entropy)
• Denote similarly $b_{jk}$, $c_{ki}$ for S and T
• $E[\#\text{answers to } Q \text{ reported by server } v]$
  $= \sum_{i,j,k} a_{ij}\, b_{jk}\, c_{ki}$
  $\le \left[\left(\sum_{i,j} a_{ij}^2\right) \left(\sum_{j,k} b_{jk}^2\right) \left(\sum_{k,i} c_{ki}^2\right)\right]^{1/2}$   [Friedgut]
  $\le (1/n)^{3/2} \left[L_R(v)\, L_S(v)\, L_T(v) / \log^3 n\right]^{1/2}$   by (1), (2)
  $\le (1/n)^{3/2} \left[L(v) / (3 \log n)\right]^{3/2}$   geometric/arithmetic mean
  $\le \left[L(v)/M\right]^{3/2}$   expression for M above, since $M = 3 \log n! \le 3 n \log n$
• $E[\#\text{answers to } Q \text{ reported by all servers}] \le \sum_v \left[L(v)/M\right]^{3/2} \le p \cdot \left[L/M\right]^{3/2} = \left[L/L_{\text{lower}}\right]^{3/2}$
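The first step of the proof, that random permutations produce exactly one triangle in expectation, can be checked by brute force for tiny n. This is only a sanity-check sketch; the function name `expected_triangle_count` is hypothetical, and permutations are represented as tuples so that R = {(i, r[i])}, and likewise for S and T.

```python
from fractions import Fraction
from itertools import permutations

def expected_triangle_count(n):
    """Exact E[#answers] when R, S, T are independent uniform permutations on [n]."""
    perms = list(permutations(range(n)))
    total = 0
    for r in perms:          # R = {(i, r[i])}
        for s in perms:      # S = {(j, s[j])}
            for t in perms:  # T = {(k, t[k])}
                # (x, y, z) is an answer iff y = r[x], z = s[y], x = t[z].
                total += sum(1 for x in range(n) if t[s[r[x]]] == x)
    return Fraction(total, len(perms) ** 3)
```

The enumeration grows as (n!)³, so it is only practical for very small n; it is meant to confirm the counting argument, not to replace it.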
General Formula
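Consider an arbitrary conjunctive query (CQ):
$Q(x_1, \ldots, x_k) = S_1(\bar{x}_1) \bowtie \cdots \bowtie S_\ell(\bar{x}_\ell)$
(each $\bar{x}_j$ is the list of variables of atom $S_j$)
We will:
• Give a lower bound formula $L_{\text{lower}}$
• Give an algorithm with load $L_{\text{algo}}$
• Prove that $L_{\text{lower}} = L_{\text{algo}}$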
Cartesian Product
Q(x,y) = S1(x) × S2(y), size(S1) = M1, size(S2) = M2
$L_{\text{algo}} = 2 \left(\frac{M_1 \cdot M_2}{p}\right)^{1/2}$, $L_{\text{lower}} = 2 \left(\frac{M_1 \cdot M_2}{p}\right)^{1/2}$
Algorithm: factorize $p = p_1 \times p_2$
• Send S1(x) to all servers (h(x), *)
• Send S2(y) to all servers (*, h(y))
• Load = $M_1/p_1 + M_2/p_2$; choosing $p_1, p_2$ so that $M_1/p_1 = M_2/p_2$ gives $L_{\text{algo}}$
Lower bound:
• Let $L(v) = L_1(v) + L_2(v)$ = load at server v
• The p servers must report all answers to Q:
  $M_1 M_2 \le \sum_v L_1(v) \cdot L_2(v) \le \tfrac{1}{4} \sum_v (L(v))^2 \le \tfrac{1}{4}\, p \max_v (L(v))^2$
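To see how the load $M_1/p_1 + M_2/p_2$ compares with the closed form $2 (M_1 M_2/p)^{1/2}$, one can search over integer factorizations of p. This is an illustrative sketch: the helper names `best_cartesian_shares` and `closed_form_load` and the sample sizes are assumptions, not part of the talk.

```python
import math

def best_cartesian_shares(M1, M2, p):
    """Search all integer factorizations p = p1 * p2 for the smallest load M1/p1 + M2/p2."""
    best = None
    for p1 in range(1, p + 1):
        if p % p1 == 0:
            p2 = p // p1
            load = M1 / p1 + M2 / p2
            if best is None or load < best[2]:
                best = (p1, p2, load)
    return best

def closed_form_load(M1, M2, p):
    """The value 2 * sqrt(M1 * M2 / p) stated on the slide."""
    return 2 * math.sqrt(M1 * M2 / p)

# Sample sizes chosen so that the optimal shares are integers.
p1, p2, load = best_cartesian_shares(4000, 1000, 16)
```

For these sample sizes the search picks p1 = 8 and p2 = 2, where both relations contribute equally to the load and the total matches the closed form. When no integer factorization balances the two terms, the searched load is somewhat larger than the closed form.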
A Simple Observation
$Q(x_1, \ldots, x_k) = S_1(\bar{x}_1) \bowtie \cdots \bowtie S_\ell(\bar{x}_\ell)$, size($S_1$) = $M_1$, size($S_2$) = $M_2$, …, size($S_\ell$) = $M_\ell$
Assume that the input relations $S_1, S_2, \ldots$ are stored on disjoint servers
Definition. An edge packing for Q is a set of atoms with no common variables:
$U = \{S_{j_1}(\bar{x}_{j_1}), S_{j_2}(\bar{x}_{j_2}), \ldots, S_{j_{|U|}}(\bar{x}_{j_{|U|}})\}$, $\forall i \ne k: \bar{x}_{j_i} \cap \bar{x}_{j_k} = \emptyset$
Claim. Any one-round algorithm for Q must also compute the cartesian product:
$Q'(\bar{x}_{j_1}, \ldots, \bar{x}_{j_{|U|}}) = S_{j_1}(\bar{x}_{j_1}) \bowtie S_{j_2}(\bar{x}_{j_2}) \bowtie \cdots \bowtie S_{j_{|U|}}(\bar{x}_{j_{|U|}})$
Proof. Because the algorithm doesn’t know the other $S_j$’s.
$L_{\text{lower}} = |U| \cdot \left(\frac{M_{j_1} \cdot M_{j_2} \cdots M_{j_{|U|}}}{p}\right)^{1/|U|}$
(Proof similar to cartesian product on previous slide)
The Lower Bound for CQ
$Q(x_1, \ldots, x_k) = S_1(\bar{x}_1) \bowtie \cdots \bowtie S_\ell(\bar{x}_\ell)$, size($S_1$) = $M_1$, size($S_2$) = $M_2$, …, size($S_\ell$) = $M_\ell$
Definition. A fractional edge packing is a set of real numbers $u_1, u_2, \ldots, u_\ell$ s.t.:
$\forall j \in [\ell]: u_j \ge 0$
$\forall i \in [k]: \sum_{j \in [\ell]:\, x_i \in \text{vars}(S_j(\bar{x}_j))} u_j \le 1$
Theorem * Any one-round deterministic algorithm A for Q requires $\Omega(L_{\text{lower}})$ bits of communication per server, where for a fractional edge packing $u_1, \ldots, u_\ell$:
$L_{\text{lower}} = \left(\frac{M_1^{u_1} \cdot M_2^{u_2} \cdots M_\ell^{u_\ell}}{p}\right)^{1/(u_1 + u_2 + \cdots + u_\ell)}$
Note that we ignore constant factors (like |U| on the previous slide)
* Beame, Koutris, Suciu, Skew in Parallel Data Processing, PODS 2014
Triangle Query Revisited
Q(X,Y,Z) = R(X,Y) ⋈ S(Y,Z) ⋈ T(Z,X), size(R) = $M_1$, size(S) = $M_2$, size(T) = $M_3$

Packing $u_1, u_2, u_3$     $L = (M_1^{u_1} \cdot M_2^{u_2} \cdot M_3^{u_3} / p)^{1/(u_1+u_2+u_3)}$
½, ½, ½                     $(M_1 M_2 M_3)^{1/3} / p^{2/3}$
1, 0, 0                     $M_1 / p$
0, 1, 0                     $M_2 / p$
0, 0, 1                     $M_3 / p$

$L_{\text{lower}}$ = the largest of these four values
When $M_1 = M_2 = M_3 = M$, the largest value is the first: $M/p^{2/3}$
[Figure: the triangle on variables x, y, z with weight ½ on each of its three edges]
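The four candidate values can be evaluated directly. The sketch below assumes the four packings listed on the slide and uses a hypothetical helper name, `triangle_lower_bound`; the sample sizes in the usage lines are made up to show both regimes.

```python
def triangle_lower_bound(M1, M2, M3, p):
    """Largest value of (M1^u1 * M2^u2 * M3^u3 / p)^(1/(u1+u2+u3)) over the four packings."""
    packings = [(0.5, 0.5, 0.5), (1, 0, 0), (0, 1, 0), (0, 0, 1)]

    def bound(u):
        u1, u2, u3 = u
        return (M1 ** u1 * M2 ** u2 * M3 ** u3 / p) ** (1 / (u1 + u2 + u3))

    return max(bound(u) for u in packings)

# Equal sizes: the packing (1/2, 1/2, 1/2) dominates and gives M / p^(2/3).
equal_sizes = triangle_lower_bound(4096, 4096, 4096, 64)
# One very large relation: the packing (1, 0, 0) dominates and gives M1 / p.
one_large = triangle_lower_bound(10**6, 10, 10, 8)
```

When one relation is much larger than the other two, simply receiving that relation is the bottleneck, so the bound degrades to $M_1/p$.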
An Algorithm for CQ
[Afrati&Ullman’2010]
$Q(x_1, \ldots, x_k) = S_1(\bar{x}_1) \bowtie \cdots \bowtie S_\ell(\bar{x}_\ell)$, size($S_1$) = $M_1$, size($S_2$) = $M_2$, …, size($S_\ell$) = $M_\ell$
Factorize $p = p_1 \times p_2 \times \cdots \times p_k$; a server is $(v_1, \ldots, v_k) \in [p_1] \times \cdots \times [p_k]$
• Send $S_1(x_{11}, x_{12}, \ldots)$ to servers with coordinates $h(x_{11}), h(x_{12}), \ldots$
• Send $S_2(x_{21}, x_{22}, \ldots)$ to servers with coordinates $h(x_{21}), h(x_{22}), \ldots$
• …
• Compute the query locally at each server
• Load is $O(L_{\text{algo}})$, where:
$L_{\text{algo}} = \max_{j \in [\ell]} \frac{M_j}{\prod_{i:\, x_i \in \text{vars}(S_j)} p_i}$
[A&U] use sum instead of max. With max we can compute the optimal $p_1, p_2, \ldots, p_k$
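The load of this algorithm for a given choice of shares, taken here to be $\max_j M_j / \prod_{i: x_i \in \text{vars}(S_j)} p_i$ as in the formula later in the talk, is easy to evaluate. The representation below (a list of `(size, variables)` pairs and a dict of shares) and the name `hypercube_load` are assumptions for illustration.

```python
from math import prod

def hypercube_load(atoms, shares):
    """Load max_j M_j / prod_{i : x_i in vars(S_j)} p_i for the given shares.

    atoms  : list of (M_j, variables of S_j) pairs
    shares : dict mapping each variable x_i to its share p_i
    """
    return max(M / prod(shares[x] for x in variables) for M, variables in atoms)

# Triangle query with equal sizes on p = 4 * 4 * 4 = 64 servers.
triangle = [(4096, ("x", "y")), (4096, ("y", "z")), (4096, ("z", "x"))]
cube_load = hypercube_load(triangle, {"x": 4, "y": 4, "z": 4})

# Two-way join R(x,y), S(y,z): give every server to the join variable y.
join = [(4096, ("x", "y")), (4096, ("y", "z"))]
join_load = hypercube_load(join, {"x": 1, "y": 64, "z": 1})
```

The two usage lines reproduce the earlier examples: equal shares of $p^{1/3}$ give load $M_j/p^{2/3}$ for the triangle, and putting the whole share on the join variable gives load $M_j/p$ for the two-way join.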
$L_{\text{lower}} = L_{\text{algo}}$
$Q(x_1, \ldots, x_k) = S_1(\bar{x}_1) \bowtie \cdots \bowtie S_\ell(\bar{x}_\ell)$, size($S_1$) = $M_1$, size($S_2$) = $M_2$, …, size($S_\ell$) = $M_\ell$
$L_{\text{lower}} = \left(\frac{\prod_j M_j^{u_j}}{p}\right)^{1/\sum_j u_j}$
$L_{\text{algo}} = \max_j \frac{M_j}{\prod_{i:\, x_i \in \text{vars}(S_j)} p_i}$
Apply log in base p. Denote $\mu_j = \log_p M_j$, $e_i = \log_p p_i$, and write $\lambda$ for $\log_p$ of the load:
minimize $\lambda$
subject to $\sum_i e_i \le 1$
$\forall j: \lambda + \sum_{i:\, x_i \in \text{vars}(S_j)} e_i \ge \mu_j$
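As a worked instance, consider the triangle query with equal sizes, assuming the linear program has the form "minimize $\lambda$ subject to $\sum_i e_i \le 1$ and $\lambda + \sum_{i: x_i \in \text{vars}(S_j)} e_i \ge \mu_j$ for every atom" (the symbol $\lambda$ for the log of the load is a naming assumption). The optimum recovers the shares $p^{1/3}$ and the load $M/p^{2/3}$ from the triangle slides.

```latex
% Triangle query, equal sizes: \mu_1 = \mu_2 = \mu_3 = \mu = \log_p M
\begin{align*}
\text{minimize } & \lambda \\
\text{subject to } & e_1 + e_2 + e_3 \le 1, \\
& \lambda + e_1 + e_2 \ge \mu, \quad
  \lambda + e_2 + e_3 \ge \mu, \quad
  \lambda + e_3 + e_1 \ge \mu .
\end{align*}
% Adding the three atom constraints:
\[
3\lambda + 2(e_1 + e_2 + e_3) \ge 3\mu
\;\Longrightarrow\;
\lambda \ge \mu - \tfrac{2}{3},
\]
% with equality at e_1 = e_2 = e_3 = 1/3, i.e. p_i = p^{1/3} and
\[
L_{\text{algo}} = p^{\lambda} = p^{\mu - 2/3} = \frac{M}{p^{2/3}} .
\]
```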