Producer/consumer queues (TThreadedQueue)

An alternative to using an ordinary TQueue in combination with TMonitor or a critical section object is TThreadedQueue. Declared in System.Generics.Collections, this implements a ‘producer/consumer’ queue. Aside from protecting additions and removals with a lock, it also sets a fixed cap on the number of items it can hold, making a thread trying to add an item wait if the capacity has been reached, and conversely, making a thread trying to remove an item wait too if there is nothing to remove. In a normal TQueue, in contrast, the ‘capacity’ determines only the number of slots currently allocated, not the actual maximum number of items; furthermore, attempting to dequeue an item from an empty TQueue is considered an error, not a request that should be eventually fulfilled.

In use, you specify the maximum number of slots, together with millisecond timeouts for both pushing and popping items, when creating the threaded queue. By default, 10 slots are allocated and neither pushing nor popping can time out:

constructor Create(AQueueDepth: Integer = 10;
  PushTimeout: LongWord = INFINITE;
  PopTimeout: LongWord = INFINITE);

If either pushing or popping should never cause the calling thread to block, pass 0 for the timeout:

var
  StringQueue: TThreadedQueue<string>;
begin
  StringQueue := TThreadedQueue<string>.Create(20, 0, 0); //20 slots, no blocking

Where TQueue has Enqueue and Dequeue methods, TThreadedQueue has PushItem and PopItem. Given the possibility of timeouts, their signatures are also a bit different, as well as being overloaded:

function PopItem: T; overload;
function PopItem(var AQueueSize: Integer): T; overload;
function PopItem(var AQueueSize: Integer;
  var AItem: T): TWaitResult; overload;
function PopItem(var AItem: T): TWaitResult; overload;
function PushItem(const AItem: T): TWaitResult; overload;
function PushItem(const AItem: T;
  var AQueueSize: Integer): TWaitResult; overload;

Usually the ones to use are the fourth variant of PopItem and the first variant of PushItem, since the current queue size shouldn’t really concern the calling thread; knowing whether the request was successful should be enough.

With respect to the result type, TWaitResult is an enumeration defined in System.SyncObjs with the possible values wrSignaled, wrTimeout, wrAbandoned, wrError and wrIOCompletion. However, only wrTimeout (meaning failure) and wrSignaled (meaning success) will be returned by PushItem or PopItem.
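As a rough illustration of checking these results, the following console sketch creates a one-slot queue with zero timeouts, so neither call can block. The Describe helper and the program name are hypothetical, introduced only for this example; the second push and the second pop are expected to report a timeout, assuming the queue behaves as described above.

```pascal
program NonBlockingQueueDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.SyncObjs, System.Generics.Collections;

// Hypothetical helper: turns a TWaitResult into a short label for display
function Describe(AResult: TWaitResult): string;
begin
  case AResult of
    wrSignaled: Result := 'ok';
    wrTimeout: Result := 'timed out';
  else
    Result := 'unexpected';
  end;
end;

var
  Queue: TThreadedQueue<string>;
  Item: string;
  Outcome: TWaitResult;
begin
  // One slot, zero push and pop timeouts: calls return immediately
  Queue := TThreadedQueue<string>.Create(1, 0, 0);
  try
    Outcome := Queue.PushItem('A');   // the single slot is free
    WriteLn('push A: ', Describe(Outcome));
    Outcome := Queue.PushItem('B');   // the queue is now full
    WriteLn('push B: ', Describe(Outcome));
    Outcome := Queue.PopItem(Item);   // removes the first item
    WriteLn('pop: ', Describe(Outcome), ' ', Item);
    Outcome := Queue.PopItem(Item);   // nothing left to remove
    WriteLn('pop: ', Describe(Outcome));
  finally
    Queue.Free;
  end;
end.
```

Testing the return value, rather than assuming success, matters most when a finite timeout has been set, since a timed-out PopItem leaves no useful value in the item variable.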

The remaining public members of TThreadedQueue are a method, DoShutdown, together with the read-only properties QueueSize, ShutDown (which reports whether DoShutdown has been called), TotalItemsPopped and TotalItemsPushed. Calling DoShutdown when one or more threads are currently blocking on either PushItem or PopItem will cause them to be released; usually you do this from the main thread when the application terminates, to give blocking threads a chance to terminate gracefully rather than just be abruptly terminated by the operating system:

procedure TWorkerThread.Execute;
var
  Work: string;
begin
  while WorkQueue.PopItem(Work) = wrSignaled do
  begin
    //process the work item...
  end;
  //clean up...
end;

//...

procedure TMainForm.FormDestroy(Sender: TObject);
begin
  WorkQueue.DoShutdown;
  WaitForWorkerThreads;
  WorkQueue.Free;
end;

TThread.Terminate, TThread.WaitFor

Any secondary thread you create in Delphi will be a ‘background’ thread. This means no secondary thread will keep the application running if the main thread has finished.

Because of that, the following program won’t output anything:

program AbruptFinish;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes; //System.SysUtils declares Sleep

begin
  TThread.CreateAnonymousThread(
    procedure
    begin
      Sleep(1000);
      WriteLn('This will never execute!');
    end).Start;
end.

This behaviour is not intrinsically bad, since the operating system will reclaim any memory and resources used by the secondary threads it halts. Nonetheless, the particular requirements of the application may make it undesirable even so.

For example, a thread might have data that should be flushed to disk, or a database transaction that needs completing.

If you need cleanup code to run, putting it inside the thread object’s destructor or even the finally part of a try/finally statement won’t help, since the operating system will just ignore them: once the main thread has finished, secondary threads are halted there and then. Instead, the main thread must ask secondary threads to finish, then explicitly wait on them to respond or complete.

This pattern is implemented by the TThread class in the form of two methods, Terminate and WaitFor, and a property, Terminated: when the main thread wants a worker to abort, it calls the thread object’s Terminate method. This sets the worker’s Terminated property to True. In order for the thread to actually terminate, the Execute implementation must then periodically check the Terminated property, exiting once this returns True (in the case of an anonymous thread, the thread procedure must periodically check TThread.CurrentThread.CheckTerminated). When the thread is structured around either a while or repeat/until loop, this checking can be done easily enough:

//explicit TThread descendant + while loop
procedure TMyThread.Execute;
begin
  while not Terminated do
  begin
    //do stuff
  end;
end;

//anonymous TThread + repeat/until loop
Thread := TThread.CreateAnonymousThread(
  procedure
  begin
    repeat
      { do stuff }
    until TThread.CurrentThread.CheckTerminated;
  end);

Otherwise, the thread object’s Execute method or the anonymous thread’s anonymous procedure should make periodic checks:

procedure TMyThread.Execute;
begin
  //... do stuff
  if Terminated then Exit;
  //... do more stuff
  if Terminated then Exit;
  //... do yet more stuff
end;

Since Terminate just sets a flag rather than terminating the thread there and then, there needs to be a way to wait on a thread to finish. This is provided for by the WaitFor method:

var
  Worker: TThread;
begin
  //...
  Worker.Terminate;
  Worker.WaitFor;

There are a few problems with WaitFor, however. The first is that you cannot specify a timeout, which means if the waited-on thread hangs (gets in an endless loop, has deadlocked, etc.), then the calling thread will hang too. Secondly, it requires the thread’s FreeOnTerminate property to be False; set it to True (or in the case of an anonymous thread, leave it set to True), and you will get an odd-looking exception. Lastly, it does not work in a console program when the caller is the main thread. This is because WaitFor performs some special handling for the Synchronize method, which itself is incompatible with console programs.

To an extent, these problems can be overcome by calling the operating system API directly. In the case of Windows, this means calling WaitForSingleObject, a function declared in Winapi.Windows. Unlike TThread.WaitFor, this function always allows you to specify a millisecond timeout (to mimic the WaitFor behaviour, pass the special constant INFINITE):

uses
  Winapi.Windows, System.SysUtils, System.Classes;

var
  Worker: TThread;
begin
  //...
  Worker.Terminate;
  WaitForSingleObject(Worker.Handle, INFINITE);
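To see the pieces working together, here is a minimal, Windows-only console sketch that combines a Terminated-checking loop with a timed WaitForSingleObject call. The TCountingThread class, its Iterations property and the chosen delays are hypothetical, picked only to make the example self-contained; FreeOnTerminate is left at False so the thread object is still valid when its handle is read.

```pascal
program TerminateAndWait;

{$APPTYPE CONSOLE}

uses
  Winapi.Windows, System.SysUtils, System.Classes;

type
  // Hypothetical worker: counts loop iterations until asked to stop
  TCountingThread = class(TThread)
  private
    FIterations: Integer;
  protected
    procedure Execute; override;
  public
    property Iterations: Integer read FIterations;
  end;

procedure TCountingThread.Execute;
begin
  while not Terminated do
  begin
    Inc(FIterations);
    Sleep(10); // stand-in for a small unit of real work
  end;
end;

var
  Worker: TCountingThread;
begin
  // Create(False) starts the thread at once; FreeOnTerminate stays False
  Worker := TCountingThread.Create(False);
  try
    Sleep(200);          // let the worker run for a while
    Worker.Terminate;    // only sets the Terminated flag
    // Wait up to five seconds for the worker to notice the flag and exit
    if WaitForSingleObject(Worker.Handle, 5000) = WAIT_OBJECT_0 then
      WriteLn('Worker finished after ', Worker.Iterations, ' iterations')
    else
      WriteLn('Worker did not respond within 5 seconds');
  finally
    // Note: freeing a thread that is still running waits for it to end
    Worker.Free;
  end;
end.
```

Checking the return value against WAIT_OBJECT_0 is what makes the timeout useful: the caller can tell a clean exit from a worker that has hung.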

If you have more than one secondary thread to wait on, you can wait on them all at once by calling WaitForMultipleObjects:

procedure TerminateAndWaitForThreads(
  const Threads: array of TThread; Timeout: LongWord = INFINITE);
var
  Handles: array of THandle;
  I: Integer;
begin
  SetLength(Handles, Length(Threads));
  for I := 0 to High(Handles) do
  begin
    Handles[I] := Threads[I].Handle;
    Threads[I].Terminate;
  end;
  WaitForMultipleObjects(Length(Handles), @Handles[0], True, Timeout);
end;
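A slightly more defensive variant is sketched below. It is an assumption-laden illustration rather than a replacement for the routine above: TerminateAndWaitChecked and TIdleThread are hypothetical names, and the function additionally guards against an empty array, respects the Windows limit of MAXIMUM_WAIT_OBJECTS (64) handles per call, and reports whether every thread actually finished in time.

```pascal
program WaitForAllWorkers;

{$APPTYPE CONSOLE}

uses
  Winapi.Windows, System.SysUtils, System.Classes;

type
  // Hypothetical worker that idles until Terminate is called
  TIdleThread = class(TThread)
  protected
    procedure Execute; override;
  end;

procedure TIdleThread.Execute;
begin
  while not Terminated do
    Sleep(10);
end;

// Returns True only if every thread finished within Timeout milliseconds
function TerminateAndWaitChecked(const Threads: array of TThread;
  Timeout: LongWord = INFINITE): Boolean;
var
  Handles: array of THandle;
  I: Integer;
  WaitResult: DWORD;
begin
  if Length(Threads) = 0 then
    Exit(True); // nothing to wait on; also avoids indexing an empty array
  if Length(Threads) > MAXIMUM_WAIT_OBJECTS then
    raise EArgumentException.Create('Too many threads for one wait call');
  SetLength(Handles, Length(Threads));
  for I := 0 to High(Handles) do
  begin
    Handles[I] := Threads[I].Handle;
    Threads[I].Terminate;
  end;
  WaitResult := WaitForMultipleObjects(Length(Handles), @Handles[0], True,
    Timeout);
  // With bWaitAll = True, success is a value in the range
  // WAIT_OBJECT_0 .. WAIT_OBJECT_0 + count - 1
  Result := WaitResult < WAIT_OBJECT_0 + DWORD(Length(Handles));
end;

var
  Workers: array[0..2] of TThread;
  I: Integer;
begin
  for I := Low(Workers) to High(Workers) do
    Workers[I] := TIdleThread.Create(False);
  try
    if TerminateAndWaitChecked(Workers, 5000) then
      WriteLn('All workers finished')
    else
      WriteLn('Timed out or failed while waiting');
  finally
    for I := Low(Workers) to High(Workers) do
      Workers[I].Free;
  end;
end.
```

If more than 64 threads need to be waited on, one option is to wait on them in batches; that is left out here to keep the sketch short.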

On OS X, the relevant API function is pthread_join. This is passed the thread ID rather than the thread handle (thread handles in fact don’t exist on POSIX), and does not accept a timeout. There is also no equivalent to WaitForMultipleObjects; instead, you need to call pthread_join in a loop:

uses
  Posix.Pthread, System.SysUtils, System.Classes;

var
  I: Integer;
  Thread: TThread;
  Workers: array of TThread;
  ExitCode: Integer;
begin
  //...
  for Thread in Workers do
  begin
    Thread.Terminate;
    pthread_join(Thread.ThreadID, @ExitCode);
  end;

Even when calling the native API directly, however, the problem of FreeOnTerminate when that property is set to True still largely remains, since the thread object may have been freed by the time the Handle or ThreadID property is read. In principle, the only safe option is to create the thread objects suspended, read off their handles or IDs into a variable, get them going, then when the time comes, call WaitForXXXObject or pthread_join with the saved handles or IDs:

uses
  Winapi.Windows;

var
  WorkerThread: TThread;
  WorkerHandle: THandle;
begin
  WorkerThread := TThread.CreateAnonymousThread(
    procedure
    begin
      //...
    end);
  WorkerHandle := WorkerThread.Handle;
  WorkerThread.Start;
  //...
  WaitForSingleObject(WorkerHandle, 5000);
end.

Alternatively, you can avoid looking for a TThread.WaitFor substitute in the first place and use a separate signalling object instead. We’ll look at perhaps the most apposite candidate next: TCountdownEvent.