
In document Windows Azure Prescriptive Guidance (Page 123-130)

    if (currentDepth < 100) return 10;
    if (currentDepth >= 100 && currentDepth < 1000) return 50;
    if (currentDepth >= 1000) return 100;

    // Return the minimum acceptable count.
    return 1;
}

It is worth reiterating that the example code above is not intended to be a "one size fits all" approach. A better solution would be to invoke an externally configurable and manageable rule that performs the necessary computations.
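As an illustration of what such a rule could look like, here is a minimal sketch that assumes the depth thresholds are read from external configuration (for example, a role setting) rather than hard-coded. The `DequeueTaskCountRule` type is hypothetical and not part of the sample code; it reproduces the hard-coded values above when given the thresholds (100 → 10, 1000 → 50) and a maximum of 100.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical rule object: thresholds would come from external configuration
// instead of being compiled into the queue listener.
public sealed class DequeueTaskCountRule
{
    // Each pair reads: "if the queue depth is below Key, use Value dequeue tasks".
    private readonly List<KeyValuePair<long, int>> thresholds;

    // Task count used when the depth is at or above every configured bound.
    private readonly int maximumTaskCount;

    public DequeueTaskCountRule(IEnumerable<KeyValuePair<long, int>> thresholds, int maximumTaskCount)
    {
        if (thresholds == null) throw new ArgumentNullException(nameof(thresholds));
        if (maximumTaskCount < 1) throw new ArgumentOutOfRangeException(nameof(maximumTaskCount));

        // Sort by upper bound so the result does not depend on the order of the configuration entries.
        this.thresholds = thresholds.OrderBy(t => t.Key).ToList();
        this.maximumTaskCount = maximumTaskCount;
    }

    public int GetTaskCount(long currentDepth)
    {
        foreach (var threshold in thresholds)
        {
            if (currentDepth < threshold.Key) return threshold.Value;
        }
        return maximumTaskCount;
    }
}
```

Changing the scaling behavior then becomes a configuration change rather than a redeployment.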

At this point, we have a working prototype of a queue listener capable of automatically scaling itself up and down in response to a fluctuating workload. As a final touch, it could be enriched with the ability to adapt to variable load while work is being processed. This capability can be added by following the same pattern that was used when adding support for the QueueWorkDetected event.

Now, let’s switch focus to another important optimization that will help reduce latency in the queue listeners.

Implementing Publish/Subscribe Layer for Zero-Latency Dequeue

In this section, we are going to enhance the above implementation of a queue listener with a push-based notification mechanism built on top of the Service Bus one-way multicast capability.

The notification mechanism is responsible for triggering an event that tells the queue listener to start performing dequeue work. This approach avoids polling the queue to check for new messages and therefore eliminates the latency associated with polling.
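To make the latency argument concrete, here is a back-of-the-envelope estimate (not a measurement from this guidance): if an idle listener polls every $T$ seconds and a message arrives at a uniformly random moment between two polls, the wait $W$ before the message is noticed is

```latex
W \sim \mathcal{U}(0,\,T), \qquad \mathbb{E}[W] = \frac{T}{2}, \qquad \max W = T
```

With back-off, $T$ grows toward the maximum idle interval, so the first message after a quiet period can wait almost that long. A push notification removes this term, leaving only the delivery time of the notification itself.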

First, we define a trigger event that will be received by our queue listener in case a new workload is deposited into a queue:

/// Implements a trigger event indicating that a new workload was put in a queue.
[DataContract(Namespace = WellKnownNamespace.DataContracts.Infrastructure)]
public class CloudQueueWorkDetectedTriggerEvent
{
    /// Returns the name of the storage account on which the queue is located.
    [DataMember]
    public string StorageAccount { get; private set; }

    /// Returns the name of the queue where the payload was put.
    [DataMember]
    public string QueueName { get; private set; }

    /// Returns the size of the queue's payload (e.g. the size of a message or the number of messages in a batch).
    [DataMember]
    public long PayloadSize { get; private set; }

    // ... The constructor was omitted for brevity ...
}
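The constructor is omitted above. The following self-contained sketch shows one plausible shape for it, assuming that the storage account and queue name are required and the payload size is optional; that assumption only matches the two-argument call used by the publisher later in this section, and it is not the sample's actual code. The data contract namespace is a placeholder (`urn:example:infrastructure`), because the value of `WellKnownNamespace.DataContracts.Infrastructure` is not shown.

```csharp
using System;
using System.Runtime.Serialization;

// Sketch only: placeholder namespace, assumed constructor signature.
[DataContract(Namespace = "urn:example:infrastructure")]
public class CloudQueueWorkDetectedTriggerEvent
{
    [DataMember] public string StorageAccount { get; private set; }
    [DataMember] public string QueueName { get; private set; }
    [DataMember] public long PayloadSize { get; private set; }

    // Assumed shape: account and queue are required, payload size defaults to 0.
    public CloudQueueWorkDetectedTriggerEvent(string storageAccount, string queueName, long payloadSize = 0)
    {
        if (string.IsNullOrEmpty(storageAccount)) throw new ArgumentException("A storage account name is required.", nameof(storageAccount));
        if (string.IsNullOrEmpty(queueName)) throw new ArgumentException("A queue name is required.", nameof(queueName));
        if (payloadSize < 0) throw new ArgumentOutOfRangeException(nameof(payloadSize));

        StorageAccount = storageAccount;
        QueueName = queueName;
        PayloadSize = payloadSize;
    }
}
```

The private setters keep the event immutable to its consumers while still allowing `DataContractSerializer` to populate the members when the event is deserialized on a receiving role instance.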

Next, we enable the queue listener implementations to act as subscribers that receive the trigger event. The first step is to define a queue listener as an observer of the CloudQueueWorkDetectedTriggerEvent event:

/// Defines a contract that must be implemented by an extension responsible for listening on a Windows Azure queue.
public interface ICloudQueueServiceWorkerRoleExtension : IObserver<CloudQueueWorkDetectedTriggerEvent>
{
    // ... The body is omitted as it was supplied in previous examples ...
}
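Because the interface inherits from `IObserver<T>` (namespace `System`, available since .NET Framework 4), every queue listener must implement all three of its members: `OnNext`, `OnError` and `OnCompleted`, even though only `OnNext` carries the trigger. The sketch below uses hypothetical, simplified types (`WorkDetected`, `CountingQueueObserver`) to show the complete contract in isolation.

```csharp
using System;

// Hypothetical, simplified trigger payload used only in this sketch.
public sealed class WorkDetected
{
    public WorkDetected(string queueName) { QueueName = queueName; }
    public string QueueName { get; }
}

// Minimal observer implementing the full IObserver<T> contract.
public sealed class CountingQueueObserver : IObserver<WorkDetected>
{
    public int Notifications { get; private set; }
    public bool Completed { get; private set; }
    public Exception LastError { get; private set; }

    // Called by the provider once per trigger event.
    public void OnNext(WorkDetected value)
    {
        if (value == null) throw new ArgumentNullException(nameof(value));
        Notifications++;
    }

    // Called if the provider fails; no further notifications are expected afterward.
    public void OnError(Exception error) { LastError = error; }

    // Called when the provider will send no more notifications.
    public void OnCompleted() { Completed = true; }
}
```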

The second step is to implement the OnNext method defined in the IObserver<T> interface. This method gets called by the provider to notify the observer about a new event:

public class CloudQueueListenerExtension<T> : ICloudQueueListenerExtension<T>
{
    // ... There is some code before this point ...

    /// <summary>
    /// Gets called by the provider to notify this queue listener about a new trigger event.
    /// </summary>
    /// <param name="e">The trigger event indicating that a new payload was put in a queue.</param>
    public void OnNext(CloudQueueWorkDetectedTriggerEvent e)
    {
        Guard.ArgumentNotNull(e, "e");

        // Make sure the trigger event is for the queue managed by this listener, otherwise ignore.
        if (this.queueLocation.StorageAccount == e.StorageAccount &&
            this.queueLocation.QueueName == e.QueueName)
        {
            // Copy the delegate to a local variable so that a concurrent unsubscribe
            // cannot set it to null between the null check and the invocation.
            var handler = QueueWorkDetected;
            if (handler != null)
            {
                handler(this);
            }
        }
    }

    // ... There is more code after this point ...
}

As can be seen in the example above, we purposefully invoke the same event delegate that was used in the previous steps. The QueueWorkDetected event handler already provides the necessary application logic for instantiating the optimal number of dequeue tasks. Therefore, the same event handler is reused when handling the CloudQueueWorkDetectedTriggerEvent notification.
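The filter-and-relay logic can be exercised on its own with the following sketch. The types `WorkDetectedTrigger` and `FilteringQueueListener` are hypothetical, and the `Action<object>` signature for `QueueWorkDetected` is an assumption, since the delegate type of the original event is not shown in this section. Name comparison is ordinal, which matches the `==` operator used by the listener above.

```csharp
using System;

// Hypothetical payload carrying only the two fields the filter needs.
public sealed class WorkDetectedTrigger
{
    public WorkDetectedTrigger(string storageAccount, string queueName)
    {
        StorageAccount = storageAccount;
        QueueName = queueName;
    }
    public string StorageAccount { get; }
    public string QueueName { get; }
}

// Hypothetical listener: forwards only triggers addressed to its own queue
// to the same handler that the polling path already uses.
public sealed class FilteringQueueListener : IObserver<WorkDetectedTrigger>
{
    private readonly string storageAccount;
    private readonly string queueName;

    public FilteringQueueListener(string storageAccount, string queueName)
    {
        this.storageAccount = storageAccount;
        this.queueName = queueName;
    }

    // Stand-in for the QueueWorkDetected event (assumed signature).
    public event Action<object> QueueWorkDetected;

    public void OnNext(WorkDetectedTrigger e)
    {
        if (e == null) throw new ArgumentNullException(nameof(e));

        if (string.Equals(storageAccount, e.StorageAccount, StringComparison.Ordinal) &&
            string.Equals(queueName, e.QueueName, StringComparison.Ordinal))
        {
            // ?.Invoke reads the delegate once, so a concurrent unsubscribe cannot cause a NullReferenceException.
            QueueWorkDetected?.Invoke(this);
        }
    }

    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Triggers for other queues are silently dropped, so every listener can safely receive every multicast event.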

As noted in the previous sections, we don't have to maintain a continuously running dequeue task when a push-based notification is employed. Therefore, we can reduce the number of dequeue tasks per queue listener instance to zero and use the notification mechanism to instantiate dequeue tasks when the queue receives work items. To make sure that we are not running any idle dequeue tasks, the following straightforward modification in the QueueEmpty event handler is required:

private bool HandleQueueEmptyEvent(object sender, int idleCount, out TimeSpan delay)
{
    // ... There is some code before this point ...

    // As soon as the interval reaches its maximum, tell the source dequeue task that it must gracefully terminate itself.
    return delay.TotalMilliseconds >= maximumIdleIntervalMs;
}

In summary, we no longer check whether a single active dequeue task remains. The revised QueueEmpty event handler bases its result solely on whether the maximum idle interval has been reached, at which point every active dequeue task is shut down.
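The code preceding the `return` statement is not shown. The sketch below fills that role with a randomized exponential back-off so that the termination rule can be seen end to end; the `IdleBackoffPolicy` class, its parameters and the 50% to 100% jitter range are illustrative assumptions, not the sample's implementation.

```csharp
using System;

// Hypothetical QueueEmpty handler: randomized exponential back-off whose return
// value tells the calling dequeue task to stop once the delay reaches its cap.
public sealed class IdleBackoffPolicy
{
    private readonly double minimumIdleIntervalMs;
    private readonly double maximumIdleIntervalMs;
    private readonly Random random;

    public IdleBackoffPolicy(double minimumIdleIntervalMs, double maximumIdleIntervalMs, int seed)
    {
        if (minimumIdleIntervalMs <= 0) throw new ArgumentOutOfRangeException(nameof(minimumIdleIntervalMs));
        if (maximumIdleIntervalMs < minimumIdleIntervalMs) throw new ArgumentOutOfRangeException(nameof(maximumIdleIntervalMs));

        this.minimumIdleIntervalMs = minimumIdleIntervalMs;
        this.maximumIdleIntervalMs = maximumIdleIntervalMs;
        this.random = new Random(seed);
    }

    // idleCount: number of consecutive empty polls observed by the calling dequeue task.
    public bool HandleQueueEmptyEvent(object sender, int idleCount, out TimeSpan delay)
    {
        // Double the interval on every empty poll (exponent capped to avoid overflow).
        double exponential = minimumIdleIntervalMs * Math.Pow(2, Math.Min(Math.Max(idleCount, 0), 30));

        // Randomize to between 50% and 100% of that value so instances do not poll in lockstep.
        double jittered = exponential * (0.5 + 0.5 * random.NextDouble());
        double capped = Math.Min(jittered, maximumIdleIntervalMs);

        delay = TimeSpan.FromMilliseconds(capped);

        // Terminate once the delay has reached its maximum, as in the handler above.
        return capped >= maximumIdleIntervalMs;
    }
}
```

Because the jitter never drops below half of the exponential value, every idle task is guaranteed to reach the cap, and therefore shut down, once `minimumIdleIntervalMs * 2^idleCount` is at least twice the maximum.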

To receive the CloudQueueWorkDetectedTriggerEvent notifications, we leverage the publish/subscribe model that is implemented as loosely coupled messaging between Windows Azure role instances. In essence, we hook into the same inter-role communication layer and handle the incoming events as follows:

public class InterRoleEventSubscriberExtension : IInterRoleEventSubscriberExtension
{
    // ... Some code here was omitted for brevity. See the corresponding guidance on the Windows Azure CAT team blog for reference ...

    public void OnNext(InterRoleCommunicationEvent e)
    {
        if (this.owner != null && e != null && e.Payload != null)
        {
            // ... There is some code before this point ...

            // Test the payload type and cast it in a single step.
            var trigger = e.Payload as CloudQueueWorkDetectedTriggerEvent;
            if (trigger != null)
            {
                HandleQueueWorkDetectedTriggerEvent(trigger);
                return;
            }

            // ... There is more code after this point ...
        }
    }

    private void HandleQueueWorkDetectedTriggerEvent(CloudQueueWorkDetectedTriggerEvent e)
    {
        Guard.ArgumentNotNull(e, "e");

        // Enumerate through registered queue listeners and relay the trigger event to them.
        foreach (var queueService in this.owner.Extensions.FindAll<ICloudQueueServiceWorkerRoleExtension>())
        {
            // Pass the trigger event to a given queue listener.
            queueService.OnNext(e);
        }
    }
}
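The subscriber's job reduces to two steps: unwrap the payload by type, then fan it out to every registered listener. The sketch below isolates that pattern with hypothetical types (`Envelope` stands in for `InterRoleCommunicationEvent`, and `EnvelopeDispatcher<TPayload>` for the subscriber extension); the registration list is assumed to be populated before events start flowing, because `List<T>` is not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical envelope standing in for InterRoleCommunicationEvent.
public sealed class Envelope
{
    public Envelope(string senderId, object payload) { SenderId = senderId; Payload = payload; }
    public string SenderId { get; }
    public object Payload { get; }
}

// Small adapter so a lambda can act as an observer.
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public ActionObserver(Action<T> onNext) { this.onNext = onNext ?? throw new ArgumentNullException(nameof(onNext)); }
    public void OnNext(T value) => onNext(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

// Hypothetical subscriber: relays payloads of type TPayload to all registered listeners.
public sealed class EnvelopeDispatcher<TPayload> : IObserver<Envelope> where TPayload : class
{
    // Register all listeners before events arrive; List<T> is not safe for concurrent writes.
    private readonly List<IObserver<TPayload>> listeners = new List<IObserver<TPayload>>();

    public void Register(IObserver<TPayload> listener)
    {
        if (listener == null) throw new ArgumentNullException(nameof(listener));
        listeners.Add(listener);
    }

    public void OnNext(Envelope e)
    {
        // Type test and cast in one step; other payload types are ignored here.
        if (e?.Payload is TPayload payload)
        {
            foreach (var listener in listeners) listener.OnNext(payload);
        }
    }

    public void OnError(Exception error) { foreach (var l in listeners) l.OnError(error); }
    public void OnCompleted() { foreach (var l in listeners) l.OnCompleted(); }
}
```

Each listener then applies its own queue-name filter, so the dispatcher does not need to know which listener owns which queue.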

Multicasting the trigger event defined by the CloudQueueWorkDetectedTriggerEvent class is ultimately the responsibility of the publisher, namely the component that deposits work items on a queue.

This event can be triggered either before the very first work item is enqueued or after the last item is put in a queue. In the example below, we publish a trigger event after all work items have been put into the input queue:

public class ProcessInitiatorWorkerRole : RoleEntryPoint
{
    // The instance of the role extension which provides an interface to the inter-role communication service.
    private volatile IInterRoleCommunicationExtension interRoleCommunicator;

    // ... Some code here was omitted for brevity. See the corresponding guidance on Windows Azure CAT team blog for reference ...

    private void HandleWorkload()
    {
        // Step 1: Receive compute-intensive workload.
        // ... (code was omitted for brevity) ...

        // Step 2: Enqueue work items into the input queue.
        // ... (code was omitted for brevity) ...

        // Step 3: Notify the respective queue listeners that they should expect work to arrive.
        // Create a trigger event referencing the queue into which we have just put work items.
        var trigger = new CloudQueueWorkDetectedTriggerEvent("MyStorageAccount", "InputQueue");

        // Package the trigger into an inter-role communication event.
        var interRoleEvent = new InterRoleCommunicationEvent(CloudEnvironment.CurrentRoleInstanceId, trigger);

        // Publish inter-role communication event via the Service Bus one-way multicast.
        interRoleCommunicator.Publish(interRoleEvent);
    }
}
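Service Bus cannot be exercised in a self-contained example, so the sketch below substitutes a hypothetical in-process `InMemoryMulticastBus<T>` (built on `IObservable<T>`) for the one-way multicast and a hypothetical `WorkloadPublisher` for the worker role. It shows the publish-after-enqueue ordering: every subscriber woken by the trigger finds the whole batch already in the queue. Unlike a real bus, delivery here is synchronous and local.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the one-way multicast: every subscriber gets every event.
public sealed class InMemoryMulticastBus<T> : IObservable<T>
{
    private readonly List<IObserver<T>> subscribers = new List<IObserver<T>>();
    private readonly object gate = new object();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null) throw new ArgumentNullException(nameof(observer));
        lock (gate) subscribers.Add(observer);
        return new Unsubscriber(this, observer);
    }

    public void Publish(T value)
    {
        IObserver<T>[] snapshot;
        // Copy under the lock so subscriptions can change while events are delivered.
        lock (gate) snapshot = subscribers.ToArray();
        foreach (var subscriber in snapshot) subscriber.OnNext(value);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly InMemoryMulticastBus<T> bus;
        private readonly IObserver<T> observer;
        public Unsubscriber(InMemoryMulticastBus<T> bus, IObserver<T> observer) { this.bus = bus; this.observer = observer; }
        public void Dispose() { lock (bus.gate) bus.subscribers.Remove(observer); }
    }
}

// Small adapter so a lambda can act as an observer.
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public ActionObserver(Action<T> onNext) { this.onNext = onNext ?? throw new ArgumentNullException(nameof(onNext)); }
    public void OnNext(T value) => onNext(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

// Hypothetical publisher mirroring Steps 2 and 3 of HandleWorkload.
public static class WorkloadPublisher
{
    public static void EnqueueAndNotify(Queue<string> inputQueue, IEnumerable<string> workItems,
                                        InMemoryMulticastBus<string> bus, string queueName)
    {
        // Step 2: enqueue every work item first.
        foreach (var item in workItems) inputQueue.Enqueue(item);

        // Step 3: publish a single trigger after the last item is in the queue.
        bus.Publish(queueName);
    }
}
```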

Now that we have built a queue listener that is capable of supporting multi-threading, auto-scaling and push-based notifications, it's time to consolidate all recommendations pertaining to the design of queue-based messaging solutions on the Windows Azure platform.

Conclusion

To maximize the efficiency and cost effectiveness of queue-based messaging solutions running on the Windows Azure platform, solution architects and developers should consider the following recommendations.

As a solution architect, you should:

• Provision a queue-based messaging architecture that uses the Windows Azure queue storage service for high-scale asynchronous communication between tiers and services in cloud-based or hybrid solutions.

• Recommend a partitioned queuing architecture to scale beyond 500 transactions/sec.

• Understand the fundamentals of the Windows Azure pricing model and optimize the solution to lower transaction costs through a series of best practices and design patterns.

• Consider dynamic scaling requirements by provisioning an architecture that is adaptive to volatile and fluctuating workloads.

• Employ the right auto-scaling techniques and approaches to elastically expand and shrink compute power to further optimize the operating expense.

• Evaluate the cost-benefit ratio of reducing latency by taking a dependency on Windows Azure Service Bus for real-time push-based notification dispatch.
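A partitioned queuing architecture spreads messages across several queues so that no single queue has to absorb the whole transaction rate. The sketch below shows one way to pick a partition; the `QueuePartitioner` class and the `<prefix>-<index>` naming scheme are hypothetical. It uses an FNV-1a hash rather than `string.GetHashCode`, because the latter is randomized per process in .NET Core and would make different role instances disagree on the target queue.

```csharp
using System;
using System.Text;

// Hypothetical partition selector: maps a key to one of N queues named "<prefix>-0" .. "<prefix>-(N-1)".
public sealed class QueuePartitioner
{
    private readonly string prefix;
    private readonly int partitionCount;

    public QueuePartitioner(string prefix, int partitionCount)
    {
        if (string.IsNullOrEmpty(prefix)) throw new ArgumentException("A queue name prefix is required.", nameof(prefix));
        if (partitionCount <= 0) throw new ArgumentOutOfRangeException(nameof(partitionCount));
        this.prefix = prefix;
        this.partitionCount = partitionCount;
    }

    // 32-bit FNV-1a over the UTF-8 bytes of the key: stable across processes and machines.
    private static uint StableHash(string key)
    {
        uint hash = 2166136261u;
        foreach (byte b in Encoding.UTF8.GetBytes(key))
        {
            hash ^= b;
            hash *= 16777619u;
        }
        return hash;
    }

    public string GetQueueName(string partitionKey)
    {
        if (partitionKey == null) throw new ArgumentNullException(nameof(partitionKey));
        int index = (int)(StableHash(partitionKey) % (uint)partitionCount);
        return prefix + "-" + index;
    }
}
```

Publishers and listeners that share the same prefix and partition count always agree on which queue holds a given key, and adding partitions raises the aggregate throughput ceiling roughly linearly.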

As a developer, you should:

• Design a messaging solution that employs batching when storing data in and retrieving data from Windows Azure queues.

• Implement an efficient queue listener service that ensures an empty queue is polled by at most one dequeue thread.

• Dynamically scale down the number of worker role instances when queues remain empty for a prolonged period of time.

• Implement an application-specific random exponential back-off algorithm to reduce the effect of idle queue polling on storage transaction costs.

• Adopt the right techniques to prevent exceeding the scalability targets for a single queue when implementing highly multi-threaded, multi-instance queue publishers and consumers.

• Employ a robust retry policy capable of handling a variety of transient conditions when publishing and consuming data from Windows Azure queues.

• Use the one-way eventing capability provided by Windows Azure Service Bus to support push-based notifications in order to reduce latency and improve performance of the queue-based messaging solution.

• Explore the new capabilities of the .NET Framework 4, such as the TPL, PLINQ and the Observer pattern, to maximize the degree of parallelism, improve concurrency and simplify the design of multi-threaded services.
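Batching pays off because storage transactions are billed per request, not per item. The helper below is a minimal sketch (the `Batching.Chunk` name is hypothetical) that groups work items into batches of a configurable maximum size; a natural choice for retrieval is 32, assuming the per-request message limit of the queue service's Get Messages operation. For example, 70 items in batches of 32 need 3 requests instead of 70.

```csharp
using System;
using System.Collections.Generic;

public static class Batching
{
    // Splits items into consecutive batches of at most maxBatchSize elements.
    // Arguments are validated eagerly; enumeration itself is deferred.
    public static IEnumerable<IReadOnlyList<T>> Chunk<T>(IEnumerable<T> items, int maxBatchSize)
    {
        if (items == null) throw new ArgumentNullException(nameof(items));
        if (maxBatchSize <= 0) throw new ArgumentOutOfRangeException(nameof(maxBatchSize));
        return ChunkIterator(items, maxBatchSize);
    }

    private static IEnumerable<IReadOnlyList<T>> ChunkIterator<T>(IEnumerable<T> items, int maxBatchSize)
    {
        var batch = new List<T>(maxBatchSize);
        foreach (var item in items)
        {
            batch.Add(item);
            if (batch.Count == maxBatchSize)
            {
                yield return batch;
                batch = new List<T>(maxBatchSize);
            }
        }

        // Emit the final, partially filled batch, if any.
        if (batch.Count > 0) yield return batch;
    }
}
```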

The accompanying sample code is available for download from the MSDN Code Gallery. The sample code also includes all the required infrastructure components, such as a generics-aware abstraction layer for the Windows Azure queue service, which were not supplied in the code snippets above. Note that all source code files are governed by the Microsoft Public License, as explained in the corresponding legal notices.

