Advanced scenarios with Windows Azure Queues

Last week, in Brian Prince’s article, Using the Queuing Service in Windows Azure, you saw how to create, add messages into, retrieve and consume those messages from Windows Azure Queues. While being a simple, easy-to-use mechanism, a lot of scenarios are possible using this near-FIFO queuing mechanism. In this article we are going to focus on three scenarios which show how queues can be an important and extremely scalable component in any application architecture:

  • Back off polling, a method to lessen the number of transactions in your queue and therefore reduce the bandwidth used.
  • Patterns for working with large queue messages, a method to overcome the maximal size for a queue message and support a greater amount of data.
  • Using a queue as a state machine.

The techniques used in every scenario can be re-used in many applications and often be combined into an approach that is both scalable and reliable.

To get started, you will need to install the Windows Azure Tools for Microsoft Visual Studio. The current version is 1.4, and that is the version we will be using. You can download it from http://www.microsoft.com/windowsazure/sdk/.

N.B. The code that accompanies this article comes as a single Visual Studio solution with three console application projects in it, one for each scenario covered. To keep code samples short and to the point, the article covers only the code that polls the queue and not the additional code that populates it. Readers are encouraged to discover this themselves.

Back off Polling Queues

Typically when polling queues, one tends to write an infinite loop that checks for messages, processes messages when they arrive, sleeps a second and then re-checks the queue. This is an algorithm often used for processing queue messages and nothing is wrong with that. If we code this approach, a simple loop which looks like the following can be written:

while (true)
{
    var message = queue.GetMessage();
    if (message != null)
    {
        Console.WriteLine(string.Format("Processed message: {0}", message.AsString));
        queue.DeleteMessage(message);
    }
    else
        Thread.Sleep(TimeSpan.FromSeconds(1));
}

As you can see, this algorithm will be able to process all messages fairly easy. Now imagine you have a queue that is being filled with messages during the day and you have a few worker roles running the above code. When you have four worker roles, four workers are polling data on the queue every second. That’s okay when there are a lot of messages queued, but when there’s a slow flow of messages, these workers will be polling the queue every second even if no data is available. That’s a loss in bandwidth, resources and results in transactions being billed. Useless transactions that is, because there’s no data available!

