
Auto Scaling Up Dequeue Tasks

In document Windows Azure Prescriptive Guidance (Page 120-123)



In the previous section, we introduced the ability to reduce the number of active dequeue tasks to a single instance in order to minimize the impact of idle transactions on storage operation costs. In this section, we walk through a contrasting example in which we implement the “auto scale-up” capability to bring the processing power back when it’s needed.

First, we define an event delegate that will help track state transitions from an empty to a non-empty queue for the purposes of triggering relevant actions:

/// <summary>
/// Defines a callback delegate which will be invoked whenever new work arrives in a queue while the queue listener was idle.
/// </summary>
/// <param name="sender">The source of the event.</param>
public delegate void WorkDetectedDelegate(object sender);

We then extend the original definition of the ICloudQueueServiceWorkerRoleExtension interface to include a new event that will be raised every time a queue listener detects new work items, essentially when queue depth changes from zero to any positive value:

public interface ICloudQueueServiceWorkerRoleExtension {
    // ... The other interface members were omitted for brevity. See the previous code snippets for reference ...

    // Defines an event to be raised whenever new work has arrived in a queue while the queue listener was idle.
    event WorkDetectedDelegate QueueWorkDetected;
}

Next, we determine the right place in the queue listener’s code to raise this event. We fire the QueueWorkDetected event from within the dequeue loop implemented in the DequeueTaskMain method, which needs to be extended as follows:

public class CloudQueueListenerExtension<T> : ICloudQueueListenerExtension<T>
{
    // An event to be raised whenever new work has arrived in a queue while the queue listener was idle.
    public event WorkDetectedDelegate QueueWorkDetected;

    private void DequeueTaskMain(object state) {
        CloudQueueListenerDequeueTaskState<T> workerState = (CloudQueueListenerDequeueTaskState<T>)state;

        int idleStateCount = 0;
        TimeSpan sleepInterval = DequeueInterval;

        try {
            // Run a dequeue task until asked to terminate or until a break condition is encountered.
            while (workerState.CanRun) {
                try {
                    var queueMessages = from msg in workerState.QueueStorage.Get<T>(workerState.QueueLocation.QueueName, DequeueBatchSize, workerState.QueueLocation.VisibilityTimeout).AsParallel()
                                        where msg != null
                                        select msg;

                    int messageCount = 0;

                    // Check whether or not work items arrived in the queue while the listener was idle.
                    if (idleStateCount > 0 && queueMessages.Count() > 0) {
                        if (QueueWorkDetected != null) {
                            QueueWorkDetected(this);
                        }
                    }

                    // ... The rest of the code was omitted for brevity. See the previous code snippets for reference ...
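Because the listing above is truncated, the transition logic can be hard to see in isolation. The following is a minimal, self-contained sketch of the same idea, not the listener's actual implementation: the IdleTransitionTracker and IdleTransitionDemo types are hypothetical, and the sketch assumes that the idle counter is incremented on every empty poll and reset as soon as messages are returned (that part of the original loop is in the omitted code).

```csharp
using System;
using System.Collections.Generic;

// Mirrors the delegate defined earlier in this section.
public delegate void WorkDetectedDelegate(object sender);

// Hypothetical, simplified stand-in for the queue listener. It tracks only
// idle polls and raises the event on an empty-to-non-empty transition.
public class IdleTransitionTracker
{
    public event WorkDetectedDelegate QueueWorkDetected;

    // Number of consecutive polls that returned no messages (assumed semantics).
    private int idleStateCount;

    // Call once per dequeue attempt with the number of messages returned.
    public void OnPollCompleted(int messageCount)
    {
        if (messageCount > 0)
        {
            // Work arrived after at least one empty poll: notify subscribers.
            if (idleStateCount > 0)
            {
                // Copy to a local so a concurrent unsubscribe cannot null it out.
                WorkDetectedDelegate handler = QueueWorkDetected;
                if (handler != null) handler(this);
            }
            idleStateCount = 0;
        }
        else
        {
            idleStateCount++;
        }
    }
}

public static class IdleTransitionDemo
{
    // Feeds a sequence of poll results through the tracker and returns
    // how many times the event fired.
    public static int CountNotifications(IEnumerable<int> pollResults)
    {
        var tracker = new IdleTransitionTracker();
        int fired = 0;
        tracker.QueueWorkDetected += sender => fired++;
        foreach (int count in pollResults) tracker.OnPollCompleted(count);
        return fired;
    }
}
```

With the poll results 0, 0, 5, 3, 0, 2 the event fires twice: once when the batch of 5 follows two empty polls, and once when the batch of 2 follows one empty poll. The batch of 3 does not fire it, because the listener was not idle at that point.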

In the last step, we provide a handler for the QueueWorkDetected event. The implementation of this event handler is supplied by the component that instantiates and hosts the queue listener. In our case, it’s a worker role. The code that instantiates the listener and implements the event handler looks as follows:

public class WorkItemProcessorWorkerRole : RoleEntryPoint {
    // Called by Windows Azure to initialize the role instance.
    public override sealed bool OnStart() {
        // ... There is some code before this point ...

        // Instantiate a queue listener for the input queue.
        var inputQueueListener = new CloudQueueListenerExtension<XDocument>(inputQueueLocation);

        // Configure the input queue listener.
        inputQueueListener.QueueEmpty += HandleQueueEmptyEvent;
        inputQueueListener.QueueWorkDetected += HandleQueueWorkDetectedEvent;
        inputQueueListener.DequeueBatchSize = configSettingsExtension.Settings.DequeueBatchSize;
        inputQueueListener.DequeueInterval = configSettingsExtension.Settings.MinimumIdleInterval;

        // ... There is more code after this point ...
    }

    // Implements a callback delegate to be invoked whenever new work has arrived in a queue while the queue listener was idle.
    private void HandleQueueWorkDetectedEvent(object sender) {
        // The sender is an instance of ICloudQueueServiceWorkerRoleExtension, so we can safely cast it.
        ICloudQueueServiceWorkerRoleExtension queueService = sender as ICloudQueueServiceWorkerRoleExtension;

        // Get the current state of the queue listener to determine point-in-time load characteristics.
        CloudQueueListenerInfo queueServiceState = queueService.QueryState();

        // Determine the number of dequeue tasks that would be required to handle the workload in a queue given its current depth.
        int dequeueTaskCount = GetOptimalDequeueTaskCount(queueServiceState.CurrentQueueDepth);

        // If the number of active dequeue tasks is less than the count computed above, start as many dequeue tasks as needed.
        if (queueServiceState.ActiveDequeueTasks < dequeueTaskCount) {
            // Start the required number of dequeue tasks.
            queueService.StartListener(dequeueTaskCount - queueServiceState.ActiveDequeueTasks);
        }
    }

    // ... There is more code after this point ...
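The handler above reads the listener state and then starts tasks based on that reading. If more than one dequeue task can raise QueueWorkDetected at about the same time, two handlers could both observe the same task count and each start the full difference. Whether this can happen depends on listener code not shown here, so the following is only a sketch of one way to guard against it, by serializing the read-compute-start sequence with a lock. ListenerStub and ScaleUpCoordinator are hypothetical names introduced for illustration; the stub merely imitates the ActiveDequeueTasks, CurrentQueueDepth and StartListener members used above.

```csharp
using System;

// Hypothetical in-memory stand-in for the queue listener. It exposes only
// the members that the handler above relies on.
public class ListenerStub
{
    public int ActiveDequeueTasks { get; private set; }
    public int CurrentQueueDepth { get; set; }

    public ListenerStub(int activeDequeueTasks, int currentQueueDepth)
    {
        ActiveDequeueTasks = activeDequeueTasks;
        CurrentQueueDepth = currentQueueDepth;
    }

    // Pretends to start the requested number of dequeue tasks.
    public void StartListener(int taskCount)
    {
        ActiveDequeueTasks += taskCount;
    }
}

// Hypothetical helper that serializes scale-up decisions.
public class ScaleUpCoordinator
{
    private readonly object syncRoot = new object();
    private readonly Func<int, int> getOptimalTaskCount;

    public ScaleUpCoordinator(Func<int, int> getOptimalTaskCount)
    {
        this.getOptimalTaskCount = getOptimalTaskCount;
    }

    // Starts the missing dequeue tasks, if any, and returns how many were started.
    public int ScaleUp(ListenerStub listener)
    {
        lock (syncRoot)
        {
            int target = getOptimalTaskCount(listener.CurrentQueueDepth);
            int missing = target - listener.ActiveDequeueTasks;
            if (missing <= 0) return 0;

            listener.StartListener(missing);
            return missing;
        }
    }
}
```

For example, with one active task, a queue depth of 500 and a rule that maps that depth to 50 tasks, the first call starts 49 tasks and an immediately following call starts none.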

In light of the above example, the GetOptimalDequeueTaskCount method deserves a closer look. This method is responsible for computing the number of dequeue tasks that would be considered optimal for handling the workload in a queue. When invoked, it should determine (through any appropriate decision-making mechanism) how much "horsepower" the queue listener needs in order to process the volume of work that is either waiting in, or expected to arrive in, a given queue.

For instance, the developer could take a simplistic approach and embed a set of static rules directly into the GetOptimalDequeueTaskCount method. Using the known throughput and scalability characteristics of the queuing infrastructure, average processing latency, payload size and other relevant inputs, the rule set could take an optimistic view and decide on an optimal dequeue task count.

In the example below, an intentionally over-simplified technique is used to determine the number of dequeue tasks:

/// <summary>
/// Returns the number of dequeue tasks that would be required to handle the workload in a queue given its current depth.
/// </summary>
/// <param name="currentDepth">The approximate number of items in the queue.</param>
/// <returns>The optimal number of dequeue tasks.</returns>
private int GetOptimalDequeueTaskCount(int currentDepth) {
    if (currentDepth < 100) return 10;
    if (currentDepth >= 100 && currentDepth < 1000) return 50;
    if (currentDepth >= 1000) return 100;

    // Return the minimum acceptable count. The conditions above already cover every
    // possible depth, so this line is never reached, but the compiler requires it.
    return 1;
}

It is worth reiterating that the example code above is not intended to be a “one size fits all” approach. A better solution would be to invoke an externally configurable and manageable rule which performs the necessary computations.
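As an illustration of what an externally configurable rule might look like, the sketch below replaces the hard-coded thresholds with a table parsed from a single setting string. This is only one possible design under stated assumptions: the ScalingRule and RuleBasedTaskCountPolicy types, and the "minDepth:taskCount" setting format, are hypothetical and are not part of the original sample or of any Windows Azure API.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

// Hypothetical rule: a queue at or above MinDepth gets TaskCount dequeue tasks.
public sealed class ScalingRule
{
    public int MinDepth { get; private set; }
    public int TaskCount { get; private set; }

    public ScalingRule(int minDepth, int taskCount)
    {
        MinDepth = minDepth;
        TaskCount = taskCount;
    }
}

// Hypothetical policy that picks a task count from a table of rules.
public sealed class RuleBasedTaskCountPolicy
{
    private readonly List<ScalingRule> rules;
    private readonly int minimumTaskCount;

    public RuleBasedTaskCountPolicy(IEnumerable<ScalingRule> rules, int minimumTaskCount)
    {
        // Sort descending so that the first match is the highest threshold reached.
        this.rules = rules.OrderByDescending(r => r.MinDepth).ToList();
        this.minimumTaskCount = minimumTaskCount;
    }

    public int GetOptimalDequeueTaskCount(int currentDepth)
    {
        foreach (ScalingRule rule in rules)
        {
            if (currentDepth >= rule.MinDepth) return rule.TaskCount;
        }

        // No threshold reached: fall back to the minimum acceptable count.
        return minimumTaskCount;
    }

    // Parses semicolon-separated "minDepth:taskCount" pairs, e.g. "1:10;100:50;1000:100".
    public static RuleBasedTaskCountPolicy Parse(string setting, int minimumTaskCount)
    {
        IEnumerable<ScalingRule> parsed = setting
            .Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries)
            .Select(pair => pair.Split(':'))
            .Select(parts => new ScalingRule(
                int.Parse(parts[0].Trim(), CultureInfo.InvariantCulture),
                int.Parse(parts[1].Trim(), CultureInfo.InvariantCulture)));

        return new RuleBasedTaskCountPolicy(parsed, minimumTaskCount);
    }
}
```

With the setting "1:10;100:50;1000:100" and a minimum of 1, the policy returns the same values as the hard-coded method for any non-empty queue. It differs only for a depth of zero, where it returns the minimum of 1 instead of 10. The setting string could be read from the role's configuration, so the thresholds can be tuned without redeploying code.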

At this point, we have a working prototype of a queue listener capable of automatically scaling itself up and down in response to a fluctuating workload. As a final touch, it could be enriched with the ability to adapt to variable load while that load is being processed. This capability can be added by applying the same pattern that was followed when adding support for the QueueWorkDetected event.

Now, let’s switch focus to another important optimization that will help reduce latency in the queue listeners.
