• No results found

Distributed Learning. Amir H. Payberah 10/12/2019

N/A
N/A
Protected

Academic year: 2021

Share "Distributed Learning. Amir H. Payberah 10/12/2019"

Copied!
178
0
0

Loading.... (view fulltext now)

Full text

(1)

Distributed Learning

Amir H. Payberah [email protected]

(2)

The Course Web Page

https://id2223kth.github.io

(3)
(4)

Where Are We?

(5)
(6)

[https://www.tripsavvy.com/how-to-get-from-copenhagen-to-stockholm-1626275]

(7)
(8)

Ferrari or Truck?

I Pick up your partner?

I Moving the furniture?

(9)

Ferrari or Truck?

I Pick up your partner?

(10)

Ferrari or Truck?

I Pick up your partner?

I Moving the furniture?

(11)

Ferrari or Truck?

I Pick up your partner?

(12)

CPU vs GPU

(13)
(14)
(15)
(16)

I Whichcomponents of a DNN would requireintense hardware resource?

I A few candidates are:

Preprocessinginput data

Trainingthe model

Storingthe trained model

Deploymentof the model

(17)

I Whichcomponents of a DNN would requireintense hardware resource?

I A few candidates are:

Preprocessinginput data

Trainingthe model

Storingthe trained model

(18)

I Whichcomponents of a DNN would requireintense hardware resource?

I A few candidates are:

Preprocessinginput data

Trainingthe model

Storingthe trained model

Deploymentof the model

(19)

I Whichcomponents of a DNN would requireintense hardware resource?

I A few candidates are:

Preprocessinginput data

Trainingthe model

Storingthe trained model

(20)

I Whichcomponents of a DNN would requireintense hardware resource?

I A few candidates are:

Preprocessinginput data

Trainingthe model

Storingthe trained model

Deploymentof the model

(21)

I Whichcomponents of a DNN would requireintense hardware resource?

I A few candidates are:

Preprocessinginput data

Trainingthe model

Storingthe trained model

(22)

I Whichcomponents of a DNN would requireintense hardware resource?

I A few candidates are:

Preprocessinginput data

Training the model

Storingthe trained model

Deploymentof the model

(23)

Training a Model

I Forward pass: input is passed through the DNN and an output is generated.

I Backward pass: weights are updated on the basis of error we get in forward pass.

(24)

Training a Model

I Forward pass: input is passed through the DNN and an output is generated.

I Backward pass: weights are updated on the basis of error we get in forward pass.

I Both of these operations are essentiallymatrix multiplications.

(25)

How to Train a Model Faster?

I The computationally intensive part of neural network is made up of multiplematrix multiplications.

I How can we make it faster?

I Do these operationsat the same time, instead of doing itone after the other.

I This is in a nutshell why we use GPUinstead of a CPU for training a neural network.

(26)

How to Train a Model Faster?

I The computationally intensive part of neural network is made up of multiplematrix multiplications.

I How can we make it faster?

I Do these operationsat the same time, instead of doing itone after the other.

I This is in a nutshell why we use GPUinstead of a CPU for training a neural network.

(27)

How to Train a Model Faster?

I The computationally intensive part of neural network is made up of multiplematrix multiplications.

I How can we make it faster?

I Do these operationsat the same time, instead of doing itone after the other.

I This is in a nutshell why we use GPUinstead of a CPU fortraining a neural network.

(28)

Placing Operations and Variables on Devices (1/4)

I For now, lets asume to run everything on a single machine.

(29)

Placing Operations and Variables on Devices (2/4)

I Place thedata preprocessingoperations on CPUs, and theNN operations onGPUs.

I Adding moreCPU RAM to a machine is simple and cheap, whereas the GPU RAM

is an expensive and limited resource.

If a variable is not needed in the next few training steps, it should probably be placed

on the CPU (e.g.,datasets generally belong on the CPU).

I GPUs usually have a fairly limited communication bandwidth, so it is important to avoid unnecessary data transfers in and out of the GPUs.

(30)

Placing Operations and Variables on Devices (2/4)

I Place thedata preprocessingoperations on CPUs, and theNN operations onGPUs.

I Adding moreCPU RAM to a machine is simple and cheap, whereas the GPU RAM

is an expensive and limited resource.

If a variable is not needed in the next few training steps, it should probably be placed

on the CPU (e.g.,datasets generally belong on the CPU).

I GPUs usually have a fairly limited communication bandwidth, so it is important to avoid unnecessary data transfers in and out of the GPUs.

(31)

Placing Operations and Variables on Devices (2/4)

I Place thedata preprocessingoperations on CPUs, and theNN operations onGPUs.

I Adding moreCPU RAM to a machine is simple and cheap, whereas the GPU RAM

is an expensive and limited resource.

If a variable is not needed in the next few training steps, it should probably be placed

on the CPU (e.g.,datasets generally belong on the CPU).

I GPUs usually have a fairly limited communication bandwidth, so it is important to avoid unnecessary data transfers in and out of the GPUs.

(32)

Placing Operations and Variables on Devices (2/4)

I Place thedata preprocessingoperations on CPUs, and theNN operations onGPUs.

I Adding moreCPU RAM to a machine is simple and cheap, whereas the GPU RAM

is an expensive and limited resource.

If a variable is not needed in the next few training steps, it should probably be placed

on the CPU (e.g.,datasets generally belong on the CPU).

I GPUs usually have a fairly limited communication bandwidth, so it is important to avoid unnecessary data transfers in and out of the GPUs.

(33)

Placing Operations and Variables on Devices (3/4)

I By default, all variables/operationsare placed on the first GPU:/gpu:0.

I Variables/operations thatdo not have a GPU kernelare placed on theCPU:/cpu:0.

I A kernel is a variable or operation’s implementation for a specific data and device type.

For example, there is a GPU kernel for thefloat32 tf.matmul()operation, but there

(34)

Placing Operations and Variables on Devices (3/4)

I By default, all variables/operationsare placed on the first GPU:/gpu:0.

I Variables/operations thatdo not have a GPU kernelare placed on theCPU:/cpu:0.

I A kernel is a variable or operation’s implementation for a specific data and device type.

For example, there is a GPU kernel for thefloat32 tf.matmul()operation, but there

is no GPU kernel forint32 tf.matmul() (only a CPU kernel).

(35)

Placing Operations and Variables on Devices (3/4)

I By default, all variables/operationsare placed on the first GPU:/gpu:0.

I Variables/operations thatdo not have a GPU kernelare placed on theCPU:/cpu:0.

I A kernel is a variable or operation’s implementation for a specific data and device type.

For example, there is a GPU kernel for thefloat32 tf.matmul()operation, but there

(36)

Placing Operations and Variables on Devices (3/4)

I By default, all variables/operationsare placed on the first GPU:/gpu:0.

I Variables/operations thatdo not have a GPU kernelare placed on theCPU:/cpu:0.

I A kernel is a variable or operation’s implementation for a specific data and device type.

For example, there is a GPU kernel for thefloat32 tf.matmul()operation, but there

is no GPU kernel forint32 tf.matmul() (only a CPU kernel).

(37)

Placing Operations and Variables on Devices (4/4)

I TensorFlow automatically decides which device to execute an operation and copies

tensors to thatdevice.

I However, TensorFlow operations can beexplicitlyplaced on specific devices using the

(38)

Manual Device Placement (1/3)

I Usewith tf.device to create a device context.

I All theoperations within that context will run on the same designated device.

tf.debugging.set_log_device_placement(True)

a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]) b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]) c = tf.matmul(a, b)

