

5.1.5 The Queue Module for Threads and Multiprocessing

Threaded applications often have some sort of work queue data structure. When a thread becomes free, it will pick up work to do from the queue. When a thread creates a task, it will add that task to the queue. Clearly one needs to guard the queue with locks. But Python provides the Queue module to take care of all the lock creation, locking and unlocking, and so on. This means we don't have to bother with it, and the code will probably be faster.

Queue is implemented for both threading and multiprocessing, in almost identical forms. This is good, because the documentation for multiprocessing is rather sketchy, so you can turn to the docs for threading for more details.

The function put() in Queue adds an element to the end of the queue, while get() will remove the head of the queue, again without the programmer having to worry about race conditions.
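As a minimal sketch of put() and get(), here is a producer/consumer pair that shares a queue with no explicit locks. It is written for Python 3, where the thread-side module is named queue (it is Queue in Python 2); the names consumer and results are made up for this illustration.

```python
import queue      # named Queue in Python 2
import threading

q = queue.Queue()
results = []      # filled in by the consumer thread (illustrative name)

def consumer():
    # get() removes the item at the head of the queue, blocking until
    # one is available; all locking is handled inside the Queue object
    for _ in range(3):
        results.append(q.get())

t = threading.Thread(target=consumer)
t.start()
for item in ("a", "b", "c"):
    q.put(item)   # put() adds the item at the tail of the queue
t.join()
print(results)
```

Since there is a single producer and a single consumer, the items come out in the order in which they went in.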

Note that get() will block if the queue is currently empty. An alternative is to call it with block=False, within a try/except construct. One can also set timeout periods.
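The two non-blocking alternatives might look like the following sketch, again assuming Python 3 and the thread-side queue module. The helper names try_get and get_with_timeout are hypothetical, introduced only for this example.

```python
import queue  # named Queue in Python 2

def try_get(q):
    # non-blocking get: raises queue.Empty at once if nothing is there
    try:
        return q.get(block=False)
    except queue.Empty:
        return None

def get_with_timeout(q, seconds):
    # blocking get that gives up after the given number of seconds
    try:
        return q.get(timeout=seconds)
    except queue.Empty:
        return None

q = queue.Queue()
q.put(42)
first = try_get(q)                # the item that was just put
second = try_get(q)               # queue is now empty
third = get_with_timeout(q, 0.05) # waits briefly, then gives up
print(first, second, third)
```

Catching queue.Empty specifically, rather than using a bare except, keeps unrelated errors from being mistaken for an empty queue. One caveat, as I understand the multiprocessing documentation: with multiprocessing.Queue an item that was just put on an empty queue may not be visible to a non-blocking get() for a brief moment, so an Empty exception there does not always mean that no work remains.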

Here once again is the prime number example, this time done with Queue:

    #!/usr/bin/env python

    # prime number counter, based on Python multiprocessing class with
    # Queue

    # usage: python PrimeThreading.py n nthreads
    # where we wish the count of the number of primes from 2 to n, and to
    # use nthreads to do the work

    # uses Sieve of Eratosthenes: write out all numbers from 2 to n, then
    # cross out all the multiples of 2, then of 3, then of 5, etc., up to
    # sqrt(n); what's left at the end are the primes

    import sys
    import math
    from multiprocessing import Process, Array, Queue

    class glbls:  # globals, other than shared
        n = int(sys.argv[1])
        nthreads = int(sys.argv[2])
        thrdlist = []  # list of all instances of this class

    def prmfinder(id,prm,nxtk):
        nk = 0  # count of k's done by this thread, to assess load balance
        while 1:
            # find next value to cross out with
            try: k = nxtk.get(False)
            except: break
            nk += 1  # increment workload data
            if prm[k]:  # now cross out
                r = glbls.n / k
                for i in range(2,r+1):
                    prm[i*k] = 0
        print 'thread', id, 'exiting; processed', nk, 'values of k'

    def main():
        prime = Array('i',(glbls.n+1) * [1])  # 1 means prime, until find otherwise
        nextk = Queue()  # next value to try crossing out with
        lim = int(math.sqrt(glbls.n)) + 1  # fill the queue with 2...sqrt(n)
        for i in range(2,lim): nextk.put(i)
        for i in range(glbls.nthreads):
            pf = Process(target=prmfinder, args=(i,prime,nextk))
            glbls.thrdlist.append(pf)
            pf.start()
        for thrd in glbls.thrdlist: thrd.join()
        print 'there are', reduce(lambda x,y: x+y, prime) - 2, 'primes'

    if __name__ == '__main__':
        main()

The way Queue is used here is to put all the possible "crosser-outers," obtained in the variable nextk in the previous versions of this code, into a queue at the outset. One then uses get() to pick up work from the queue. Look Ma, no locks!
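Because Queue comes in almost identical forms for threading and multiprocessing, the same fill-then-drain pattern can be sketched with threads. The version below is an illustration only, not the listing above: it assumes Python 3 (module queue), wraps everything in a hypothetical function count_primes, and uses an ordinary list in place of a shared Array.

```python
import math
import queue      # named Queue in Python 2
import threading

def count_primes(n, nthreads):
    prime = [1] * (n + 1)   # 1 means prime, until found otherwise
    nextk = queue.Queue()
    # put all the crosser-outers 2..sqrt(n) into the queue at the outset
    for k in range(2, int(math.sqrt(n)) + 1):
        nextk.put(k)

    def worker():
        while True:
            try:
                k = nextk.get(block=False)   # pick up the next k
            except queue.Empty:
                return                       # no work left
            if prime[k]:
                # cross out the multiples 2k, 3k, ... up to n
                for m in range(2 * k, n + 1, k):
                    prime[m] = 0

    threads = [threading.Thread(target=worker) for _ in range(nthreads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sum(prime) - 2   # entries 0 and 1 are never crossed out

print(count_primes(100, 4))
```

A worker may occasionally cross out with a composite k that has not yet been marked, which wastes some work but does no harm, since only composite numbers are ever crossed out.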

Below is an example of queues in an in-place quicksort. (Again, the reader is warned that this is just an example, not claimed to be efficient.)

The work items in the queue are a bit more involved here. They have the form (i,j,k), with the first two elements of this tuple meaning that the given array chunk corresponds to indices i through j of x, the original array to be sorted. In other words, whichever thread picks up this chunk of work will have the responsibility of handling that particular section of x.

Quicksort, of course, works by repeatedly splitting the original array into smaller and more numerous chunks. Here a thread will split its chunk, taking the lower half for itself to sort, but placing the upper half into the queue, to be available for other threads that have not been assigned any work yet. I've written the algorithm so that as soon as all threads have gotten some work to do, no more splitting will occur. That's where the value of k comes in. It tells us the split number of this chunk. If it's equal to nthreads-1, this thread won't split the chunk.
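To see how the (i,j,k) work items evolve, here is a sequential trace of the splitting scheme, with one loop iteration standing in for each worker. It is a sketch under assumptions, not the parallel code: it is written for Python 3, the names partition and trace_chunks are invented for this illustration, the worker is assumed to keep indices i through the pivot position, and, like the scheme described above, it does not handle special cases such as empty chunks.

```python
import queue  # named Queue in Python 2

def partition(xs, low, high):
    # move the pivot xs[low] to its final position and return that position
    pivot = xs[low]
    xs[low], xs[high] = xs[high], xs[low]
    last = low
    for i in range(low, high):
        if xs[i] <= pivot:
            xs[last], xs[i] = xs[i], xs[last]
            last += 1
    xs[last], xs[high] = xs[high], xs[last]
    return last

def trace_chunks(xs, nworkers):
    q = queue.Queue()
    q.put((0, len(xs) - 1, 0))       # the whole array is chunk number 0
    assigned = []                    # (i, right) range sorted by each worker
    for _ in range(nworkers):        # one iteration per would-be worker
        i, j, k = q.get()
        if k < nworkers - 1:         # some workers still have no chunk
            split = partition(xs, i, j)
            q.put((split + 1, j, k + 1))   # upper part goes to the queue
            right = split            # keep the lower part, pivot included
        else:
            right = j                # last chunk: no more splitting
        xs[i:right + 1] = sorted(xs[i:right + 1])
        assigned.append((i, right))
    return assigned

data = [5, 2, 8, 1, 9, 3]
ranges = trace_chunks(data, 2)
print(ranges, data)
```

With two workers, the first keeps the lower part of the array and queues the upper part as chunk number 1; the second sees k equal to nworkers-1 and sorts its chunk without splitting.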

    # Quicksort and test code, based on Python multiprocessing class and
    # Queue

    # code is incomplete, as some special cases such as empty subarrays
    # need to be accounted for

    # usage: python QSort.py n nthreads
    # where we wish to test the sort on a random list of n items,
    # using nthreads to do the work

    import sys
    import random
    from multiprocessing import Process, Array, Queue

    class glbls:  # globals, other than shared
        nthreads = int(sys.argv[2])
        thrdlist = []  # list of all instances of this class
        r = random.Random(9876543)

    def sortworker(id,x,q):
        chunkinfo = q.get()
        i = chunkinfo[0]
        j = chunkinfo[1]
        k = chunkinfo[2]
        if k < glbls.nthreads - 1:  # need more splitting?
            splitpt = separate(x,i,j)
            q.put((splitpt+1,j,k+1))
            # now, what do I sort?  stop at the pivot, since index
            # splitpt+1 now belongs to the chunk just placed in the queue
            rightend = splitpt
        else: rightend = j
        tmp = x[i:(rightend+1)]  # need copy, as Array type has no sort() method
        tmp.sort()
        x[i:(rightend+1)] = tmp

    def separate(xc, low, high):  # common algorithm; see Wikipedia
        pivot = xc[low]  # would be better to take, e.g., median of 1st 3 elts
        (xc[low],xc[high]) = (xc[high],xc[low])
        last = low
        for i in range(low,high):
            if xc[i] <= pivot:
                (xc[last],xc[i]) = (xc[i],xc[last])
                last += 1
        (xc[last],xc[high]) = (xc[high],xc[last])
        return last

    def main():
        tmp = []
        n = int(sys.argv[1])
        for i in range(n): tmp.append(glbls.r.uniform(0,1))
        x = Array('d',tmp)
        # work items have form (i,j,k), meaning that the given array chunk
        # corresponds to indices i through j of x, and that this is the kth
        # chunk that has been created, x being the 0th
        q = Queue()  # work queue
        q.put((0,n-1,0))
        for i in range(glbls.nthreads):
            p = Process(target=sortworker, args=(i,x,q))
            glbls.thrdlist.append(p)
            p.start()
        for thrd in glbls.thrdlist: thrd.join()
        if n < 25: print x[:]

    if __name__ == '__main__':
        main()