A well-known technique for solving this is using a self-adjusting delay in the queue polling algorithm. For example, one could poll the queue and if a message exists, proceed. If no message is present, it sleeps for one second. If the second time no message is present, it sleeps for 2 seconds, then 3 seconds, and so on until the sleep period reaches a maximum limit you have set. If messages are present at a given time, this number of seconds to sleep is reset. By doing this, the amount of transactions is drastically decreased while message delivery is still relatively fast. Of course this is a simple algorithm. A more advanced version of this is available by the name of truncated exponential back off, but we will not be using that for this scenario. If you are interested in this technique: it’s used by the TCP protocol and described on Wikipedia (http://en.wikipedia.org/wiki/Retransmission_(data_networks).

Let’s implement the code for the “simple” back off algorithm. First of all, a connection to the queue should be established. Since all demos in this article use development storage there’s no real need for configuration and the following is enough:

// Create a storage account (development storage)
var storageAccount = CloudStorageAccount.DevelopmentStorageAccount;
var queueClient = storageAccount.CreateCloudQueueClient();
var queue = queueClient.GetQueueReference("demobackoffpolling");
queue.CreateIfNotExist();

Next, two variables should be defined: the current back off value which we’ll start at zero and the maximum time to back off between polls. We’ve set this to ten. Both values are in seconds.

int currentBackoff = 0;
int maximumBackoff = 10

The queue polling will occur in an infinite loop in which the pace of message processing will be altered depending on the availability of queue messages.

while (true)
{
    var message = queue.GetMessage();
    if (message != null)
    {
        // Reset backoff
        currentBackoff = 0;

        // Process the message
        Console.ForegroundColor = ConsoleColor.Green;
        Console.WriteLine(
          "Processed message with contents: {0}", message.AsString);
        // Mark completed
        queue.DeleteMessage(message);
    }
    else
    {
        if (currentBackoff < maximumBackoff)
        {
            currentBackoff++;
        }
        Console.ForegroundColor = ConsoleColor.Blue;
        Console.WriteLine("Backing off for {0} seconds...", currentBackoff);
        Thread.Sleep(TimeSpan.FromSeconds(currentBackoff));
    }
}

If you start this code is run, this algorithm can be seen in action:

image001

Messages are processed fairly rapidly after insertion in the queue (a maximum of 10 seconds before processing starts because of the cap we set on the back off algorithm). Transaction amount and wasted bandwidth and resources are reduced as well: if no messages are present the algorithm will wait and try to adjust its polling frequency to the frequency of incoming messages.

Working With Large Queue Messages

There’s one thing with Windows Azure Queues that is fixed: the maximum size of a message in a queue is 8KB. If you have to store more data in a message, there’s no other option but storing the message contents in blob or table storage and referring to the blob from the queue message. Example scenarios where this may be required are video encoding or image resizing: there’s no way a hundred megabyte video is going to fit in a queue message. Instead, a work ticket can be inserted in the queue referring to shared external storage where the actual video is stored. Using this approach, the idempotency of the queue is maintained while storage capacity is extended. The downside of this is in fact two transactions are now needed to add a message to a queue – one for the blob, one for the queue – and another two are needed to retrieve that message.

Let’s start with adding a large message to a queue. We’ll need two storage clients for that: one for queues and one for, in this case, blobs. You can use table storage or any other persistency mechanism out there for your shared external storage. The demo code in this case uses the development storage account again.

// Create a storage account (development storage)
var storageAccount = CloudStorageAccount.DevelopmentStorageAccount;

// We will need queues and blobs
var queueClient = storageAccount.CreateCloudQueueClient();
var queue = queueClient.GetQueueReference("demolargemessages");
queue.CreateIfNotExist();

var blobClient = storageAccount.CreateCloudBlobClient();
var container = blobClient.GetContainerReference("demolargemessages");
container.CreateIfNotExist();

After that, messages can be added. The order in which you insert into the different storage mechanisms is important. You can choose to:

  • Insert the message into blob storage first and then into the queue. The advantage of this is that when the large message upload to storage fails, an exception can be caught and shown to the user before a “poison” message is inserted into the queue.  If the blob fails to upload, there’s no need to insert a reference message into the queue.
  • Insert the message into the queue first and then into blob storage. This just swaps the entire logic: the message producer will not care if the two inserts succeed or fail but the consumer processing the message will be able to distinguish poison messages.

The following code demonstrates the first approach: it stores a message identifier in the queue whereas the actual data to process is stored in blob storage with the message identifier as its filename.

// Create a "message ID"
var messageId = Guid.NewGuid().ToString();

// Create large data and store it in a blob
var largeMessage = "Actually I'm not that large...";
var blob = container.GetBlobReference(messageId);
try
{

    blob.UploadText(largeMessage);
}
catch
{
    // Logging, error handling, ...
}

// Now create the queue message referring the blob message id
var message = new CloudQueueMessage(messageId);
queue.AddMessage(message);

Next, reading the message from the queue. This is very straightforward: read a message from the queue, get the corresponding blob from storage and delete both. But should message or blob be deleted first? Again, the choice is yours. Deleting the queue message first may result in unused data in blob storage if something fails in between. The opposite, removing the blob first and then the queue, may result in a poison message in the queue. Here’s the solution in the sample code:

// Get the corresponding message
var blob = container.GetBlobReference(message.AsString);
var largeMessage = blob.DownloadText();
// Process the message
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine("Processed message with contents: {0}", largeMessage);

// Remove the blob
try
{
    blob.Delete();
}
catch
{
    // Logging, tracing, ...
    // Or... Insert the message ID into a "cleanup" queue and clean it later
}

// Mark completed
queue.DeleteMessage(message);

As you can see, we try to delete the blob first, after which the message is deleted from the queue. If the blob removal fails for some reason, we log it. Optionally, a message can be inserted in a “cleanup” queue which can be processed later to remove the blob anyway. This ensures no obsolete data is left in storage.

Using a Queue as a State Machine

When developing a state machine pattern in .NET, Windows Workflow Foundation is often used to accomplish this task. Another option to create a state machine on Windows Azure though is using queues to do it. The easiest way is to create one queue per state that can be available in the state machine. Consider the following state machine:

State machine

When translated to queues, this may look like the following:

State machine in queues

When translating this idea to code, a producer, an ASP.NET website or something similar, puts a CloudQueueMessage into a  CloudQueue for messages in the “new” state. A worker thread polls this queue and picks up the message. Once processed, the message is inserted into the “inprogress” CloudQueue and removed from the “new” CloudQueue. For every further state, this process is repeated.

Note that when transferring messages from one queue into another, it is important to first insert it into the next queue before deleting it from the current queue. Failing to do so may result in messages being lost, for example when a worker role crashes before it was able to write into the next queue.

When running the sample code, you can easily see the flow that is being created by constantly advancing to the next queue, or the next state of a state machine.

Let’s look at some code. The same process to transfer a message between the “new” and “inprogress” queues and between the “inprogress” and “finished” queues is essentially the same (we just name the queues in the transfer differently) so we’ll only look at the code for one transfer. Only the actual processing between two states will be different.
First of all, a queue client for each queue is required: one for the queue being monitored, one for the queue where the message will end up after processing. Optionally, a third queue client can be required if an order can go from “new” to “accepted” or “rejected”, depending on the processing.

In this example, a message will be moved from “new” to “inprogress”. These two queues should be referenced:

// Create a storage account (development storage)
var storageAccount = CloudStorageAccount.DevelopmentStorageAccount;

// We will need 2 queues
var queueClient = storageAccount.CreateCloudQueueClient();

var newOrderQueue = queueClient.GetQueueReference("neworder");
newOrderQueue.CreateIfNotExist();

var inProgressQueue = queueClient.GetQueueReference("inprogressorder");
inProgressQueue.CreateIfNotExist();

Next, the newOrderQueue will be polled for order messages. This results in a standard CloudQueueMessage instance, containing either all order data, or referencing a blob/table containing more data.

N.B. Remember you actually can combine the techniques described in this article.

var message = newOrderQueue.GetMessage();

Of course processing happens. This can be notifying someone a new order has arrived, or sending a “thank you for your order” e-mail to the client.

// Process the message
Console.ForegroundColor = ConsoleColor.DarkYellow;
Console.WriteLine("Transferring order {0} from new ->  inprogress.", message.AsString);

After processing has occurred and proved successful, the message can be inserted into the next state queue, after which it can be removed from the originating queue. It is important to maintain this flow (insert/delete instead of delete/insert) because of integrity and reliability of the system you are creating.

// Add to "inprogress" queue
inProgressQueue.AddMessage(new CloudQueueMessage(message.AsString));

// Mark completed
newOrderQueue.DeleteMessage(message);

It’s very refreshing to think about defining a state pattern as a combination of queues. The example I gave you is pretty trivial but it’s not that hard to create things like an if-then-else structure using multiple queues where each branch of the decision logic is represented by a different queue.

Summary

In this article, we’ve looked at three ways to leverage queues beyond just inserting, processing and deleting messages.

You can optimize the polling of queues to reduce traffic and transactions. If you consider 4 polls per second vs. an average of 1 poll every 5 seconds, your transaction count will be 720 polls per hour in the latter case vs. 14400 polls in the first case. That’s an immense amount of processing power on a worker role you can use for other things! Do think about latency though: having one poll every minute may not be fast enough for your users if they expect a result within 2 seconds.

You can also use the work ticket pattern to effect large messages in queues: queues can be used as the pure insert/process/delete mechanism, while other storage services, like blob storage, can be used for storing data that exceeds the queue service’s limits.

Finally, queues can also be used as a state machine, where each state can be processed by different processes like worker roles or different threads sharing a single role.

The techniques used in every scenario can be re-used in many applications and often be combined into an approach that is both scalable and reliable.

You might also like...

Comments

About the author

Maarten Balliauw

Maarten Balliauw Belgium

Maarten Balliauw is a technical consultant in web technologies at RealDolmen, one of Belgium's biggest ICT companies. His interests are ASP.NET (MVC), PHP and Windows Azure. He's a Microsoft Mos...

Interested in writing for us? Find out more.

Contribute

Why not write for us? Or you could submit an event or a user group in your area. Alternatively just tell us what you think!

Our tools

We've got automatic conversion tools to convert C# to VB.NET, VB.NET to C#. Also you can compress javascript and compress css and generate sql connection strings.

“It works on my machine.” - Anonymous