• No results found

Custom proxy streams: buffering reads and writes

A slightly more complicated example of a proxy stream is one that adds generic buffering support. Here, the proxy will batch either read or write calls to the underlying stream, the point being to speed things up when only a few bytes are sent or requested at a time.

Similar to the TZCompressionStream/TZDecompressionStream case, it isn’t really practical to define a single class to perform buffered reads and buffered writes, so a pair of classes, one for buffered reading and one for buffered writing, are in

TBufferedStream = class abstract(TStream) strict private

FBaseStream: TStream;

FOwnsStream: Boolean;

strict protected

FBuffer: array of Byte;

FBufferPos: Integer;

public

constructor Create(AStream: TStream;

AOwnsStream: Boolean = False;

ABufferSize: Integer = $10000); //64KB destructor Destroy; override;

property BaseStream: TStream read FBaseStream;

property OwnsStream: Boolean read FOwnsStream;

end;

constructor TBufferedStream.Create(AStream: TStream;

AOwnsStream: Boolean; ABufferSize: Integer);

begin

inherited Create;

FBaseStream := AStream;

FOwnsStream := AOwnsStream;

SetLength(FBuffer, ABufferSize);

end;

destructor TBufferedStream.Destroy;

begin

if FOwnsStream then FBaseStream.Free;

inherited Destroy;

end;

This sets up the base stream and initialises the buffer, which is defined as a dynamic array of bytes. A default buffer size of 64KB is used, however this can be customised as required.

The declaration of the buffered input stream class can be not much more than the necessary overrides:

type

TBufferedInputStream = class(TBufferedStream) strict private

FBufferSize: Integer;

procedure EmptyBuffer;

protected

function GetSize: Int64; override;

procedure SetSize(const NewSize: Int64); override;

public

destructor Destroy; override;

function Read(var Buffer; Count: Integer): Integer; override;

function Seek(const Offset: Int64;

Origin: TSeekOrigin): Int64; override;

function Write(const Buffer; Count: Integer): Integer; override;

end;

In practical terms, buffering reads means that a minimum number of bytes will be requested from the base stream by the proxy, regardless of how much data the caller asks for; if that request is for less than the size of the buffer (or more exactly, for anything other than a multiple of the size of the buffer without remainder), then the remaining bytes are kept in memory by the proxy, to be passed on next time Read is called:

function TBufferedInputStream.Read(var Buffer;

Count: Integer): Integer;

var

BytesToReadOffBuffer: Integer;

SeekPtr: PByte;

begin

Result := 0;

SeekPtr := @Buffer;

while Count > 0 do begin

if FBufferPos = FBufferSize then begin

FBufferPos := 0;

FBufferSize := BaseStream.Read(FBuffer[0], Length(FBuffer));

if FBufferSize = 0 then Exit;

end;

BytesToReadOffBuffer := FBufferSize - FBufferPos;

if BytesToReadOffBuffer > Count then BytesToReadOffBuffer := Count;

Move(FBuffer[FBufferPos], SeekPtr^, BytesToReadOffBuffer);

Inc(FBufferPos, BytesToReadOffBuffer);

Inc(SeekPtr, BytesToReadOffBuffer);

Dec(Count, BytesToReadOffBuffer);

Inc(Result, BytesToReadOffBuffer);

end;

end;

As appropriate, Seek calls must either be adjusted to take account of the buffer or cause the buffer itself to be reset:

function TBufferedInputStream.Seek(const Offset: Int64;

Origin: TSeekOrigin): Int64;

begin

if Origin = soCurrent then

if (Offset >= 0) and (Offset <= (FBufferSize - FBufferPos)) then begin

Result := BaseStream.Seek(Offset, Origin);

FBufferPos := 0;

if not OwnsStream then EmptyBuffer;

inherited Destroy;

end;

procedure TBufferedInputStream.EmptyBuffer;

begin

if (FBufferPos = 0) and (FBufferSize = 0) then Exit;

BaseStream.Seek(FBufferPos - FBufferSize, soCurrent);

FBufferPos := 0;

FBufferSize := 0;

end;

The Size property getter can simply delegate to the base stream:

function TBufferedInputStream.GetSize: Int64;

begin

Result := BaseStream.Size;

end;

Finally, SetSize and Write should either try and be clever and take account of the buffer, or just empty it and delegate to the base stream. We will do the latter:

procedure TBufferedInputStream.SetSize(const NewSize: Int64);

begin

EmptyBuffer;