print(c) Output:

Executing op MatMul in device /job:localhost/replica:0/task:0/device:GPU:0 tf.Tensor(

[[22. 28.]

[49. 64.]], shape=(2, 2), dtype=float32)

(39)

Manual Device Placement (1/3)

I Usewith tf.device to create a device context.

I All theoperations within that context will run on the same designated device.

tf.debugging.set_log_device_placement(True)

a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]) b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]) c = tf.matmul(a, b)

print(c)

Output:

Executing op MatMul in device /job:localhost/replica:0/task:0/device:GPU:0 tf.Tensor(

[[22. 28.]

(40)

Manual Device Placement (1/3)

I Usewith tf.device to create a device context.

I All theoperations within that context will run on the same designated device.

tf.debugging.set_log_device_placement(True)

a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]) b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]) c = tf.matmul(a, b)

print(c) Output:

Executing op MatMul in device /job:localhost/replica:0/task:0/device:GPU:0 tf.Tensor(

[[22. 28.]

[49. 64.]], shape=(2, 2), dtype=float32)

(41)

Manual Device Placement (2/3)

tf.debugging.set_log_device_placement(True)

with tf.device(’/cpu:0’):

a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]) b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])

c = tf.matmul(a, b)

print(c)

Executing op MatMul in device /job:localhost/replica:0/task:0/device:GPU:0 tf.Tensor(

[[22. 28.]

[49. 64.]], shape=(2, 2), dtype=float32)

I Here, aandbare assigned to CPU:0.

I Since a device was not explicitly specified for thematmuloperation, it will be run on the default device GPU:0.

(42)

Manual Device Placement (2/3)

tf.debugging.set_log_device_placement(True)

with tf.device(’/cpu:0’):

a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]) b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])

c = tf.matmul(a, b)

print(c)

Executing op MatMul in device /job:localhost/replica:0/task:0/device:GPU:0 tf.Tensor(

[[22. 28.]

[49. 64.]], shape=(2, 2), dtype=float32)

I Here, aandbare assigned to CPU:0.

I Since a device was not explicitly specified for thematmuloperation, it will be run on the default device GPU:0.

(43)

Manual Device Placement (2/3)

tf.debugging.set_log_device_placement(True)

with tf.device(’/cpu:0’):

a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]) b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])

c = tf.matmul(a, b)

print(c)

Executing op MatMul in device /job:localhost/replica:0/task:0/device:GPU:0 tf.Tensor(

[[22. 28.]

[49. 64.]], shape=(2, 2), dtype=float32)

I Here, aandbare assigned to CPU:0.

(44)

Manual Device Placement (3/3)

tf.debugging.set_log_device_placement(True)

with tf.device(’/cpu:0’):

a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]) b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]) c = tf.matmul(a, b)

print(c)

Executing op MatMul in device /job:localhost/replica:0/task:0/device:CPU:0 tf.Tensor(

[[22. 28.]

[49. 64.]], shape=(2, 2), dtype=float32)

(45)

Manual Device Placement (3/3)

tf.debugging.set_log_device_placement(True)

with tf.device(’/cpu:0’):

a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]) b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]) c = tf.matmul(a, b)

print(c)

