Solving a Throttling Problem with Azure
This post shows how we addressed a throttling problem when writing to an existing system using Microsoft Azure.
I am working on a project with a customer, and we needed to write to an existing system. The idea was to collect input from many systems and feed it into a single system for centralized visibility. For instance, I might want to collect data from many systems and post it to a Twitter feed.
The problem is that the system you are writing to may constrain how many messages you can send to it at a time. For instance, the Twitter API for search allows you up to 450 search requests per app in a 15-minute window. What if you need a lot more than that? Of course, we immediately see that we can create multiple apps. How do you then scale out the processing so that each app handles only part of the work?
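To make the numbers concrete, here is a rough back-of-the-envelope sketch. The 450-requests-per-15-minutes quota comes from the example above; the load of 2,000 requests per window is a made-up figure, and the names `ThrottleMath`, `AppsNeeded`, and `RequestsPerSecond` are purely illustrative.

```csharp
using System;

public static class ThrottleMath
{
    // Number of apps needed so that no single app exceeds its per-window quota.
    public static int AppsNeeded(int requestsPerWindow, int quotaPerApp)
    {
        if (quotaPerApp <= 0) throw new ArgumentOutOfRangeException("quotaPerApp");
        if (requestsPerWindow <= 0) return 0;
        // Integer ceiling division: (a + b - 1) / b
        return (requestsPerWindow + quotaPerApp - 1) / quotaPerApp;
    }

    // Sustained rate a single app may use, in requests per second.
    public static double RequestsPerSecond(int quotaPerApp, TimeSpan window)
    {
        return quotaPerApp / window.TotalSeconds;
    }

    public static void Main()
    {
        int quota = 450;                             // quota from the search API example
        TimeSpan window = TimeSpan.FromMinutes(15);
        int needed = 2000;                           // hypothetical load per window

        // 450 requests over 900 seconds is one request every two seconds per app.
        Console.WriteLine(RequestsPerSecond(quota, window));
        // 2,000 requests at 450 per app needs 5 apps (4 would cover only 1,800).
        Console.WriteLine(AppsNeeded(needed, quota));
    }
}
```

The point of the arithmetic is that the number of consumers is driven by the quota of the target system, not by how fast the producers can generate work.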
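A fairly simple solution to this problem is to use storage queues in Microsoft Azure. If you are familiar with MQSeries or MSMQ, then the concept will be familiar. Messages are pushed into a queue and persisted using storage in Microsoft Azure. Now here’s the really interesting part: when a consumer retrieves a message, that message becomes invisible to other consumers for a timeout period, and the consumer deletes it once processing succeeds. That means that you can easily connect multiple clients to the same queue and each message will normally be handled by just one of them. Note that delivery is at-least-once rather than exactly-once: if a consumer fails or takes longer than the timeout, the message becomes visible again and may be processed a second time.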
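The idea of several clients sharing one queue can be illustrated without Azure at all. The sketch below is an in-memory stand-in: a `BlockingCollection<int>` plays the role of the storage queue, which is an assumption made purely for illustration, and the class and method names are hypothetical. It shows three consumers draining one queue with each message taken by exactly one of them. It does not model the visibility timeout or redelivery behavior of a real storage queue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class CompetingConsumersDemo
{
    // Drains messageCount messages with consumerCount workers and returns
    // a map of message -> name of the consumer that handled it.
    public static ConcurrentDictionary<int, string> Run(int messageCount, int consumerCount)
    {
        var queue = new BlockingCollection<int>();
        var handledBy = new ConcurrentDictionary<int, string>();

        for (int i = 0; i < messageCount; i++) queue.Add(i);
        queue.CompleteAdding(); // no more messages will arrive

        Task[] consumers = Enumerable.Range(1, consumerCount)
            .Select(n => Task.Run(() =>
            {
                string name = "App" + n;
                // GetConsumingEnumerable removes each item as it is handed out,
                // so two consumers never receive the same message.
                foreach (int message in queue.GetConsumingEnumerable())
                {
                    if (!handledBy.TryAdd(message, name))
                        throw new InvalidOperationException("Message handled twice: " + message);
                }
            }))
            .ToArray();

        Task.WaitAll(consumers);
        return handledBy;
    }

    public static void Main()
    {
        var result = Run(500, 3);
        Console.WriteLine("Messages handled: " + result.Count);
        // Per-consumer counts vary from run to run; consumers simply take what they can.
        foreach (var group in result.GroupBy(p => p.Value).OrderBy(g => g.Key))
            Console.WriteLine(group.Key + ": " + group.Count());
    }
}
```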
The queue service comes with a storage account in Microsoft Azure: a storage account contains blobs, tables, and queues, and you create individual queues within it. Each queue can process 500 messages per second.
One of my favorite new additions to Microsoft Azure is WebJobs. A WebJob can be implemented in several ways, but my favorite is to create a console application using the WebJobs SDK (obtained as a NuGet package). The console application is then uploaded to an Azure Web Site, where it becomes a running process in the cloud. I needed 3 processes, so I created 3 different WebJobs. Using Visual Studio 2013 Update 3, I can easily create a console application, add the Microsoft.Azure.Jobs NuGet package, and then publish the console application as a WebJob.
We don’t want to repeat the code in each application, so we will put the shared logic in a class library. First, I add a class library project, and add the Azure Storage client library NuGet package (package ID WindowsAzure.Storage) to it. I changed the dropdown to “Stable Only” to get the released version of the storage SDK.
In the class library, I add a class to model the data that I want to store.
Code Snippet
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;

namespace ConsoleApplication1
{
    public class JohnStuff : TableEntity
    {
        public JohnStuff()
        {
            this.PartitionKey = "John";
            this.RowKey = Guid.NewGuid().ToString();
        }

        public string MyData { get; set; }
        public string AppName { get; set; }
    }
}
You can see that our class will save data into a property “MyData”, and we will capture the name of the app in “AppName”. The PartitionKey and RowKey properties are provided because we derive from the TableEntity class, which is used to serialize our entity to table storage.
Now we write some code to serialize the data to table storage. This method is in the class library, and will be referenced by each console application.
Code Snippet
using ConsoleApplication1;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace QueueDemo
{
    public class JobStuff
    {
        public static void WriteToTableStorage(string inputText)
        {
            string connectionString = CloudConfigurationManager.GetSetting("MyStorage");
            CloudStorageAccount storageAccount = CloudStorageAccount.Parse(connectionString);
            CloudTableClient client = storageAccount.CreateCloudTableClient();
            CloudTable table = client.GetTableReference("myoutput");
            table.CreateIfNotExists();

            //Use the appname to track which instance processed it
            string appName = CloudConfigurationManager.GetSetting("AppName");
            JohnStuff j = new JohnStuff { MyData = inputText, AppName = appName };
            TableOperation operation = TableOperation.InsertOrReplace(j);
            TableResult result = table.Execute(operation);
        }
    }
}
The code here is very basic: we just write a message to a table in Microsoft Azure Storage.
Next, I create the 3 console applications. In each project, right-click and choose Manage NuGet Packages, then add the Microsoft.Azure.Jobs NuGet package. Make sure to change the dropdown from “Stable Only” to “Include Prerelease”.
Add a project reference to the class library from each console application.
Now, update each console application to use the WebJobs SDK to monitor a queue and then call the method in our class library to write a message to table storage.
Code Snippet
using Microsoft.Azure.Jobs;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;
using QueueDemo;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class Program
    {
        static void Main()
        {
            JobHost host = new JobHost();
            host.RunAndBlock();
        }

        public static void ProcessQueueMessage([QueueTrigger("myqueue")] string inputText)
        {
            JobStuff.WriteToTableStorage(inputText);
        }
    }
}
That’s how easy it is to write code that monitors a queue for updates and then processes the message. The WebJobs SDK uses the settings in your configuration file to determine which storage account to monitor for changes. We add two connection strings for the WebJobs SDK to use, plus two appSettings entries (MyStorage and AppName) that our custom code reads.
Code Snippet
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
    <appSettings>
        <add key="MyStorage"
             value="DefaultEndpointsProtocol=https;AccountName=[accountname];AccountKey=[accesskey]"/>
        <add key="AppName"
             value="App1"/>
    </appSettings>
    <connectionStrings>
        <add name="AzureJobsDashboard"
             connectionString="DefaultEndpointsProtocol=https;AccountName=[accountname];AccountKey=[accesskey]"/>
        <add name="AzureJobsStorage"
             connectionString="DefaultEndpointsProtocol=https;AccountName=[accountname];AccountKey=[accesskey]"/>
    </connectionStrings>
    <startup>
        <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" />
    </startup>
</configuration>
A neat trick for obtaining the connection string value is to use the Azure tools in Visual Studio 2013. You can right-click and choose to create a new storage account. Then click on your storage account and go to Properties.
Click the Connection String property and then click the ellipsis. You can then copy the connection string value and paste it into each of the values above.
Something else to point out… the appsetting for AppName is used to determine which of the 3 console applications is writing to table storage. So we make the AppName unique for each of the 3 console applications (I used App1, App2, and App3). Yeah, I could have done something with reflection, getting the assembly name… but this was a quick POC to prove it works, so I leave that as an exercise to the reader.
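For readers who want to try the reflection route, a minimal sketch might look like the following. It assumes each console application has its own assembly name; the `AppIdentity` class and its `fallback` parameter are hypothetical names introduced here, not part of the original project.

```csharp
using System;
using System.Reflection;

public static class AppIdentity
{
    // Returns the name of the entry assembly (the console app's .exe name
    // without the extension), or the fallback when there is no entry assembly.
    public static string GetAppName(string fallback)
    {
        Assembly entry = Assembly.GetEntryAssembly();
        if (entry == null) return fallback; // can be null under some hosts
        return entry.GetName().Name;
    }

    public static void Main()
    {
        Console.WriteLine(GetAppName("UnknownApp"));
    }
}
```

Because `Assembly.GetEntryAssembly()` reports the process's startup executable rather than the assembly containing the calling code, this could be called from the shared class library and still identify which console application is running.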
Why 3 console applications? I could have just created a single console application and then manually created 3 different WebJobs using the same console application. I wanted to show off the new tooling in Visual Studio 2013 Update 3, of course! I could have zipped the whole output of the project and uploaded using the Microsoft Azure portal, but look how easy Visual Studio makes this now. Just right-click and choose Publish as Azure WebJob.
A WebJob is deployed to an Azure Website. You can create a new Microsoft Azure Website, or choose an existing one. Here, the publish target is a website.
I choose an existing Microsoft Azure Website.
Finally, I choose publish.
I do this for each of the 3 console applications. Next, I go to the Microsoft Azure Portal and I look at my Website, and I see that there are 3 WebJobs.
Let’s test things out. Let’s use Visual Studio 2013 to send a queue message and see what happens. Go to the Server Explorer, expand the Azure node, and go to your storage account. If your queue doesn’t exist, create it.
Once the queue exists, you can double-click it and see the messages currently in queue. Click the Add Message button to add a new queue message.
You can see that a new message is then added to the queue.
Click refresh, and it is now gone.
Let’s go check the table to see if something was written to it. In Visual Studio, expand the Azure / Storage / Tables node to see your table. Double-click it and see that the table contains a new row.
Looking at the data, we can see that, of the 3 console applications, the one with “App1” in its configuration file processed it. So, we know that data is flowing!
Now, let’s push a bunch of messages at the queue and see how things react. Add a new console application, add the Azure Storage client library NuGet package (package ID WindowsAzure.Storage), and add the following code:
Code Snippet
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace QueueLoader
{
    class Program
    {
        static void Main(string[] args)
        {
            string connectionString = CloudConfigurationManager.GetSetting("MyStorage");
            CloudStorageAccount storageAccount = CloudStorageAccount.Parse(connectionString);
            CloudQueueClient client = storageAccount.CreateCloudQueueClient();
            CloudQueue queue = client.GetQueueReference("myqueue");
            queue.CreateIfNotExists();

            List<Task> tasks = new List<Task>();
            for (int i = 0; i < 500; i++)
            {
                string data = i.ToString();
                // Task.Run unwraps the async lambda, so the task completes only
                // when the send finishes (Task.Factory.StartNew would not wait for it)
                Task task = Task.Run(async () =>
                {
                    await queue.AddMessageAsync(new CloudQueueMessage(data));
                });
                tasks.Add(task);
            }

            // Block until every send has completed; failures surface here
            Task.WhenAll(tasks).Wait();
            Console.ReadLine();
        }
    }
}
This code queues 500 sends on the thread pool, so messages are submitted to the queue in parallel rather than one at a time. We run the code and wait just a few seconds. We can refresh the queue and watch messages arrive and leave, and refresh the table to see what is written to it.
You can see from the MyData column that none of the values are repeated, so each message is processed only once. Further, you can see from the AppName column which application processed it.
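Eyeballing the column works for a handful of rows; for a larger run the same check can be done in code. The sketch below assumes the MyData values have already been read into a list of strings (how they are read is left out), and the sample values, including the deliberate duplicate, are made up to show what a failure would look like.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DuplicateCheck
{
    // Returns every value that appears more than once in the input.
    public static List<string> FindDuplicates(IEnumerable<string> values)
    {
        return values
            .GroupBy(v => v)
            .Where(g => g.Count() > 1)
            .Select(g => g.Key)
            .ToList();
    }

    public static void Main()
    {
        // Hypothetical MyData values; "2" is repeated on purpose.
        var myData = new[] { "0", "1", "2", "3", "2" };
        foreach (string dup in FindDuplicates(myData))
            Console.WriteLine("Processed more than once: " + dup);
    }
}
```

An empty result means no message body showed up twice in the sample.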
Thus, we show that, by using an interim queue, messages are stored durably until they are processed, each message is handled by a single consumer, and we can scale the event source and the processing independently to interact with a throttled resource.
Looking at the values in the screen shot above, you can see that the messages are not handed out in sequence: App1 processed 7 of the 16 messages shown in the screen shot. Which process picks up a given message is effectively arbitrary, and a quick analysis of the results data shows us that the processing is not evenly distributed.
This post focused on using a queue-centric work pattern. We are simply using a greedy pattern where each process pulls work from the queue as a competing consumer. Thus, equal distribution was not our goal. If you need equal distribution (such as in round-robin load balancing), then consider adding a router to smooth distribution across processes.
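As a rough idea of what such a router could look like, the sketch below hands out targets in strict rotation. It assumes one queue per consumer, and the queue names and the `RoundRobinRouter` class are hypothetical; the original solution uses a single shared queue and no router. The class is not thread-safe, so a real router would need locking or a single routing thread.

```csharp
using System;
using System.Collections.Generic;

public class RoundRobinRouter
{
    private readonly string[] targets;
    private int next; // index of the target that receives the next message

    public RoundRobinRouter(params string[] targets)
    {
        if (targets == null || targets.Length == 0)
            throw new ArgumentException("At least one target is required.", "targets");
        this.targets = targets;
    }

    // Returns the target for the next message and advances the cursor.
    public string Route()
    {
        string target = targets[next];
        next = (next + 1) % targets.Length;
        return target;
    }

    public static void Main()
    {
        var router = new RoundRobinRouter("queue-app1", "queue-app2", "queue-app3");
        var counts = new Dictionary<string, int>();
        for (int i = 0; i < 9; i++)
        {
            string target = router.Route();
            int current;
            counts.TryGetValue(target, out current); // current stays 0 if absent
            counts[target] = current + 1;
        }
        // Nine messages over three targets: each target receives three.
        foreach (var pair in counts)
            Console.WriteLine(pair.Key + ": " + pair.Value);
    }
}
```

The trade-off is that a slow consumer now builds up its own backlog, which the greedy shared-queue approach avoids.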
We do not demonstrate writing to a throttled resource here; we simply log to table storage to show that each message is processed only once. You can take this concept further to actually call a throttled resource with multiple instances.
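One way to take it further is for each consumer to check a local rate limiter before calling the throttled resource. The following is a minimal sketch of a fixed-window limiter, not something from the original project: the `FixedWindowLimiter` class is hypothetical, the 450-per-15-minutes quota is borrowed from the earlier example, and the real resource's window rules may differ. The current time is passed in explicitly so the behavior is easy to verify; what the consumer does when a call is refused (wait, or let the message return to the queue) is left open. The class is not thread-safe.

```csharp
using System;

public class FixedWindowLimiter
{
    private readonly int limit;
    private readonly TimeSpan window;
    private DateTime windowStart;
    private int used;

    public FixedWindowLimiter(int limit, TimeSpan window, DateTime now)
    {
        this.limit = limit;
        this.window = window;
        this.windowStart = now;
    }

    // Returns true and consumes one slot if a call is allowed at time now;
    // returns false when the current window's quota is exhausted.
    public bool TryAcquire(DateTime now)
    {
        if (now - windowStart >= window)
        {
            windowStart = now; // start a new window
            used = 0;
        }
        if (used >= limit) return false;
        used++;
        return true;
    }

    public static void Main()
    {
        DateTime t0 = new DateTime(2014, 1, 1, 0, 0, 0, DateTimeKind.Utc);
        var limiter = new FixedWindowLimiter(450, TimeSpan.FromMinutes(15), t0);

        int allowed = 0;
        for (int i = 0; i < 500; i++)
        {
            if (limiter.TryAcquire(t0)) allowed++;
        }

        // Only the first 450 of the 500 attempts fit in the window.
        Console.WriteLine(allowed);
        // Once 15 minutes have passed, a new window opens and calls are allowed again.
        Console.WriteLine(limiter.TryAcquire(t0.AddMinutes(15)));
    }
}
```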
Get Started with the Azure WebJobs SDK
Anonymous
June 24, 2015
Congrats - a great post that is completely let down by the fact that it doesn't actually do what it says in the title. You've failed to answer the fundamental question of how to actually THROTTLE the processing of records in the queue.
Anonymous
June 25, 2015
PJC - Perhaps you didn't understand that the client event source could overwhelm the throttled resource, thus introducing a queue enables us to process at a rate that the throttled resource can accept. Another approach to this would be EventHub with EventProcessorHost. blogs.msdn.com/.../scaling-azure-event-hubs-processing-with-worker-roles.aspx In both scenarios, there is an interim buffer where the ingest of messages is independent of the rate at which they are processed, thus solving a throttling problem.