BaseStream.Size := NewSize;

end;

function TBufferedInputStream.Write(const Buffer;

Count: Integer): Integer;

begin

EmptyBuffer;

Result := BaseStream.Write(Buffer, Count);

end;

A buffered output class can follow a similar pattern:

type

TBufferedOutputStream = class(TBufferedStream) protected

function DoFlush: Boolean;

function GetSize: Int64; override;

procedure SetSize(const NewSize: Int64); override;

public

destructor Destroy; override;

procedure Flush;

function Read(var Buffer; Count: Integer): Integer; override;

function Seek(const Offset: Int64;

Origin: TSeekOrigin): Int64; override;

function Write(const Buffer; Count: Integer): Integer; override;

end;

inherited Destroy;

end;

function TBufferedOutputStream.GetSize: Int64;

begin Flush;

Result := BaseStream.Size;

end;

procedure TBufferedOutputStream.SetSize(const NewSize: Int64);

begin Flush;

BaseStream.Size := NewSize;

end;

function TBufferedOutputStream.Read(var Buffer;

Count: Integer): Integer;

begin Flush;

Result := BaseStream.Read(Buffer, Count);

end;

In the case of Seek, we can avoid flushing if the method is called only to find the current position. In that case, we can return the current position of the base stream offset by the amount of data currently held in the buffer:

function TBufferedOutputStream.Seek(const Offset: Int64;

Origin: TSeekOrigin): Int64;

begin

if (Offset = 0) and (Origin = soCurrent) then

Result := BaseStream.Seek(0, soCurrent) + FBufferPos else

begin Flush;

Result := BaseStream.Seek(Offset, Origin);

end;

end;

Usually, operations that fail should raise an exception, however that is not always so in our case when it comes to flushing the buffer. One example is flushing in the destructor, since you should never raise an exception in a destructor.

Another is in the Write implementation, since the difference between Write and WriteBuffer lies precisely in how Write

does not raise an exception simply because not all the bytes requested could be written out.

The easiest way to implement this is to have a protected DoFlush method that returns False on failure, and a public Flush

method that calls DoFlush and raises an exception if it returns False:

function TBufferedOutputStream.DoFlush: Boolean;

var

BytesWritten: Integer;

begin

while FBufferPos > 0 do begin

BytesWritten := BaseStream.Write(FBuffer[0], FBufferPos);

if BytesWritten = 0 then Exit(False);

//move down the bytes still to be written Move(FBuffer[BytesWritten], FBuffer[0], FBufferPos - BytesWritten);

//decrement the buffer write position Dec(FBufferPos, BytesWritten);

end;

end;

procedure TBufferedOutputStream.Flush;

begin

if not DoFlush then

raise EWriteError.CreateRes(@SWriteError);

end;

DoFlush uses a looping construct to allow for the base stream only being able to write in chunks smaller than the buffer.

While such behaviour is not common, it is worth taking account of (the implementation of WriteBuffer in TStream does, for example).

Lastly, Write both flushes the buffer when it is full and fills it in the first place:

function TBufferedOutputStream.Write(const Buffer;

Count: Integer): Integer;

var

BytesToBuffer: Integer;

begin

Result := 0;

while Count > 0 do begin

if FBufferPos = Length(FBuffer) then

if not DoFlush and (FBufferPos = Length(FBuffer)) then Exit;

BytesToBuffer := Length(FBuffer) - FBufferPos;

if BytesToBuffer > Count then BytesToBuffer := Count;

Move(Buffer, FBuffer[FBufferPos], BytesToBuffer);

Inc(FBufferPos, BytesToBuffer);

Inc(Result, BytesToBuffer);

Dec(Count, BytesToBuffer);

end;

end;

In the book’s accompanying source code, two small sample projects demonstrate TBufferedOutputStream and

TBufferedOutputStream. In the first case, a dummy file 10 MB large is read in 2 byte chunks using a bare TFileStream and a file stream wrapped in a TBufferedOutputStream proxy. In the second, a bare TFileStream and a file stream wrapped in a

TBufferedOutputStream proxy fight it out to be the quickest to write out a 4 MB file in 2 byte chunks. In both cases the proxy comes out much, much quicker, though be warned — in practice, reading or writing in tiny chunks is not very common.

9. ZIP, XML, Registry and INI file support

In this chapter, we will look at Delphi’s ‘in the box’ support for some widely-used file formats — ZIP, XML and INI — together its support for manipulating the Windows Registry.

Related documents