Scaling Azure Event Hubs Processing with Worker Roles
This post will show how to host an Azure Event Hubs EventProcessorHost in a cloud service worker role.
Background
Imagine a solution that needs to ingest millions of events per second and process them in near real time. There are many scenarios where this might occur. Perhaps you built a solution for a consumer device such as Microsoft Band using the recently announced Microsoft Band SDK preview, you have a popular game where you are processing telemetry events, or you have an IoT solution where you are processing events from many devices. Architecting this type of solution poses many challenges. How do you ingest messages at this scale? More importantly, how do you process the messages without creating a performance bottleneck? How do you scale message ingest and message processing independently? Most importantly, how do you achieve this for very little cost?
Event Hubs and EventProcessorHost
Azure Event Hubs enables massive scale telemetry processing for solutions that require logging millions of events per second in near real time. It does this by creating a gigantic write-ahead stream where the ingest and processing of data is handled separately. One way to think of this is as a massively scalable queue implementation. However, unlike queues, there is no concept of queue length in Azure Event Hubs because the data is processed as a stream. I wrote a demonstration of the sender side in the post Use JMS and Azure Event Hubs with Eclipse, where many senders send information to an Event Hub named “devicereadings”. The Event Hub then distributes the messages across partitions.
The question now is how to process the events in a scalable manner. Scalable event processing is exactly what Event Hubs enables. The Azure Event Hubs team made this processing easy by introducing the EventProcessorHost, which enables a partitioned consumer pattern. Consider the following scenario where I have a worker role that is processing all of the stream data from the Event Hub. The EventProcessorHost will automatically create an instance of the event processor for each partition.
We might see that a single worker role is not sufficient for our solution because the worker role becomes CPU bound. As we saw in my previous post, Autoscaling Azure–Cloud Services, scaling a worker role creates a new instance of the worker role. The benefit of using the EventProcessorHost is that we can add more worker role instances, and the partitions will be balanced across them. We don’t have to manage the number of processor instances; the EventProcessorHost handles that for us.
Using the EventProcessorHost is simple because you only need to implement one interface, IEventProcessor. The bulk of the work will be in the ProcessEventsAsync method.
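To make the balancing idea concrete, here is a minimal sketch of the arithmetic only. It assumes partitions end up spread as evenly as possible across instances; the `PartitionMath` class and `EvenSpread` method are hypothetical names for illustration and are not part of any Azure library. The real EventProcessorHost reaches its balance over time by acquiring and releasing partition leases, so the actual assignment at any moment may differ.

```csharp
using System;
using System.Linq;

// Hypothetical helper, for illustration only (not an Azure API).
public static class PartitionMath
{
    // Returns how many partitions each instance would own if the
    // partitions were spread as evenly as possible.
    public static int[] EvenSpread(int partitions, int instances)
    {
        if (instances <= 0)
        {
            throw new ArgumentOutOfRangeException("instances");
        }
        return Enumerable.Range(0, instances)
            .Select(i => partitions / instances + (i < partitions % instances ? 1 : 0))
            .ToArray();
    }
}

public static class Program
{
    public static void Main()
    {
        // 16 partitions over 4 instances: 4,4,4,4
        Console.WriteLine(string.Join(",", PartitionMath.EvenSpread(16, 4)));
        // 16 partitions over 3 instances: 6,5,5
        Console.WriteLine(string.Join(",", PartitionMath.EvenSpread(16, 3)));
    }
}
```

The same arithmetic shows why adding more instances than there are partitions does not help: with 16 partitions and 20 instances, 4 instances would have nothing to process.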
Let’s work through building the solution.
Show Me Some Code
I am going to use the Service Bus Event Hubs Getting Started sample as the basis for this post. I made some changes to the code, and I’ll explain them along the way. First, I created a class library called EventHubDemo.Common. It contains NuGet references to Json.NET, Microsoft Azure Service Bus, and Windows Azure Configuration Manager.
It contains two classes: MetricEvent.cs and EventHubManager.cs.
MetricEvent is just a class used to (de)serialize JSON data.
MetricEvent.cs
    using System.Runtime.Serialization;

    namespace EventHubDemo.Common.Contracts
    {
        [DataContract]
        public class MetricEvent
        {
            [DataMember]
            public int DeviceId { get; set; }

            [DataMember]
            public int Temperature { get; set; }
        }
    }
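As a quick illustration of what travels over the wire, the following sketch round-trips a MetricEvent through Json.NET. The `MetricEventRoundTrip` class and the sample values are made up for this example; only `MetricEvent` and the Json.NET calls come from the post.

```csharp
using System;
using EventHubDemo.Common.Contracts;
using Newtonsoft.Json;

// Illustrative only: shows the JSON shape produced for a MetricEvent.
public static class MetricEventRoundTrip
{
    public static string ToJson(MetricEvent reading)
    {
        return JsonConvert.SerializeObject(reading);
    }

    public static MetricEvent FromJson(string json)
    {
        return JsonConvert.DeserializeObject<MetricEvent>(json);
    }

    public static void Main()
    {
        var reading = new MetricEvent { DeviceId = 7, Temperature = 72 };

        string json = ToJson(reading);
        Console.WriteLine(json); // {"DeviceId":7,"Temperature":72}

        MetricEvent copy = FromJson(json);
        Console.WriteLine(copy.DeviceId + " / " + copy.Temperature);
    }
}
```

Because the class is marked with [DataContract], Json.NET only serializes the properties that carry [DataMember].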
The next class, EventHubManager.cs, is just a helper class for working with Event Hubs. It centralizes common logic that is used across multiple projects to work with the Event Hub connection string and to create an Event Hub.
EventHubManager.cs
    using Microsoft.ServiceBus;
    using Microsoft.ServiceBus.Messaging;
    using System;
    using System.Diagnostics;

    namespace EventHubDemo.Common.Utility
    {
        public class EventHubManager
        {
            // Reads the Service Bus connection string from configuration
            // and forces the AMQP transport.
            public static string GetServiceBusConnectionString()
            {
                string connectionString = Microsoft.WindowsAzure.CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
                if (string.IsNullOrEmpty(connectionString))
                {
                    Trace.WriteLine("Did not find Service Bus connection string in appsettings (app.config)");
                    return string.Empty;
                }
                ServiceBusConnectionStringBuilder builder = new ServiceBusConnectionStringBuilder(connectionString);
                builder.TransportType = TransportType.Amqp;
                return builder.ToString();
            }

            public static NamespaceManager GetNamespaceManager()
            {
                return NamespaceManager.CreateFromConnectionString(GetServiceBusConnectionString());
            }

            public static NamespaceManager GetNamespaceManager(string connectionString)
            {
                return NamespaceManager.CreateFromConnectionString(connectionString);
            }

            public static void CreateEventHubIfNotExists(string eventHubName, int numberOfPartitions, NamespaceManager manager)
            {
                try
                {
                    // Create the Event Hub
                    Trace.WriteLine("Creating Event Hub...");
                    EventHubDescription ehd = new EventHubDescription(eventHubName);
                    ehd.PartitionCount = numberOfPartitions;
                    manager.CreateEventHubIfNotExistsAsync(ehd).Wait();
                }
                catch (AggregateException agexp)
                {
                    Trace.WriteLine(agexp.Flatten());
                }
            }
        }
    }
This makes it easy to reuse the logic in a Console application that sends messages or in an Azure worker role that receives them.
EventProcessorHost in an Azure Worker Role
The next step is to create a Cloud Service. I create a new Azure Cloud Service project named “EventProcessor” in Visual Studio.
Click OK, and you are prompted for the type of role you want to create. Choose a worker role, and rename it to something like “ReceiverRole”.
Click OK, and you now have two projects: the worker role code and the deployment project.
The bulk of the work will be in the ReceiverRole project. Add a reference to the EventHubDemo.Common library that we created previously. Next add the NuGet package “Microsoft.Azure.ServiceBus.EventProcessorHost”, which will add the required dependencies.
Next we add a class called SimpleEventProcessor.cs. This class will implement the IEventProcessor interface that we mentioned previously. When the OpenAsync method is called by the EventProcessorHost, we will write out the partition that it corresponds to. When a message is received in the ProcessEventsAsync method, we write the message out to Trace output. When the CloseAsync method is called, we write that out to trace as well.
SimpleEventProcessor.cs
    using EventHubDemo.Common.Contracts;
    using Microsoft.ServiceBus.Messaging;
    using Newtonsoft.Json;
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;

    namespace ReceiverRole
    {
        class SimpleEventProcessor : IEventProcessor
        {
            PartitionContext partitionContext;

            public Task OpenAsync(PartitionContext context)
            {
                Trace.TraceInformation(string.Format("SimpleEventProcessor OpenAsync. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset));
                this.partitionContext = context;
                return Task.FromResult<object>(null);
            }

            public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
            {
                try
                {
                    foreach (EventData eventData in events)
                    {
                        try
                        {
                            var newData = this.DeserializeEventData(eventData);
                            Trace.TraceInformation(string.Format("Message received. Partition: '{0}', Device: '{1}', Data: '{2}'",
                                this.partitionContext.Lease.PartitionId, newData.DeviceId, newData.Temperature));
                        }
                        catch (Exception oops)
                        {
                            Trace.TraceError(oops.Message);
                        }
                    }
                    await context.CheckpointAsync();
                }
                catch (Exception exp)
                {
                    Trace.TraceError("Error in processing: " + exp.Message);
                }
            }

            public async Task CloseAsync(PartitionContext context, CloseReason reason)
            {
                Trace.TraceWarning(string.Format("SimpleEventProcessor CloseAsync. Partition '{0}', Reason: '{1}'.", this.partitionContext.Lease.PartitionId, reason.ToString()));
                if (reason == CloseReason.Shutdown)
                {
                    await context.CheckpointAsync();
                }
            }

            MetricEvent DeserializeEventData(EventData eventData)
            {
                string data = Encoding.UTF8.GetString(eventData.GetBytes());
                return JsonConvert.DeserializeObject<MetricEvent>(data);
            }
        }
    }
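The processor above checkpoints after every batch. If the resulting writes to blob storage turn out to matter in your load tests, one option is to checkpoint at most once per time interval. The following is a minimal sketch of that idea; `CheckpointThrottle` is a hypothetical helper written for this example, not part of the Service Bus SDK, and it assumes calls for a given processor instance are not made concurrently.

```csharp
using System;
using System.Diagnostics;

// Hypothetical helper: answers "has enough time passed to checkpoint again?"
public class CheckpointThrottle
{
    private readonly TimeSpan interval;
    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public CheckpointThrottle(TimeSpan interval)
    {
        this.interval = interval;
    }

    // Returns true at most once per interval, then restarts the timer.
    public bool ShouldCheckpoint()
    {
        if (this.stopwatch.Elapsed < this.interval)
        {
            return false;
        }
        this.stopwatch.Restart();
        return true;
    }
}
```

With a field such as `CheckpointThrottle throttle = new CheckpointThrottle(TimeSpan.FromMinutes(5));` in SimpleEventProcessor, the unconditional call in ProcessEventsAsync could become `if (throttle.ShouldCheckpoint()) await context.CheckpointAsync();`. The trade-off is that after a failure, events received since the last checkpoint are processed again, so the processing logic needs to tolerate duplicates.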
Easy enough so far. Now add a class, Receiver.cs. This class will encapsulate the logic for registering and unregistering an IEventProcessor implementation.
Receiver.cs
    using Microsoft.ServiceBus.Messaging;
    using System.Diagnostics;

    namespace ReceiverRole
    {
        class Receiver
        {
            string eventHubName;
            string eventHubConnectionString;
            EventProcessorHost eventProcessorHost;

            public Receiver(string eventHubName, string eventHubConnectionString)
            {
                this.eventHubConnectionString = eventHubConnectionString;
                this.eventHubName = eventHubName;
            }

            public void RegisterEventProcessor(ConsumerGroupDescription group, string blobConnectionString, string hostName)
            {
                EventHubClient eventHubClient = EventHubClient.CreateFromConnectionString(eventHubConnectionString, this.eventHubName);
                if (null == group)
                {
                    //Use default consumer group
                    EventHubConsumerGroup defaultConsumerGroup = eventHubClient.GetDefaultConsumerGroup();
                    eventProcessorHost = new EventProcessorHost(hostName, eventHubClient.Path, defaultConsumerGroup.GroupName, this.eventHubConnectionString, blobConnectionString);
                }
                else
                {
                    //Use custom consumer group
                    eventProcessorHost = new EventProcessorHost(hostName, eventHubClient.Path, group.Name, this.eventHubConnectionString, blobConnectionString);
                }
                Trace.TraceInformation("Registering event processor");
                eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>().Wait();
            }

            public void UnregisterEventProcessor()
            {
                eventProcessorHost.UnregisterEventProcessorAsync().Wait();
            }
        }
    }
The final bit is to host this in an Azure cloud service using a worker role. I updated the WorkerRole.cs class.
WorkerRole.cs
    using EventHubDemo.Common.Utility;
    using Microsoft.WindowsAzure;
    using Microsoft.WindowsAzure.ServiceRuntime;
    using System;
    using System.Diagnostics;
    using System.Net;
    using System.Threading;

    namespace ReceiverRole
    {
        public class WorkerRole : RoleEntryPoint
        {
            private Receiver receiver;
            private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);

            public override void Run()
            {
                Trace.TraceInformation("ReceiverRole is running");

                //Get settings from configuration
                var eventHubName = CloudConfigurationManager.GetSetting("eventHubName");
                var consumerGroupName = CloudConfigurationManager.GetSetting("consumerGroupName");
                var numberOfPartitions = int.Parse(CloudConfigurationManager.GetSetting("numberOfPartitions"));
                var blobConnectionString = CloudConfigurationManager.GetSetting("AzureStorageConnectionString"); // Required for checkpoint/state

                //Get AMQP connection string
                var connectionString = EventHubManager.GetServiceBusConnectionString();

                //Create event hub if it does not exist
                var namespaceManager = EventHubManager.GetNamespaceManager(connectionString);
                EventHubManager.CreateEventHubIfNotExists(eventHubName, numberOfPartitions, namespaceManager);

                //Create consumer group if it does not exist
                var group = namespaceManager.CreateConsumerGroupIfNotExists(eventHubName, consumerGroupName);

                //Start processing messages
                receiver = new Receiver(eventHubName, connectionString);

                //Get host name of worker role instance. This is used for each environment to obtain
                //a lease, and to renew the same lease in case of a restart.
                string hostName = RoleEnvironment.CurrentRoleInstance.Id;
                receiver.RegisterEventProcessor(group, blobConnectionString, hostName);

                //Wait for shutdown to be called, else the role will recycle
                this.runCompleteEvent.WaitOne();
            }

            public override bool OnStart()
            {
                // Set the maximum number of concurrent connections
                ServicePointManager.DefaultConnectionLimit = 12;

                // For information on handling configuration changes
                // see the MSDN topic at https://go.microsoft.com/fwlink/?LinkId=166357.
                bool result = base.OnStart();
                Trace.TraceInformation("ReceiverRole has been started");
                return result;
            }

            public override void OnStop()
            {
                Trace.TraceInformation("ReceiverRole is stopping");
                this.runCompleteEvent.Set();
                try
                {
                    //Unregister the event processor so other instances
                    // will handle the partitions
                    receiver.UnregisterEventProcessor();
                }
                catch (Exception oops)
                {
                    Trace.TraceError(oops.Message);
                }
                base.OnStop();
                Trace.TraceInformation("ReceiverRole has stopped");
            }
        }
    }
When the worker role starts, we write a message to the trace output. The worker role then starts to run, and we get settings from configuration in order to connect to the Event Hub and register our event processor. A key point to call out is the line in Run that reads RoleEnvironment.CurrentRoleInstance.Id. We use the ID of the worker role instance as the host name, which is needed in order to obtain a lease for the partitions across multiple instances. Also notice the call to runCompleteEvent.WaitOne() at the end of Run, where we wait for a signal. That signal is set in OnStop when the worker role is stopped. The processing of events is handled on a separate thread; we only need the ManualResetEvent to prevent the Run method from completing (which would cause the worker role to recycle).
The sample code uses CloudConfigurationManager to obtain settings. Those settings are part of the deployment process, shown in the next section.
Role Configuration
Go to the EventProcessor project that defines the deployment configuration for the worker role. Expand the Roles folder and double-click on the ReceiverRole node.
In the properties page that opens, go to the Settings tab. This is where we provide the various configuration values.
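For reference, the values entered on the Settings tab end up in the service configuration file of the deployment project. The fragment below is a sketch of what that might look like for the settings read by the code in this post. The setting names come from the code; every value shown is a placeholder or an assumption (including the consumer group name), and each setting also has to be declared in ServiceDefinition.csdef, which Visual Studio does for you when you use the Settings tab.

```xml
<ServiceConfiguration serviceName="EventProcessor"
    xmlns="http://schemas.microsoft.com/ServiceHosting/2008/10/ServiceConfiguration">
  <Role name="ReceiverRole">
    <Instances count="1" />
    <ConfigurationSettings>
      <!-- Placeholder values; substitute your own -->
      <Setting name="eventHubName" value="devicereadings" />
      <Setting name="consumerGroupName" value="my-consumer-group" />
      <Setting name="numberOfPartitions" value="16" />
      <Setting name="AzureStorageConnectionString"
               value="DefaultEndpointsProtocol=https;AccountName=[account];AccountKey=[key]" />
      <Setting name="Microsoft.ServiceBus.ConnectionString"
               value="Endpoint=sb://[namespace].servicebus.windows.net/;SharedAccessKeyName=[name];SharedAccessKey=[key]" />
    </ConfigurationSettings>
  </Role>
</ServiceConfiguration>
```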
Note: It is a best practice to avoid putting secrets such as connection strings in source code. You would typically set these as part of a deployment script. For more information, see Building Real-World Cloud Apps with Azure.
You can also configure diagnostics for the worker roles. We will use the diagnostics capabilities when we test out the solution. On the Configuration tab, click the Configure button.
That allows you to configure the log level for the application. I set it to Verbose.
Now right-click the deployment project and choose Publish. You are prompted to create a cloud service and storage account.
Once created, go through the wizard to complete publishing.
Watch the progress in the Microsoft Azure Activity Log.
Once deployed, you will have a single production instance for your worker role.
Testing It Out
Once the role is deployed, go to Visual Studio and right-click the ReceiverRole node under Cloud Services and choose “View Diagnostics Data”.
On that page, expand the Microsoft Azure application logs section, then click View all data.
You will then see the Azure Storage table called WADLogsTable, and you can see the events that happened. When the worker role is running, we see the entry “ReceiverRole is running”. The SimpleEventProcessor is registered once, and the EventProcessorHost takes care of creating 16 instances, one for every partition (not all log entries are shown here, but there are 16 in the log).
Let’s scale our service. While we could use the autoscale service to scale by CPU, we can see how things work by manually scaling as well. Let’s manually scale by going to the Azure management portal (https://manage.windowsazure.com) and setting the scale for the cloud service to 4.
Save, and wait while the new instances are created and our packages are deployed automatically to the new instances.
Once the 3 new virtual machines are created, we go back to see what happened in the logs. It is really cool to see how the EventProcessorHost automatically balances the processing across the available instances. You can see here that partitions 1, 10, and 12 are handled by instance 2, while partitions 3, 13, and 15 are handled by instance 1.
Let’s scale the number of instances down to 2.
This will cause our OnStop() method to be called in each worker role that is no longer needed, and our code to unregister the event processor is called. This balances the processing back to the remaining worker role instances. First we see the close operations:
Next we see that the closed partitions are opened using another available processor:
Testing Out the Sender
To test things out, I can use the same client that I built in the post Use JMS and Azure Event Hubs with Eclipse. This is a simple web page that sends messages to the Event Hub using AMQP 1.0. Alternatively, I can modify the code in the Service Bus Event Hubs Getting Started sample to send messages. Once I send messages to the Event Hub, we see them processed in the event log, automatically balanced across partitions among the 2 available worker roles.
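If you would rather send test messages from a small .NET console application, the following is a minimal sketch, not the code from the Getting Started sample. It assumes the console project references EventHubDemo.Common and the Service Bus NuGet package, and that its app.config contains the Microsoft.ServiceBus.ConnectionString setting. The `Sender` class, the random value ranges, and the choice of the device ID as partition key are all illustrative assumptions.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using EventHubDemo.Common.Contracts;
using EventHubDemo.Common.Utility;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;

// Illustrative console sender; names and values are assumptions.
public class Sender
{
    // Produces the UTF-8 JSON bytes that SimpleEventProcessor deserializes.
    public static byte[] ToPayload(MetricEvent reading)
    {
        return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(reading));
    }

    static async Task SendReadingsAsync(string eventHubName, int count)
    {
        EventHubClient client = EventHubClient.CreateFromConnectionString(
            EventHubManager.GetServiceBusConnectionString(), eventHubName);
        var random = new Random();

        for (int i = 0; i < count; i++)
        {
            var reading = new MetricEvent
            {
                DeviceId = random.Next(1, 1000),
                Temperature = random.Next(60, 100)
            };
            var eventData = new EventData(ToPayload(reading))
            {
                // Events that share a partition key are sent to the same partition.
                PartitionKey = reading.DeviceId.ToString()
            };
            await client.SendAsync(eventData);
        }

        await client.CloseAsync();
    }

    public static void Main()
    {
        // "devicereadings" is the Event Hub name used earlier in this post.
        SendReadingsAsync("devicereadings", 100).Wait();
    }
}
```

Sending one event per call keeps the sketch short. For higher volumes you would likely want to batch sends, which is outside the scope of this post.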
Summary
Event Hubs makes it very easy to build highly scalable solutions that can process millions of messages per second in near real time. This post shows how you can use an Azure worker role with EventProcessorHost to process the messages and how the load will automatically balance across the available processors.
I am not providing the sample code as a download because it is already provided in the post Service Bus Event Hubs Getting Started, and I have listed the code that I used in this post.
For More Information
Use JMS and Azure Event Hubs with Eclipse
Building Real-World Cloud Apps with Azure – Free eBook!
Service Bus Event Hubs Getting Started – sample used as the basis of this post
Comments
Anonymous
February 24, 2015
The comment has been removed
Anonymous
February 24, 2015
@Jim If I am not mistaken, and if I understand the question correctly, the lease should expire, allowing another instance to lease the partition and begin processing at the last recorded offset.
Anonymous
February 24, 2015
Correct, the lease will expire and the other instances will take over that workload at the last recorded offset, which is set in blob storage when CheckpointAsync() is called. In my example, I am aggressively calling context.CheckpointAsync(). The "getting started" sample that I based this from calls CheckpointAsync() on a timed basis, something like 5 minutes. Other examples call CheckPointAsync() once every 100 messages.
Anonymous
February 24, 2015
@Kirk, @Luke I have waited over 30 minutes, and the existing other instances never pick up the work. I have a LeaseInterval of 30 seconds and am calling CheckpointAsync() every 5 minutes (as long as new values are coming in, which they definitely are). However, if I start up a new instance, then it gladly picks up the work that the dead instance was handling.
Anonymous
February 25, 2015
- Are you using the 2.6 or 2.6.1 ServiceBus.dll and the 1.1.0 EventProcessorHost.dll - please confirm this.
- When you say do not gracefully shut down - it sounds like you're doing something that is not turning off / disconnecting the machines. If the app is still running it's probably still renewing leases in the background. How are you simulating this non-graceful shutdown?
Anonymous
February 25, 2015
The comment has been removed
Anonymous
February 25, 2015
I have many questions about the partition count. What is the point of the partition count configuration in this example? It presumably stays at the maximum value even though the instance count increases. Why should this be a configuration value if it does not change when scaling? Why would you choose any particular partition count other than the maximum value? I am also assuming that you can't have more instances running than the partition count. Is this correct?
Anonymous
February 26, 2015
How long are you waiting before starting the other reader? It should take a few refresh cycles (which the default is 30 seconds) - so try waiting at least 2 minutes.
Anonymous
February 26, 2015
@Dan I have waited over 30 minutes to see if the existing instances would pick up the partitions that were not being read. Those existing instances never have. Even after 30 minutes I can start a new instance, and it instantly picks up the partitions that were not being read.
Anonymous
February 26, 2015
The comment has been removed
Anonymous
February 26, 2015
Also, there is a great tool for all things service bus in Azure. blogs.msdn.com/.../service-bus-explorer-2-5-now-available.aspx This tool will allow you to hammer a hub or send a single message. Highly recommended.
Anonymous
March 23, 2015
Hi, is calling CheckpointAsync aggressively like this bad practice, as it could potentially run up large azure bills? Thanks,
Anonymous
March 23, 2015
@Andy - the CheckPointAsync code really just writes a number to Azure blob storage that indicates the last position processed during its lease. You'll have to load test your application to determine if the writes to blob storage, even though asynchronous, are impeding throughput. Buffering that write even for 30 seconds could have a performance impact on your code, but it's up to you to perform load testing to determine the impact, if any.
Anonymous
February 07, 2016
There is a potential syntax error in EventHubManager method GetServiceBusConnectionString
use this:
var connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
instead of:
string connectionString = Microsoft.WindowsAzure.CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
HTH.