Executing op MatMul in device /job:localhost/replica:0/task:0/device:CPU:0 tf.Tensor(

[[22. 28.]

(46)

Parallel Execution Across Multiple Devices

(47)

Parallelization

I Trainlarge deep learning models withhuge amounts of training data.

I Parallelization anddistribution are essential.

I Two main approaches to training a single model across multiple devices: • Modelparallelization

(48)

Parallelization

I Trainlarge deep learning models withhuge amounts of training data.

I Parallelization anddistribution are essential.

I Two main approaches to training a single model across multiple devices: • Modelparallelization

Dataparallelization

(49)

Parallelization

I Trainlarge deep learning models withhuge amounts of training data.

I Parallelization anddistribution are essential.

I Two main approaches to training a single model across multiple devices: • Modelparallelization

(50)

Model Parallelization

(51)

Model Parallelization

I Themodel is split across multiple devices.

(52)

Model Parallelization

I Themodel is split across multiple devices.

I Depends on thearchitecture of the NN.

[Mayer, R. et al., arXiv:1903.11314, 2019]

(53)

Fully Connetected Model Parallelization (1/2)

I To place each layeron a different device.

I Not good: each layer needs towait for the output of theprevious layerbefore it can do anything.

(54)

Fully Connetected Model Parallelization (1/2)

I To place each layeron a different device.

I Not good: each layer needs towait for the output of theprevious layerbefore it can do anything.

(55)

Fully Connetected Model Parallelization (2/2)

I Slice the model vertically.

E.g., theleft half of each layer on one device, and theright parton another device.

I Slightly better: both halves of each layer can indeed workin parallel.

I Each half of the next layer requires the output of both halves: lot of cross-device communication.

(56)

Fully Connetected Model Parallelization (2/2)

I Slice the model vertically.

E.g., theleft half of each layer on one device, and theright parton another device.

I Slightly better: both halves of each layer can indeed workin parallel.

I Each half of the next layer requires the output of both halves: lot of cross-device communication.

(57)

Fully Connetected Model Parallelization (2/2)

I Slice the model vertically.

E.g., theleft half of each layer on one device, and theright parton another device.

I Slightly better: both halves of each layer can indeed workin parallel.

I Each half of the next layer requires the output of both halves: lot of cross-device communication.

(58)

CNN Model Parallelization

I Some NN, such asCNN, contains layers that are onlypartially connectedto thelower layers.

I Easierto distribute the model across devices in an efficient way.

(59)

CNN Model Parallelization

I Some NN, such asCNN, contains layers that are onlypartially connectedto thelower layers.

(60)

RNN Model Parallelization

I Split the NN horizontallyby placingeach layeron a different device.

I At the first step, only one device will be active.

I At the second step, two will be active.

I While the first layerwill be handling the second value, thesecond layerwill be handling the output of the first layer for the first value.

I By the time the signal propagates to the

output layer, all devices will be active simultaneously.

(61)

RNN Model Parallelization

I Split the NN horizontallyby placingeach layeron a different device.

I At the first step, only one device will be active.

I At the second step, two will be active.

I While the first layerwill be handling the second value, thesecond layerwill be handling the output of the first layer for the first value.

I By the time the signal propagates to the

output layer, all devices will be active simultaneously.

(62)

RNN Model Parallelization

I Split the NN horizontallyby placingeach layeron a different device.

I At the first step, only one device will be active.

I At the second step, two will be active.

I While the first layerwill be handling the second value, thesecond layerwill be handling the output of the first layer for the first value.

I By the time the signal propagates to the

output layer, all devices will be active simultaneously.

(63)

RNN Model Parallelization

I Split the NN horizontallyby placingeach layeron a different device.

I At the first step, only one device will be active.

I At the second step, two will be active.

I While the first layerwill be handling the second value, thesecond layerwill be handling the output of the first layer for the first value.

I By the time the signal propagates to the

output layer, all devices will be active simultaneously.

(64)

RNN Model Parallelization

I Split the NN horizontallyby placingeach layeron a different device.

I At the first step, only one device will be active.

I At the second step, two will be active.

I While the first layerwill be handling the second value, thesecond layerwill be handling the output of the first layer for the first value.

I By the time the signal propagates to the

output layer, all devices will be active simultaneously.

(65)
(66)

Data Parallelization (1/2)

I Replicate awhole model onevery device.

I Trainall replicassimultaneously, using adifferent mini-batchfor each.

[Mayer, R. et al., arXiv:1903.11314, 2019]

(67)

Data Parallelization (2/2)

1. Compute thegradient of theloss functionusing amini-batch oneach GPU.

2. Compute themean of the gradients byinter-GPU communication.

(68)

Data Parallelization (2/2)

1. Compute thegradient of theloss functionusing amini-batch oneach GPU.

2. Compute themean of the gradients byinter-GPU communication.

3. Updatethemodel.

(69)

Data Parallelization (2/2)

1. Compute thegradient of theloss functionusing amini-batch oneach GPU.

2. Compute themean of the gradients byinter-GPU communication.

(70)

Data Parallelization Design Issues

I System Architecture: how to synchronize the parameters

I Synchronization: when to synchronize the parameters

(71)

Data Parallelization Design Issues

I System Architecture: how to synchronize the parameters

(72)

System Architecture

(73)

System Architecture - Centralized

I How to aggregate gradients(compute themeanof the gradients)?

(74)

System Architecture - Centralized

I Store the model parametersoutside of the workers.

I Workers periodically report their computed parameters or parameter updates to a (set of) parameter server(s) (PSs).

(75)

System Architecture - Centralized

I Store the model parametersoutside of the workers.

I Workers periodically report their computed parameters or parameter updates to a (set of) parameter server(s) (PSs).

(76)

System Architecture - Centralized

I Store the model parametersoutside of the workers.

I Workers periodically report their computed parameters or parameter updates to a (set of) parameter server(s) (PSs).

(77)

System Architecture - Decentralized

I Mirrorall the modelparametersacross all workers (No PS).

(78)

System Architecture - Decentralized

I Mirrorall the modelparametersacross all workers (No PS).

I Workers exchangeparameter updates directlyvia an allreduceoperation.

(79)

Reduce and AllReduce (1/2)

I Reduce: reducing a set of numbersinto asmaller set of numbersvia a function.

I E.g.,sum([1, 2, 3, 4, 5]) = 15

I Reduce takes an array of input elements on each process and returns an array of output elements to theroot process.

(80)

Reduce and AllReduce (1/2)

I Reduce: reducing a set of numbersinto asmaller set of numbersvia a function.

I E.g.,sum([1, 2, 3, 4, 5]) = 15

I Reduce takes an array of input elements on each process and returns an array of output elements to theroot process.

[https://mpitutorial.com/tutorials/mpi-reduce-and-allreduce]

(81)

Reduce and AllReduce (1/2)

I Reduce: reducing a set of numbersinto asmaller set of numbersvia a function.

I E.g.,sum([1, 2, 3, 4, 5]) = 15

I Reduce takes an array of input elements on each process and returns an array of output elements to theroot process.

(82)

Reduce and AllReduce (1/2)

I Reduce: reducing a set of numbersinto asmaller set of numbersvia a function.

I E.g.,sum([1, 2, 3, 4, 5]) = 15

I Reduce takes an array of input elements on each process and returns an array of output elements to theroot process.

[https://mpitutorial.com/tutorials/mpi-reduce-and-allreduce]

(83)

Reduce and AllReduce (2/2)

I AllReduce storesreduced results acrossall processes rather than the root process.

(84)

Reduce and AllReduce (2/2)

I AllReduce storesreduced results acrossall processes rather than the root process.

[https://mpitutorial.com/tutorials/mpi-reduce-and-allreduce]

(85)

AllReduce Example

Initial state After AllReduce operation

(86)

AllReduce Implementation

I All-to-all allreduce I Master-worker allreduce I Tree allreduce I Round-robin allreduce I Butterfly allreduce I Ring allreduce 42 / 81
(87)

AllReduce Implementation - All-to-All AllReduce

I Send the array of data to each other.

I Apply the reduction operation oneach process.

(88)

AllReduce Implementation - All-to-All AllReduce

I Send the array of data to each other.

I Apply the reduction operation oneach process.

I Too many unnecessary messages.

[https://towardsdatascience.com/visual-intuition-on-ring-allreduce-for-distributed-deep-learning-d1f34b4911da]

(89)

AllReduce Implementation - Master-Worker AllReduce

I Selecting one processas a master, gather all arrays into the master.

I Perform reduction operationslocally in themaster.

I Distribute the result to the other processes.

(90)

AllReduce Implementation - Master-Worker AllReduce

I Selecting one processas a master, gather all arrays into the master.

I Perform reduction operationslocally in themaster.

I Distribute the result to the other processes.

I The master becomes a bottleneck(not scalable).

[https://towardsdatascience.com/visual-intuition-on-ring-allreduce-for-distributed-deep-learning-d1f34b4911da]

(91)

AllReduce Implementation - Other implementations

I Some try tominimize bandwidth.

(92)

AllReduce Implementation - Ring-AllReduce (1/6)

I TheRing-Allreduce hastwo phases:

1. First, theshare-reducephase

2. Then, theshare-onlyphase

(93)

AllReduce Implementation - Ring-AllReduce (2/6)

I In the share-reducephase, each processpsends data to the process (p+1) % m

mis the number of processes, and%is the modulo operator.

I The array of dataon each process is divided to mchunks (m=4 here).

(94)

AllReduce Implementation - Ring-AllReduce (2/6)

I In the share-reducephase, each processpsends data to the process (p+1) % m

mis the number of processes, and%is the modulo operator.

I Thearray of data on each process is divided tom chunks (m=4here).

I Each one of these chunkswill beindexed byi going forward.

[https://towardsdatascience.com/visual-intuition-on-ring-allreduce-for-distributed-deep-learning-d1f34b4911da]

(95)

AllReduce Implementation - Ring-AllReduce (2/6)

I In the share-reducephase, each processpsends data to the process (p+1) % m

mis the number of processes, and%is the modulo operator.

I Thearray of data on each process is divided tom chunks (m=4here).

(96)

AllReduce Implementation - Ring-AllReduce (3/6)

I In the first share-reduce step, process Asendsa0 to processB.

I ProcessBsends b1 to processC, etc.

[https://towardsdatascience.com/visual-intuition-on-ring-allreduce-for-distributed-deep-learning-d1f34b4911da]

(97)

AllReduce Implementation - Ring-AllReduce (3/6)

I In the first share-reduce step, process Asendsa0 to processB. I ProcessBsends b1 to processC, etc.

(98)

AllReduce Implementation - Ring-AllReduce (4/6)

I When each processreceives the datafrom theprevious process, it applies thereduce operator (e.g., sum or mean)

Thereduce operatorshould beassociativeandcommutative.

I It then proceeds tosend it to the next process in the ring.

[https://towardsdatascience.com/visual-intuition-on-ring-allreduce-for-distributed-deep-learning-d1f34b4911da]

(99)

AllReduce Implementation - Ring-AllReduce (4/6)

I When each processreceives the datafrom theprevious process, it applies thereduce operator (e.g., sum or mean)

Thereduce operatorshould beassociativeandcommutative.

(100)

AllReduce Implementation - Ring-AllReduce (4/6)

I When each processreceives the datafrom theprevious process, it applies thereduce operator (e.g., sum or mean)

Thereduce operatorshould beassociativeandcommutative.

I It then proceeds tosend it to the next process in the ring.

[https://towardsdatascience.com/visual-intuition-on-ring-allreduce-for-distributed-deep-learning-d1f34b4911da]

(101)

AllReduce Implementation - Ring-AllReduce (5/6)

I The share-reducephase finishes when each process holds the complete reduction of

chunk i.

(102)

AllReduce Implementation - Ring-AllReduce (5/6)

I The share-reducephase finishes when each process holds the complete reduction of

chunk i.

I At this point each processholds a part of theend result.

[https://towardsdatascience.com/visual-intuition-on-ring-allreduce-for-distributed-deep-learning-d1f34b4911da]

(103)

AllReduce Implementation - Ring-AllReduce (6/6)

I The share-only step is the same process of sharing the data in a ring-like fashion without applying the reduce operation.

(104)

AllReduce Implementation - Ring-AllReduce (6/6)

I The share-only step is the same process of sharing the data in a ring-like fashion without applying the reduce operation.

I Thisconsolidates theresult of each chunk in every process.

[https://towardsdatascience.com/visual-intuition-on-ring-allreduce-for-distributed-deep-learning-d1f34b4911da]

(105)

Master-Worker AllReduce vs. Ring-AllReduce

I N: number of elements,m: number of processes

I Master-Worker AllReduce

First eachprocesssendsNelements to themaster: N × (m − 1)messages.

Then themastersends the results back to theprocess: anotherN × (m − 1)messages.Total network traffic is2(N × (m − 1)), which isproportionaltom.

I Ring-AllReduce

In the share-reduce step each process sends N

m elements, and it does it m − 1 times: N

m× (m − 1)messages.

On theshare-onlystep, eachprocesssends the result for the chunk it calculated: another N

m× (m − 1)messages. • Total network traffic is2(N

(106)

Master-Worker AllReduce vs. Ring-AllReduce

I N: number of elements,m: number of processes

I Master-Worker AllReduce

First eachprocesssendsNelements to themaster: N × (m − 1)messages.

Then themastersends the results back to theprocess: anotherN × (m − 1)messages.Total network traffic is2(N × (m − 1)), which isproportionaltom.

I Ring-AllReduce

In the share-reduce step each process sends N

m elements, and it does it m − 1 times: N

m× (m − 1)messages.

On theshare-onlystep, eachprocesssends the result for the chunk it calculated: another N

m× (m − 1)messages. • Total network traffic is2(N

m× (m − 1)).

(107)

Master-Worker AllReduce vs. Ring-AllReduce

I N: number of elements,m: number of processes

I Master-Worker AllReduce

First eachprocesssendsNelements to themaster: N × (m − 1)messages.

Then themastersends the results back to theprocess: anotherN × (m − 1)messages.Total network traffic is2(N × (m − 1)), which isproportionaltom.

I Ring-AllReduce

In the share-reduce step each process sends N

m elements, and it does it m − 1 times: N

m× (m − 1)messages.

On theshare-onlystep, eachprocesssends the result for the chunk it calculated: another N

m× (m − 1)messages. • Total network traffic is2(N

(108)

Master-Worker AllReduce vs. Ring-AllReduce

I N: number of elements,m: number of processes

I Master-Worker AllReduce

First eachprocesssendsNelements to themaster: N × (m − 1)messages.

Then themastersends the results back to theprocess: anotherN × (m − 1)messages.

Total network traffic is2(N × (m − 1)), which isproportionaltom.

I Ring-AllReduce

In the share-reduce step each process sends N

m elements, and it does it m − 1 times: N

m× (m − 1)messages.

On theshare-onlystep, eachprocesssends the result for the chunk it calculated: another N

m× (m − 1)messages. • Total network traffic is2(N

m× (m − 1)).

(109)

Master-Worker AllReduce vs. Ring-AllReduce

I N: number of elements,m: number of processes

I Master-Worker AllReduce

First eachprocesssendsNelements to themaster: N × (m − 1)messages.

Then themastersends the results back to theprocess: anotherN × (m − 1)messages.Total network traffic is2(N × (m − 1)), which isproportionaltom.

I Ring-AllReduce

In the share-reduce step each process sends N

m elements, and it does it m − 1 times: N

m× (m − 1)messages.

On theshare-onlystep, eachprocesssends the result for the chunk it calculated: another N

m× (m − 1)messages. • Total network traffic is2(N

(110)

Master-Worker AllReduce vs. Ring-AllReduce

I N: number of elements,m: number of processes

I Master-Worker AllReduce

First eachprocesssendsNelements to themaster: N × (m − 1)messages.

Then themastersends the results back to theprocess: anotherN × (m − 1)messages.Total network traffic is2(N × (m − 1)), which isproportionaltom.

I Ring-AllReduce

In the share-reduce step each process sends N

m elements, and it does it m − 1 times: N

m× (m − 1)messages.

On theshare-onlystep, eachprocesssends the result for the chunk it calculated: another N

m× (m − 1)messages. • Total network traffic is2(N

m× (m − 1)).

(111)

Master-Worker AllReduce vs. Ring-AllReduce

I N: number of elements,m: number of processes

I Master-Worker AllReduce

First eachprocesssendsNelements to themaster: N × (m − 1)messages.

Then themastersends the results back to theprocess: anotherN × (m − 1)messages.Total network traffic is2(N × (m − 1)), which isproportionaltom.

I Ring-AllReduce

In the share-reduce step each process sends N

m elements, and it does it m − 1 times: N

m× (m − 1)messages.

On theshare-onlystep, eachprocesssends the result for the chunk it calculated: another N

m× (m − 1)messages. • Total network traffic is2(N

(112)

Master-Worker AllReduce vs. Ring-AllReduce

I N: number of elements,m: number of processes

I Master-Worker AllReduce

First eachprocesssendsNelements to themaster: N × (m − 1)messages.

Then themastersends the results back to theprocess: anotherN × (m − 1)messages.Total network traffic is2(N × (m − 1)), which isproportionaltom.

I Ring-AllReduce

In the share-reduce step each process sends N

m elements, and it does it m − 1 times: N

m× (m − 1)messages.

On theshare-onlystep, eachprocesssends the result for the chunk it calculated: another N

m× (m − 1)messages.

Total network traffic is2(N

m× (m − 1)).

(113)

Master-Worker AllReduce vs. Ring-AllReduce

I N: number of elements,m: number of processes

I Master-Worker AllReduce

First eachprocesssendsNelements to themaster: N × (m − 1)messages.

Then themastersends the results back to theprocess: anotherN × (m − 1)messages.Total network traffic is2(N × (m − 1)), which isproportionaltom.

I Ring-AllReduce

In the share-reduce step each process sends N

m elements, and it does it m − 1 times: N

m× (m − 1)messages.

On theshare-onlystep, eachprocesssends the result for the chunk it calculated: another N

m× (m − 1)messages.

(114)

Synchronization

(115)

Synchronization

(116)

Synchronization - Synchronous

I After each iteration (processing of a mini-batch), the workers synchronizetheir pa-rameter updates.

Easyto reason about themodel convergence.

The training process prone to the straggler problem, where the slowest worker slows down all the others.

[Mayer, R. et al., arXiv:1903.11314, 2019]

(117)

Synchronization - Synchronous

I After each iteration (processing of a mini-batch), the workers synchronizetheir pa-rameter updates.

Easyto reason about themodel convergence.

The training process prone to the straggler problem, where the slowest worker slows down all the others.

(118)

Synchronization - Synchronous

I After each iteration (processing of a mini-batch), the workers synchronizetheir pa-rameter updates.

Easyto reason about themodel convergence.

The training process prone to the straggler problem, where the slowest worker slows down all the others.

[Mayer, R. et al., arXiv:1903.11314, 2019]

(119)

Synchronization - Asynchronous

I Workers update their model independently from each other.

Aworkermay train onstale (delayed)parameters.

This makes ithard to mathematically reason about themodel convergence.

It provides the workersflexibilityin their training process, completely avoiding all strag-gler problems.

(120)

Synchronization - Asynchronous

I Workers update their model independently from each other.

Aworkermay train onstale (delayed)parameters.

This makes ithard to mathematically reason about themodel convergence.

It provides the workersflexibilityin their training process, completely avoiding all strag-gler problems.

[Mayer, R. et al., arXiv:1903.11314, 2019]

(121)

Synchronization - Asynchronous

I Workers update their model independently from each other.

Aworkermay train onstale (delayed)parameters.

This makes ithard to mathematically reason about themodel convergence.

It provides the workersflexibilityin their training process, completely avoiding all strag-gler problems.

(122)

Synchronization - Asynchronous

I Workers update their model independently from each other.

Aworkermay train onstale (delayed)parameters.

This makes ithard to mathematically reason about themodel convergence.

It provides the workersflexibilityin their training process, completely avoiding all strag-gler problems.

[Mayer, R. et al., arXiv:1903.11314, 2019]

(123)
(124)

TensorFlow Distribution Strategies

I tf.distribute.Strategy is a TensorFlow API todistribute training.

I Supports both parameter server andallreducemodels.

(125)
(126)

Single Server Training - MirroredStrategy (1/2)

I Synchronous distribute training training on multiple GPUs on one machine.

I One replica per GPU.

I The parametersof the model aremirroredacross all the replicas.

I These parametersare kept in syncwith each other by applying identical updates.

I The parameters updatesare communicated usingallreduce algorithms.

mirrored_strategy = tf.distribute.MirroredStrategy()

# to use only some of the GPUs on your machine

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

(127)

Single Server Training - MirroredStrategy (1/2)

I Synchronous distribute training training on multiple GPUs on one machine.

I One replica per GPU.

I The parametersof the model aremirroredacross all the replicas.

I These parametersare kept in syncwith each other by applying identical updates.

I The parameters updatesare communicated usingallreduce algorithms.

mirrored_strategy = tf.distribute.MirroredStrategy()

# to use only some of the GPUs on your machine

(128)

Single Server Training - MirroredStrategy (1/2)

I Synchronous distribute training training on multiple GPUs on one machine.

I One replica per GPU.

I Theparameters of the model aremirroredacross all the replicas.

I These parametersare kept in syncwith each other by applying identical updates.

I The parameters updatesare communicated usingallreduce algorithms.

mirrored_strategy = tf.distribute.MirroredStrategy()

# to use only some of the GPUs on your machine

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

(129)

Single Server Training - MirroredStrategy (1/2)

I Synchronous distribute training training on multiple GPUs on one machine.

I One replica per GPU.

I Theparameters of the model aremirroredacross all the replicas.

I These parametersare kept in syncwith each other by applying identical updates.

I The parameters updatesare communicated usingallreduce algorithms.

mirrored_strategy = tf.distribute.MirroredStrategy()

# to use only some of the GPUs on your machine

(130)

Single Server Training - MirroredStrategy (1/2)

I Synchronous distribute training training on multiple GPUs on one machine.

I One replica per GPU.

I Theparameters of the model aremirroredacross all the replicas.

I These parametersare kept in syncwith each other by applying identical updates.

I The parameters updatesare communicated usingallreduce algorithms.

mirrored_strategy = tf.distribute.MirroredStrategy()

# to use only some of the GPUs on your machine

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

(131)

Single Server Training - MirroredStrategy (2/2)

I There are different implementation of allreduce.

I You can overridethe cross GPU communication: • tf.distribute.NcclAllReduce(the default)tf.distribute.ReductionToOneDevicetf.distribute.HierarchicalCopyAllReduce

mirrored_strategy = tf.distribute.MirroredStrategy(

(132)

Single Server Training - CentralStorageStrategy

I Parameters are not mirrored, instead they are placed on the CPU.

I Operations are replicated across all localGPUs.

I Does synchronous training.

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()

(133)

Single Server Training - CentralStorageStrategy

I Parameters are not mirrored, instead they are placed on the CPU.

I Operations are replicated across all localGPUs.

I Does synchronous training.

(134)

Single Server Training - CentralStorageStrategy

I Parameters are not mirrored, instead they are placed on the CPU.

I Operations are replicated across all localGPUs.

I Does synchronous training.

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()

(135)

Single Server Trainings - Example

I Creat a strategy, e.g.,MirroredStrategy or CentralStorageStrategy.

I Call itsscope() method to get a distribution context.

I Wrap the creation andcompilationof the model inside that context.

I Call the model’sfit() andpredict() method normally (outside the context).

distribution = tf.distribute.MirroredStrategy()

with distribution.scope():

model = keras.models.Sequential([...]) model.compile(...)

model.fit(...) model.predict(...)

(136)

Single Server Trainings - Example

I Creat a strategy, e.g.,MirroredStrategy or CentralStorageStrategy.

I Call itsscope() method to get a distribution context.

I Wrap the creation andcompilationof the model inside that context.

I Call the model’sfit() andpredict() method normally (outside the context).

distribution = tf.distribute.MirroredStrategy()

with distribution.scope():

model = keras.models.Sequential([...]) model.compile(...)

model.fit(...) model.predict(...)

(137)

Single Server Trainings - Example

I Creat a strategy, e.g.,MirroredStrategy or CentralStorageStrategy.

I Call itsscope() method to get a distribution context.

I Wrap the creation andcompilationof the model inside that context.

I Call the model’sfit() andpredict() method normally (outside the context).

distribution = tf.distribute.MirroredStrategy()

with distribution.scope():

model = keras.models.Sequential([...]) model.compile(...)

model.fit(...) model.predict(...)

(138)

Single Server Trainings - Example

I Creat a strategy, e.g.,MirroredStrategy or CentralStorageStrategy.

I Call itsscope() method to get a distribution context.

I Wrap the creation andcompilationof the model inside that context.

I Call the model’sfit() andpredict() method normally (outside the context).

distribution = tf.distribute.MirroredStrategy()

with distribution.scope():

model = keras.models.Sequential([...]) model.compile(...)

model.fit(...) model.predict(...)

(139)
(140)

Multi Servers Trainings - MultiWorkerMirroredStrategy (1/2)

I Verysimilar toMirroredStrategy.

I Synchronous distributed training across multiple workers, each with potentially mul-tiple GPUs.

I Makes copies of all parametersof the model oneach device across all workers.

multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

(141)

Multi Servers Trainings - MultiWorkerMirroredStrategy (1/2)

I Verysimilar toMirroredStrategy.

I Synchronous distributed training across multiple workers, each with potentially mul-tiple GPUs.

I Makes copies of all parametersof the model oneach device across all workers.

(142)

Multi Servers Trainings - MultiWorkerMirroredStrategy (1/2)

I Verysimilar toMirroredStrategy.

I Synchronous distributed training across multiple workers, each with potentially mul-tiple GPUs.

I Makes copies of allparameters of the model oneach device across all workers.

multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

(143)

Multi Servers Trainings - MultiWorkerMirroredStrategy (2/2)

I Two different implementations:

CollectiveCommunication.RING(ring-based implementation)CollectiveCommunication.NCCL(Nvidia’s NCCL implementation)

I CollectiveCommunication.AUTOdefers the choice to the runtime.

I Thebest choice of collective implementation depends upon thenumber and kind of GPUs, and thenetwork interconnectin the cluster.

# ring-based collectives

multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy( tf.distribute.experimental.CollectiveCommunication.RING)

(144)

Multi Servers Trainings - MultiWorkerMirroredStrategy (2/2)

I Two different implementations:

CollectiveCommunication.RING(ring-based implementation)CollectiveCommunication.NCCL(Nvidia’s NCCL implementation)

I CollectiveCommunication.AUTOdefers the choice to the runtime.

I Thebest choice of collective implementation depends upon thenumber and kind of GPUs, and thenetwork interconnectin the cluster.

# ring-based collectives

multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy( tf.distribute.experimental.CollectiveCommunication.RING)

(145)

Multi Servers Trainings - MultiWorkerMirroredStrategy (2/2)

I Two different implementations:

CollectiveCommunication.RING(ring-based implementation)CollectiveCommunication.NCCL(Nvidia’s NCCL implementation)

I CollectiveCommunication.AUTOdefers the choice to the runtime.

I Thebest choice of collective implementation depends upon thenumber and kind of GPUs, and thenetwork interconnectin the cluster.

# ring-based collectives

multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy( tf.distribute.experimental.CollectiveCommunication.RING)

(146)

Multi Servers Trainings - ParameterServerStrategy

I Supports parameter servers training onmultiple machines.

I Some machines are designated as workersand some asparameter servers.

I Each parameter of the model is placed on oneparameter server.

I Computationis replicatedacross all GPUs of all the workers.

ps_strategy = tf.distribute.experimental.ParameterServerStrategy()

(147)

Multi Servers Trainings - ParameterServerStrategy

I Supports parameter servers training onmultiple machines.

I Some machines are designated as workersand some asparameter servers.

I Each parameter of the model is placed on oneparameter server.

I Computationis replicatedacross all GPUs of all the workers.

(148)

Multi Servers Trainings - ParameterServerStrategy

I Supports parameter servers training onmultiple machines.

I Some machines are designated as workersand some asparameter servers.

I Each parameter of the model is placed on oneparameter server.

I Computationis replicatedacross all GPUs of all the workers.

ps_strategy = tf.distribute.experimental.ParameterServerStrategy()

(149)

Multi Servers Trainings - ParameterServerStrategy

I Supports parameter servers training onmultiple machines.

I Some machines are designated as workersand some asparameter servers.

I Each parameter of the model is placed on oneparameter server.

I Computationis replicatedacross all GPUs of all the workers.

(150)

Multi Servers Trainings - More Details

I A TensorFlowclusteris a group of TensorFlow processesrunning in parallel.

I Each TF process (a.k.atask) in the cluster has a type:

Worker: performscomputations, usually on a machine with one or moreGPUs.Parameter Server (ps): keeps track of parameters values, it is usually on a CPU-only

machine.

I The set of tasks that share the same type is often called a job. For example, the

worker job is theset of all workers.

(151)

Multi Servers Trainings - More Details

I A TensorFlowclusteris a group of TensorFlow processesrunning in parallel.

I Each TF process (a.k.atask) in the cluster has a type:

Worker: performscomputations, usually on a machine with one or moreGPUs.Parameter Server (ps): keeps track of parameters values, it is usually on a CPU-only

machine.

I The set of tasks that share the same type is often called a job. For example, the

(152)

Multi Servers Trainings - More Details

I A TensorFlowclusteris a group of TensorFlow processesrunning in parallel.

I Each TF process (a.k.atask) in the cluster has a type:

Worker: performscomputations, usually on a machine with one or moreGPUs.

Parameter Server (ps): keeps track of parameters values, it is usually on a CPU-only

machine.

I The set of tasks that share the same type is often called a job. For example, the

worker job is theset of all workers.

(153)

Multi Servers Trainings - More Details

I A TensorFlowclusteris a group of TensorFlow processesrunning in parallel.

I Each TF process (a.k.atask) in the cluster has a type:

Worker: performscomputations, usually on a machine with one or moreGPUs.Parameter Server (ps): keeps track of parameters values, it is usually on a CPU-only

machine.

I The set of tasks that share the same type is often called a job. For example, the

(154)

Multi Servers Trainings - More Details

I A TensorFlowclusteris a group of TensorFlow processesrunning in parallel.

I Each TF process (a.k.atask) in the cluster has a type:

Worker: performscomputations, usually on a machine with one or moreGPUs.Parameter Server (ps): keeps track of parameters values, it is usually on a CPU-only

machine.

I The set of tasks that share the same type is often called a job. For example, the

worker job is theset of all workers.

(155)

Multi Servers Trainings - Example (1/3)

I Assume a clusterwith 3 tasks (2 workers and1 parameter server).

cluster_spec = tf.train.ClusterSpec({

"worker": [ "machine-a.example.com:2222", # /job:worker/task:0 "machine-b.example.com:2222" # /job:worker/task:1 ], "ps": ["machine-a.example.com:2221"] # /job:ps/task:0 })

(156)

Multi Servers Trainings - Example (2/3)

I Tostart a task, you must give it thecluster specand define itstype and index (ID), e.g., worker 0.

ps0 = tf.distribute.Server(cluster_spec, job_name="ps", task_index=0)

worker0 = tf.distribute.Server(cluster_spec, job_name="worker", task_index=0) worker1 = tf.distribute.Server(cluster_spec, job_name="worker", task_index=1)

(157)

Multi Servers Trainings - Example (2/3)

I Tostart a task, you must give it thecluster specand define itstype and index (ID), e.g., worker 0.

ps0 = tf.distribute.Server(cluster_spec, job_name="ps", task_index=0)

worker0 = tf.distribute.Server(cluster_spec, job_name="worker", task_index=0)

(158)

Multi Servers Trainings - Example (2/3)

I Tostart a task, you must give it thecluster specand define itstype and index (ID), e.g., worker 0.

ps0 = tf.distribute.Server(cluster_spec, job_name="ps", task_index=0)

worker0 = tf.distribute.Server(cluster_spec, job_name="worker", task_index=0) worker1 = tf.distribute.Server(cluster_spec, job_name="worker", task_index=1)

(159)

Multi Servers Trainings - Example (3/3)

I Alternative wayto specify acluster specis to use theTF CONFIGenvironment variable

before starting the program.

I For example to run worker 1:

distribution = tf.distribute.experimental.ParameterServerStrategy()

os.environ["TF_CONFIG"] = json.dumps({

"cluster": {

"worker": ["machine-a.example.com:2222", "machine-b.example.com:2222"],

"ps": ["machine-a.example.com:2221"]},

"task": {"type": "worker", "index": 1} })

with distribution.scope():

model = keras.models.Sequential([...]) model.compile(...)

(160)

Communication Overhead

(161)

Communication Overhead in Data Parallelization

I Synchronizing the model replicas in data-parallel training requires communication between workers(inallreduce)

I Between workers and parameter servers (in thecentralized architecture).

(162)

Communication Overhead in Data Parallelization

I Synchronizing the model replicas in data-parallel training requires communication between workers(inallreduce)

I Between workers and parameter servers(in the centralized architecture).

I Such communication can easily become thebottleneckof the overall training process.

(163)

Communication Overhead in Data Parallelization

I Synchronizing the model replicas in data-parallel training requires communication between workers(inallreduce)

I Between workers and parameter servers(in the centralized architecture).

(164)

Approaches for Communication Efficiency

I Reducing the model precision

I Compressing the model updates

I Improving the communication scheduling

(165)

Reducing the Model Precision

I Reduce the precision of the parameters’ data types, e.g., from double precision to single floating point.

I It saves communication bandwidth when parameter updates need to be transferred over the network.

I Itreduces the model size, which can be useful when the model is deployed on resource-constrained hardware such as GPUs.

(166)

Reducing the Model Precision

I Reduce the precision of the parameters’ data types, e.g., from double precision to single floating point.

I It saves communication bandwidth when parameter updates need to be transferred over the network.

I Itreduces the model size, which can be useful when the model is deployed on resource-constrained hardware such as GPUs.

(167)

Reducing the Model Precision

I Reduce the precision of the parameters’ data types, e.g., from double precision to single floating point.

I It saves communication bandwidth when parameter updates need to be transferred over the network.

I Itreduces the model size, which can be useful when the model is deployed on resource-constrained hardware such as GPUs.

(168)

Compressing the Model Updates

I The model updates communicated between workers and between workers and pa-rameter serverscan be compressed.

I Gradient quantization: reducing the number of bits per gradient.

I Gradient sparsification: communicating onlyimportant gradientsthat have a signifi-cant value.

(169)

Compressing the Model Updates

I The model updates communicated between workers and between workers and pa-rameter serverscan be compressed.

I Gradient quantization: reducing the number of bits per gradient.

I Gradient sparsification: communicating onlyimportant gradientsthat have a signifi-cant value.

(170)

Compressing the Model Updates

I The model updates communicated between workers and between workers and pa-rameter serverscan be compressed.

I Gradient quantization: reducing the number of bits per gradient.

I Gradient sparsification: communicating onlyimportant gradientsthat have a signifi-cant value.

(171)

Improving the Communication Scheduling

I Communication patterns in data-parallel are typically bursty, especially in syn-chronoussystems.

All workersmayshare their updated parametersat thesame timewith their peer

workers or parameter servers.

I To prevent that the network bandwidth is exceeded and communication is delayed, the communication of the different workers can be scheduled such that it does not overlap.

(172)

Improving the Communication Scheduling

I Communication patterns in data-p

References

Related documents