As things stand today, a message larger than 64KB (after it’s serialized and encoded) cannot be stored in a Windows Azure queue. The client-side API will throw an exception if you attempt to place a message larger than 64KB in a queue. The maximum allowed message size can be determined by inspecting the MaxMessageSize property of the CloudQueueMessage class. As of the writing of this post, the message size limit returned by this property is 65536.
The maximum message size defined in the CloudQueueMessage.MaxMessageSize property is not reflective of the maximum allowed payload size. Messages are subject to Base64 encoding when they are transmitted to a queue, and the encoded payloads are always larger than their raw data: Base64 turns every 3 bytes into 4 characters, so the raw payload is only 75% of the encoded size. As a result, the 64KB size limit effectively prohibits storing any message with a payload larger than 48KB (75% of 64KB).
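The 48KB figure can be checked with a few lines of arithmetic. The following is a minimal sketch, not part of the original solution; the class and method names are hypothetical, and the 65536 limit is simply the value quoted above.

```csharp
using System;

// Hypothetical helper for illustration only.
public static class Base64Sizing {
    // Number of characters produced by Base64-encoding rawBytes bytes:
    // every group of 3 bytes (the last group is padded) becomes 4 characters.
    public static long EncodedLength(long rawBytes) {
        return (rawBytes + 2) / 3 * 4;
    }

    // Largest raw payload whose Base64 form still fits into limit characters.
    public static long MaxRawPayload(long limit) {
        return limit / 4 * 3;
    }
}
```

With a limit of 65536, MaxRawPayload returns 49152 (48KB), EncodedLength(49152) is exactly 65536, and a single extra byte pushes the encoded form to 65540, which no longer fits.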
Even though it’s the limit for a single queue item, it can be deemed prohibitive for certain types of messages, especially those that cannot be broken down into smaller chunks. From a developer perspective, worrying about whether a given message can be accommodated on a queue doesn’t help my productivity. At the end of the day, the goal is to get my application data to flow between producers and consumers in the most efficient way, regardless of the data size. While one side calls Put (or Enqueue) and the other side invokes Get (or Dequeue) against a queue, the rest should theoretically occur automagically.
Overcoming the message size limitation in Windows Azure queues by employing a smart way of dealing with large messages is the key premise for the technical challenge elaborated in this article. This will come at the cost of some additional craftsmanship. In the modern world of commercial software development, any extra development effort needs to be wisely justified. I am going to justify the additional investment with the following design goals:
Support for very large messages through eliminating any restrictions imposed by the Queue Service API as it pertains to the message size.
Support for user-defined generic objects when publishing and consuming messages from a Windows Azure queue.
Transparent overflow into a configurable message store: either a blob container, a distributed cache or another type of repository capable of storing large messages.
Transparent compression that is intended to increase cost-efficiency by minimizing the amount of storage space consumed by large messages.
Increased reliability in the form of extensive use of the transient condition handling best practices when performing queue operations.
The foundation for supporting large messages in size-constrained queues will be the following pattern. First, I check if a given message can be accommodated on a Windows Azure queue without performing any extra work. The way to determine whether a message can be safely stored on a queue without violating size constraints will be through a formula which I wrap into a helper function as follows:
/// <summary>
/// Provides helper methods to enable cloud application code to invoke common, globally accessible functions.
/// </summary>
public static class CloudUtility {
    /// <summary>
    /// Verifies whether or not the specified message size can be accommodated in a Windows Azure queue.
    /// </summary>
    /// <param name="size">The message size value to be inspected.</param>
    /// <returns>True if the specified size can be accommodated in a Windows Azure queue, otherwise false.</returns>
    public static bool IsAllowedQueueMessageSize(long size) {
        return size >= 0 && size <= (CloudQueueMessage.MaxMessageSize - 1) / 4 * 3;
    }
}
If message size is under the enforced limit, I should simply invoke the Queue Service API to enqueue the message "as is." If the message size is in excess of the limitation in question, the data flow becomes quite interesting. The following flowchart visualizes the subsequent steps:
In summary, if a message cannot be accommodated on a queue due to its size, it overflows into a message store capable of storing large messages. A tiny metadata message is then created consisting of a reference to the item in the overflow store. Finally, the metadata message is put on a queue. I always choose to compress a message before asserting its suitability for persistence in a queue. This effectively expands the population of messages that can be queued without incurring the need to go into the overflow store. A good example is an XML document that is somewhat over the size limit but, once compressed, becomes a perfect candidate to be simply put on a queue. You can modify this behavior in case the default compression is not desirable. This can be achieved by providing a custom serializer component, elaborated in the next section.
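The producer-side flow can be sketched with in-memory stand-ins. This is a minimal illustration under stated assumptions, not the implementation developed in this article: the class, the one-byte tag that marks a metadata message, and the dictionary acting as the overflow store are all hypothetical. Only the size formula is taken from the helper function shown earlier.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Text;

// Hypothetical in-memory model of the "compress, then queue or overflow" flow.
public static class OverflowPutSketch {
    // Same formula as IsAllowedQueueMessageSize, with 65536 assumed as the limit.
    const int MaxQueuePayload = (65536 - 1) / 4 * 3;
    const byte InlineTag = 0, ReferenceTag = 1; // assumed framing convention

    public static readonly Queue<byte[]> Messages = new Queue<byte[]>();
    public static readonly Dictionary<string, byte[]> Overflow = new Dictionary<string, byte[]>();

    static byte[] Compress(byte[] raw) {
        using (var buffer = new MemoryStream()) {
            using (var deflate = new DeflateStream(buffer, CompressionMode.Compress, true)) {
                deflate.Write(raw, 0, raw.Length);
            }
            return buffer.ToArray();
        }
    }

    // Prepends the tag byte so a consumer can tell payloads from references.
    static byte[] Frame(byte tag, byte[] body) {
        var framed = new byte[body.Length + 1];
        framed[0] = tag;
        Buffer.BlockCopy(body, 0, framed, 1, body.Length);
        return framed;
    }

    // Returns true when the message had to go into the overflow store.
    public static bool Put(byte[] message) {
        byte[] compressed = Compress(message);
        if (compressed.Length + 1 <= MaxQueuePayload) {
            // Small enough after compression: queue the payload itself.
            Messages.Enqueue(Frame(InlineTag, compressed));
            return false;
        }
        // Too large: park the payload and queue a tiny metadata message instead.
        string key = Guid.NewGuid().ToString("N");
        Overflow[key] = compressed;
        Messages.Enqueue(Frame(ReferenceTag, Encoding.UTF8.GetBytes(key)));
        return true;
    }
}
```

Because compression runs before the size check, a highly compressible 200KB message stays on the queue, while 200KB of random bytes ends up in the overflow store.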
There are several considerations that apply here, mainly from a cost perspective. As can be noted in the above flowchart, I first attempt to determine whether a large message can overflow into the Windows Azure Caching Service (referred to herein as the Caching Service for the sake of brevity).
Since the usage of a distributed cloud-based caching service is subject to a charge, the cache overflow path should be made optional. This is reflected in the flowchart.
In addition, there may be situations when the message is quite large and therefore is not suitable for being stored in a size-constrained distributed cache. As of the writing of this article, the maximum cache size is 4GB. Therefore, we must take this into consideration and provide a failover path should we exceed cache capacity or quotas. The quotas come with eviction behavior that also needs to be accounted for.
The use of the Windows Azure Caching Service as an overflow store helps reduce latency and eliminate excessive storage transactions when exchanging a large number of messages. It offers a highly available, distributed caching infrastructure capable of replicating and maintaining cached data in memory across multiple cache servers for durability. These benefits can be outweighed by the cache size limitation and the costs associated with the service usage. It is therefore important to perform a cost-benefit analysis to assess the pros and cons of introducing the Caching Service as an overflow store in certain scenarios.
Given that the distributed cache storage is limited, it is essential to set out some further rules that will enable the efficient use of the cache. In connection to this, one important recommendation needs to be explicitly called out:
Due to the specifics of its eviction behavior, the Caching Service does not offer the same durability guarantees as the Windows Azure Blob Service. When used as an overflow store, the Caching Service is best suited to messages that are volatile in nature and under 8MB in size. The term “volatile” means that messages are published and subsequently consumed as quickly as possible. The 8MB recommendation is due to the optimal cache item size that is configured in the Caching Service by default.
I’m going to reflect the above recommendation in the code by providing a helper function that will determine whether or not the specified item size value can be considered as optimal when storing an item of the given size in the cache.
public static class CloudUtility {
    private static readonly long optimalCacheItemSize = 8 * 1024 * 1024;

    /// <summary>
    /// Determines whether the specified value can be considered as optimal when storing an item of a given size in the cache.
    /// </summary>
    /// <param name="size">The item size value to be inspected.</param>
    /// <returns>True if the specified size can be considered as optimal, otherwise false.</returns>
    public static bool IsOptimalCacheItemSize(long size) {
        return size >= 0 && size <= optimalCacheItemSize;
    }
}
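Taken together, the two size checks suggest a simple routing rule. The sketch below is one possible way to express it and is not taken from the article's code: the enum, the class and the cacheEnabled flag are hypothetical, and the queue limit of 65536 is assumed. It only covers the size-based decision; a runtime failover from the cache to blob storage (for example, when a quota is exceeded) would still be needed on top of it.

```csharp
using System;

// Hypothetical names, for illustration only.
public enum OverflowTarget { Queue, Cache, Blob }

public static class OverflowRouting {
    const long MaxQueuePayload = (65536 - 1) / 4 * 3;   // formula from IsAllowedQueueMessageSize
    const long OptimalCacheItemSize = 8 * 1024 * 1024;  // the 8MB recommendation

    // Decides where a (compressed) message of the given size should be stored.
    public static OverflowTarget Choose(long size, bool cacheEnabled) {
        if (size < 0) throw new ArgumentOutOfRangeException("size");
        if (size <= MaxQueuePayload) return OverflowTarget.Queue;
        // The cache path is optional because the Caching Service is billed separately.
        if (cacheEnabled && size <= OptimalCacheItemSize) return OverflowTarget.Cache;
        return OverflowTarget.Blob;
    }
}
```

Keeping the cache path behind a flag means that switching it off sends every oversized message straight to blob storage.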
Now that some initial pre-requisites are considered, it’s time to switch over to the consumer side and take a look at the implementation pattern for retrieving large messages from a queue. First, let’s visualize the process flow for the purposes of facilitating overall understanding:
To summarize the above flow, a message of an unknown type is fetched from a queue and compared against a metadata message type. If it is not a metadata message, the flow continues with decompression logic, so that the original message can be correctly reconstructed before being presented to the consumer. By contrast, if it is in fact a metadata message, it is inspected to determine the type of overflow store that was used for storing the actual message. If it is identified as a message stored in the cache, the respective Caching Service API is invoked and the real message is fetched before being decompressed and returned to the consumer.
If the real message was put into a blob container, the Blob Service API is used to retrieve it from the blob entity, after which it is decompressed and handed back to the caller.
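The consumer-side flow can be modelled in the same spirit. This is a hedged sketch rather than the article's implementation: the tag byte that distinguishes an inline payload from a cache or blob reference, the dictionaries standing in for the two overflow stores, and all names below are assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Text;

// Hypothetical in-memory model of the "fetch, resolve, decompress" flow.
public static class OverflowGetSketch {
    public const byte Inline = 0, InCache = 1, InBlob = 2; // assumed framing convention

    public static byte[] Compress(byte[] raw) {
        using (var buffer = new MemoryStream()) {
            using (var deflate = new DeflateStream(buffer, CompressionMode.Compress, true)) {
                deflate.Write(raw, 0, raw.Length);
            }
            return buffer.ToArray();
        }
    }

    // Builds a queue item: one tag byte followed by the body.
    public static byte[] Frame(byte tag, byte[] body) {
        var framed = new byte[body.Length + 1];
        framed[0] = tag;
        Buffer.BlockCopy(body, 0, framed, 1, body.Length);
        return framed;
    }

    static byte[] Decompress(byte[] data, int offset) {
        using (var source = new MemoryStream(data, offset, data.Length - offset))
        using (var inflate = new DeflateStream(source, CompressionMode.Decompress))
        using (var result = new MemoryStream()) {
            inflate.CopyTo(result);
            return result.ToArray();
        }
    }

    public static byte[] Get(Queue<byte[]> queue,
                             IDictionary<string, byte[]> cache,
                             IDictionary<string, byte[]> blobs) {
        byte[] item = queue.Dequeue();
        if (item[0] == Inline) {
            // Not a metadata message: the payload travelled on the queue itself.
            return Decompress(item, 1);
        }
        // Metadata message: the rest of the item is the key into an overflow store.
        string key = Encoding.UTF8.GetString(item, 1, item.Length - 1);
        IDictionary<string, byte[]> store = item[0] == InCache ? cache : blobs;
        return Decompress(store[key], 0);
    }
}
```

In all three cases the caller receives the original bytes and never needs to know which path the message took.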
In addition to handling Enqueue and Dequeue operations for large messages, there is a need to make sure that all overflowed payloads are removed from their respective overflow message stores upon the consumer’s request. To accomplish this, one of the potential implementation patterns is to couple the removal process with the Delete operation when it’s being invoked for a given message. The visual representation of this operation can be depicted as follows:
Before we start implementing the patterns mentioned above, one last consideration worth making is the definition of a message. What would be considered a message, and what forms will it manifest itself in? Would it be a byte array, a stream of data, a simple type like a string, or a complex application-specific object which the developer implements as part of the solution object model? I truly believe that this is the area where we should not constrain ourselves. Let’s just assume that a message is of generic type <T> meaning it’s anything the developer wishes to use.
You will see that the end implementation will naturally unfold itself around this idea.
Putting it all together, the following diagram summarizes the three possible travel paths that are accounted for in the above design:
At this
/// <param name="queueName">The target queue name on which message will be placed.</param>
/// <param name="message">The payload to be put into a queue.</param>
void Put<T>(string queueName, T message);
/// <summary>
/// Retrieves a single message from the specified queue and applies the default visibility timeout.
/// </summary>
/// <typeparam name="T">The type of the payload associated with the message.</typeparam>
/// <param name="queueName">The name of the source queue.</param>
/// <returns>An instance of the object that has been fetched from the queue.</returns>
T Get<T>(string queueName);
/// <summary>
/// Gets a collection of messages from the specified queue and applies the specified visibility timeout.
/// </summary>
/// <typeparam name="T">The type of the payload associated with the message.</typeparam>
/// <param name="queueName">The name of the source queue.</param>
/// <param name="count">The number of messages to retrieve.</param>
/// <param name="visibilityTimeout">The timeout during which retrieved messages will remain invisible on the queue.</param>
/// <returns>The list of messages retrieved from the specified queue.</returns>
IEnumerable<T> Get<T>(string queueName, int count, TimeSpan visibilityTimeout);
/// <summary>
/// Gets a collection of messages from the specified queue and applies the default visibility timeout.
/// </summary>
/// <typeparam name="T">The type of the payload associated with the message.</typeparam>
/// <param name="queueName">The name of the source queue.</param>
/// <param name="count">The number of messages to retrieve.</param>
/// <returns>The list of messages retrieved from the specified queue.</returns>
IEnumerable<T> Get<T>(string queueName, int count);
/// <summary>
/// Deletes a single message from a queue.
/// </summary>
/// <typeparam name="T">The type of the payload associated with the message.</typeparam>
/// <param name="message">The message to be deleted from a queue.</param>
/// <returns>A flag indicating whether or not the specified message has actually been deleted.</returns>
bool Delete<T>(T message);
}
There is also a need for one extra contract (interface) which will abstract access to the large-message overflow store. Two components implement the contract, one for each overflow store (blob storage and distributed cache). The contract is comprised of the following operations:
/// <summary>
/// Defines a generics-aware abstraction layer for Windows Azure Blob storage.
/// </summary>
public interface ICloudBlobStorage : IDisposable {
/// <summary>
/// Puts a blob into the underlying storage, overwrites if the blob with the same name already exists.
/// </summary>
/// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
/// <param name="containerName">The target blob container name into which a blob will be stored.</param>
/// <param name="blobName">The custom name associated with the blob.</param>
/// <param name="blob">The blob's payload.</param>
/// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
bool Put<T>(string containerName, string blobName, T blob);
/// <summary>
/// Puts a blob into the underlying storage. If the blob with the same name already exists, overwrite behavior can be applied.
/// </summary>
/// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
/// <param name="containerName">The target blob container name into which a blob will be stored.</param>
/// <param name="blobName">The custom name associated with the blob.</param>
/// <param name="blob">The blob's payload.</param>
/// <param name="overwrite">The flag indicating whether or not overwriting the existing blob is permitted.</param>
/// <returns>True if the blob was successfully put into the specified container, otherwise false.</returns>
bool Put<T>(string containerName, string blobName, T blob, bool overwrite);
/// <summary>
/// Retrieves a blob by its name from the underlying storage.
/// </summary>
/// <typeparam name="T">The type of the payload associated with the blob.</typeparam>
/// <param name="containerName">The target blob container name from which the blob will be retrieved.</param>
/// <param name="blobName">The custom name associated with the blob.</param>
/// <returns>An instance of <typeparamref name="T"/> or default(T) if the specified blob was not found.</returns>
T Get<T>(string containerName, string blobName);
/// <summary>
/// Deletes the specified blob.
/// </summary>
/// <param name="containerName">The target blob container name from which the blob will be deleted.</param>
/// <param name="blobName">The custom name associated with the blob.</param>
/// <returns>True if the blob was deleted, otherwise false.</returns>
bool Delete(string containerName, string blobName);
}
Both contracts heavily rely on generic type <T>. It enables you to tailor the message type to any .NET type of your choice. I will however have to handle some extreme use cases, namely types that require special treatment, such as streams. I expand on this later on.
Regardless of the message type chosen, one important requirement applies: the object type that represents a message on a queue must be serializable. All objects passing through the storage abstraction layer are subject to serialization before they land on a queue or in an overflow store. In my implementation, serialization and deserialization are also coupled with compression and decompression, respectively. This approach increases efficiency from a cost and bandwidth perspective. The cost-related benefit comes from the fact that compressed large messages inherently consume less storage, resulting in a decrease in storage costs. The bandwidth efficiency arises from the smaller payloads that flow on the wire to and from Windows Azure storage.
The requirement for serialization and deserialization is declared in a specialized interface. Any component that implements this interface must provide the specific compression, serialization, deserialization, and decompression functionality. An example of this interface is shown:
/// <summary>
/// Defines a contract that must be supported by a component which performs serialization and
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
public interface ICloudStorageEntitySerializer {
/// <summary>
/// Serializes the object to the specified stream.
/// </summary>
/// <param name="instance">The object instance to be serialized.</param>
/// <param name="target">The destination stream into which the serialized object will be written.</param>
void Serialize(object instance, Stream target);
/// <summary>
/// Deserializes the object from specified data stream.
/// </summary>
/// <param name="source">The source stream from which serialized object will be consumed.</param>
/// <param name="type">The type of the object that will be deserialized.</param>
/// <returns>The deserialized object instance.</returns>
object Deserialize(Stream source, Type type);
}
For compression and decompression, I use the DeflateStream component in the .NET Framework. This class implements the Deflate algorithm, an industry-standard, RFC-compliant algorithm for lossless compression and decompression. Compared to the GZipStream class, DeflateStream produces slightly smaller output and generally delivers better performance. By contrast, the GZipStream class uses the GZIP data format, which includes a cyclic redundancy check (CRC) value for detecting data corruption. Behind the scenes, the GZIP data format uses the same compression algorithm as the DeflateStream class. In summary, GZipStream = DeflateStream + the cost of calculating and storing CRC checksums.
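The difference between the two formats is easy to observe by compressing the same data both ways. The following is a small sketch for experimentation only; the class and method names are hypothetical and do not appear in the solution's code.

```csharp
using System;
using System.IO;
using System.IO.Compression;

// Hypothetical helper for comparing the two stream classes.
public static class CompressionComparison {
    // Compresses raw with either GZipStream or DeflateStream and returns the result.
    public static byte[] Compress(byte[] raw, bool useGZip) {
        using (var buffer = new MemoryStream()) {
            // leaveOpen = true keeps the buffer usable after the compressor is disposed.
            Stream compressor = useGZip
                ? (Stream)new GZipStream(buffer, CompressionMode.Compress, true)
                : new DeflateStream(buffer, CompressionMode.Compress, true);
            using (compressor) {
                compressor.Write(raw, 0, raw.Length);
            }
            return buffer.ToArray();
        }
    }
}
```

For the same input, the GZIP output starts with the magic bytes 0x1F 0x8B and is larger than the raw Deflate output because of the added header and CRC trailer. The exact difference depends on the runtime, so it is worth measuring with your own payloads.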
My implementation of the contract is included below. Note that compression algorithms can be easily toggled by replacing DeflateStream class with GZipStream and vice versa.
/// <summary>
/// Provides a default implementation of ICloudStorageEntitySerializer which performs serialization and
/// deserialization of storage objects such as Azure queue items, blobs and table entries.
/// </summary>
internal sealed class CloudStorageEntitySerializer : ICloudStorageEntitySerializer {
    /// <summary>
    /// Serializes the object to the specified stream.
    /// </summary>
    /// <param name="instance">The object instance to be serialized.</param>
    /// <param name="target">The destination stream into which the serialized object will be written.</param>
    public void Serialize(object instance, Stream target) {
        Guard.ArgumentNotNull(instance, "instance");
        Guard.ArgumentNotNull(target, "target");

        XDocument xmlDocument = null;
        XElement xmlElement = null;
        XmlDocument domDocument = null;
        XmlElement domElement = null;

        if ((xmlElement = (instance as XElement)) != null) {
            // Handle XML element serialization using separate technique.
            SerializeXml<XElement>(xmlElement, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((xmlDocument = (instance as XDocument)) != null) {
            // Handle XML document serialization using separate technique.
            SerializeXml<XDocument>(xmlDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domDocument = (instance as XmlDocument)) != null) {
            // Handle XML DOM document serialization using separate technique.
            SerializeXml<XmlDocument>(domDocument, target, (xml, writer) => { xml.Save(writer); });
        }
        else if ((domElement = (instance as XmlElement)) != null) {
            // Handle XML DOM element serialization using separate technique.
            SerializeXml<XmlElement>(domElement, target, (xml, writer) => { xml.WriteTo(writer); });
        } else {
            var serializer = GetXmlSerializer(instance.GetType());
            // Compress into the caller's stream; leaving it open (leaveOpen = true) is an
            // assumption here, made because the caller owns the target stream.
            using (var compressedStream = new DeflateStream(target, CompressionMode.Compress, true))
            using (var xmlWriter =
                XmlDictionaryWriter.CreateBinaryWriter(compressedStream, null, null, false)) {
                serializer.WriteObject(xmlWriter, instance);
            }
        }
    }
/// <summary>
/// Deserializes the object from specified data stream.
/// </summary>
/// <param name="source">The source stream from which serialized object will be