Performance Tuning for HDInsight Storm and Microsoft Azure EventHubs
Apache Storm is a popular real-time data processing framework. Microsoft Azure HDInsight provides a service to deploy a Storm cluster in the cloud. Customers can readily use HDInsight Storm clusters to process data from Azure EventHubs using a Java-based spout implementation. The EventHubSpout source code has been integrated into the Apache Storm trunk. You can find the code here:
https://github.com/apache/storm/tree/master/external/storm-eventhubs
Note that EventHubSpout will not be available in an Apache Storm release until 0.11.0. Until then, we keep the latest jar here:
https://github.com/hdinsight/hdinsight-storm-examples/tree/master/lib/eventhubs
Performance Basics
One frequently asked question is how much throughput the HDInsight Storm + EventHubs solution can deliver. The precise performance depends on the specific Storm topology and the business logic of the application, but in most cases you'll find that EventHubs is the bottleneck. If Storm is the bottleneck, you can always increase the cluster size.
The throughput of EventHubs is governed by the concept of the Throughput Unit (TU), according to (https://azure.microsoft.com/en-us/pricing/details/event-hubs/). 1 TU means 1MB/s or 1000 messages/s of ingress and 2MB/s or 2000 messages/s of egress (the messages/s limit for egress is not enforced yet as of 5/15/2015). When Storm pulls data from EventHubs, the receive throughput is governed by the EventHubs egress limit. By default your EventHubs namespace has 1 TU, and you can raise it to up to 20 TU in the Azure portal.
Note that there is also protocol overhead on the order of 50 bytes per message. So if your messages are 100 bytes each, with 20 TU you can achieve 350-400K messages/s when pulling data into a Storm topology today. Once the messages/s egress limit is enforced on the EventHubs server side, you will only be able to achieve 40K messages/s (20 TU x 2000 messages/s). In a real-time scenario, the throughput also depends on the EventHubs ingress limit, because the data has to be sent to EventHubs before it can be consumed by Storm. In that case, the maximum throughput you can get is 20MB/s or 20K messages/s.
If you need more throughput, you can contact EventHubs support to raise the limit to as many as 1000 TU. This means that you can achieve up to 1GB/s or 1M messages/s of throughput using EventHubs.
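The arithmetic behind these ceilings can be sketched in a few lines of Java. This is only an illustration of the TU limits quoted above, not an official calculator: the class and method names are made up for this example, 1MB is taken as 1,000,000 bytes, and the per-message protocol overhead is deliberately ignored.

```java
final class EventHubsThroughput {
    // Per-TU limits as quoted above (assumption: 1MB = 1,000,000 bytes).
    static final long INGRESS_BYTES_PER_TU = 1_000_000L;
    static final long INGRESS_MSGS_PER_TU = 1_000L;
    static final long EGRESS_BYTES_PER_TU = 2_000_000L;
    static final long EGRESS_MSGS_PER_TU = 2_000L;

    /** Egress ceiling in messages/s while only the byte limit is enforced. */
    static long egressByBytes(int tu, int messageBytes) {
        return tu * EGRESS_BYTES_PER_TU / messageBytes;
    }

    /** Egress ceiling in messages/s once the messages/s limit is enforced too. */
    static long egressEnforced(int tu, int messageBytes) {
        return Math.min(egressByBytes(tu, messageBytes), tu * EGRESS_MSGS_PER_TU);
    }

    /** Ingress ceiling in messages/s: the tighter of the byte and message limits. */
    static long ingress(int tu, int messageBytes) {
        return Math.min(tu * INGRESS_BYTES_PER_TU / messageBytes, tu * INGRESS_MSGS_PER_TU);
    }

    public static void main(String[] args) {
        int messageBytes = 100;
        System.out.println("20 TU egress, byte limit only: " + egressByBytes(20, messageBytes));
        System.out.println("20 TU egress, message limit enforced: " + egressEnforced(20, messageBytes));
        System.out.println("20 TU ingress: " + ingress(20, messageBytes));
        System.out.println("1000 TU ingress: " + ingress(1000, messageBytes));
    }
}
```

For 100-byte messages at 20 TU this gives 400,000, 40,000 and 20,000 messages/s respectively, and 1,000,000 messages/s of ingress at 1000 TU, which lines up with the figures in the text.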
EventHubs also supports the concept of partitions. EventHubs guarantees in-order delivery of messages only within each partition. Messages from different partitions can be processed in parallel. EventHubs also has a limit of 5 TU on each partition.
For good performance, the number of tasks for EventHubSpout should always match the number of partitions. For the best performance, we recommend running one EventHubSpout task per node.
In some situations, you may find that the Storm cluster is the performance bottleneck. You can verify this by checking the CPU utilization of the worker nodes while the topology is running. If it is at or near 100%, you need to increase your cluster size. But it is rarely useful to increase the cluster size beyond the number of partitions.
Performance Tuning Tips
In general, we recommend setting the cluster size to be the same as the number of partitions in EventHubs, so that each node is dedicated to one partition. Having a larger cluster than the number of partitions means you can run data processing bolts on dedicated nodes. The downside is that Storm has to shuffle tuples across nodes when sending them from spout to bolt, which introduces latency and the overhead of serialization and deserialization. If you are not doing much business logic in your topology, you can probably reduce the cluster to one half or one quarter of the number of partitions.
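As a rough illustration of this sizing rule, the sketch below lists the candidate cluster sizes for a given partition count and the number of partitions each node would then serve. The class and method names are hypothetical and exist only for this example; the right size for a real topology still depends on how heavy the bolts are.

```java
final class ClusterSizing {
    /** Candidate node counts: all, one half, and one quarter of the partitions. */
    static int[] candidateNodeCounts(int partitions) {
        return new int[] {
            partitions,
            Math.max(1, partitions / 2),
            Math.max(1, partitions / 4)
        };
    }

    /** Partitions (and therefore spout tasks) each node serves, rounded up. */
    static int partitionsPerNode(int partitions, int nodes) {
        return (partitions + nodes - 1) / nodes;
    }

    public static void main(String[] args) {
        int partitions = 32;
        for (int nodes : candidateNodeCounts(partitions)) {
            System.out.println(nodes + " nodes, "
                + partitionsPerNode(partitions, nodes) + " partition(s) per node");
        }
    }
}
```

With 32 partitions the candidates are 32, 16 and 8 nodes, serving 1, 2 and 4 partitions per node.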
By default, Storm processes one message at a time (unless you are using Trident, which does micro-batching), which should result in fairly low latency. If you want to reduce latency further, optimize the processing code in your bolts and reduce the number of bolts in your topology. You may also consider choosing A8 or A9 node types when creating HDInsight Storm clusters, because they are optimized for network speed.
The same tricks that reduce latency also increase throughput when CPU utilization in your cluster is high. Furthermore, if you are OK with losing tuples occasionally, you may consider turning off acking. This will greatly reduce CPU utilization. To do this, set topology.acker.executors to 0 at cluster creation time, or do something like this in code: config.setNumAckers(0).
Another trick to improve performance is to use localOrShuffleGrouping() instead of shuffleGrouping() to connect EventHubSpout to the downstream bolts. This tells Storm to send a tuple to a bolt instance in the same worker process when one is available, eliminating the cost of inter-worker communication. For more info on Storm internal messaging, see:
https://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/
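To make the behavior of localOrShuffleGrouping() concrete, here is a small stand-alone sketch of the selection rule as we understand it: if any task of the target bolt runs in the sending worker, shuffle only among those tasks, otherwise fall back to shuffling across all of them. This is not Storm's implementation; the class, method names and task ids are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

final class LocalOrShuffleSketch {
    /** Tasks eligible for the next tuple: in-worker tasks if any, else all target tasks. */
    static List<Integer> candidates(List<Integer> targetTasks, Set<Integer> tasksInThisWorker) {
        List<Integer> local = new ArrayList<>();
        for (int task : targetTasks) {
            if (tasksInThisWorker.contains(task)) {
                local.add(task);
            }
        }
        return local.isEmpty() ? targetTasks : local;
    }

    /** Picks one eligible task at random, as a shuffle would. */
    static int choose(List<Integer> targetTasks, Set<Integer> tasksInThisWorker, Random rng) {
        List<Integer> pool = candidates(targetTasks, tasksInThisWorker);
        return pool.get(rng.nextInt(pool.size()));
    }

    public static void main(String[] args) {
        List<Integer> boltTasks = Arrays.asList(10, 11, 12, 13); // hypothetical task ids
        Set<Integer> sameWorker = new HashSet<>(Arrays.asList(1, 11, 13));
        Set<Integer> otherWorker = new HashSet<>(Arrays.asList(1, 2));
        System.out.println(candidates(boltTasks, sameWorker));  // [11, 13]
        System.out.println(candidates(boltTasks, otherWorker)); // [10, 11, 12, 13]
    }
}
```

When the worker hosts bolt tasks 11 and 13, tuples stay in-process; when it hosts none, the grouping behaves like a plain shuffle and tuples cross worker boundaries.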
Benchmark Results
We wrote a simple benchmark to test the performance of HDInsight Storm with EventHubs. You can find the source code here:
https://github.com/hdinsight/hdinsight-storm-examples/tree/master/EventCountExample
Our topology is a simple three-layer topology that counts the number of messages received from EventHubs.
EventHubSpout --(localOrShuffleGrouping)-> PartialCountBolt --(globalGrouping)-> GlobalCountBolt
We use an EventHubs entity with 32 partitions, scaled to 20 TU. The message size is 100 bytes. We first pre-populate data in EventHubs using an event generator Storm topology, then stop that topology and start the EventCount topology. Note that using a Storm topology to generate events at high speed is important: if you pre-load data using a single-threaded program, you are not going to achieve high throughput in the EventCount topology. This is mainly due to the way EventHubs arranges and stores data depending on the speed at which it is sent. Based on our previous discussion, the theoretical throughput limit is 400K messages/s. Once the egress messages/s limit is enforced in EventHubs, this will become 40K messages/s.
For the EventCount topology, we set both the number of spout tasks and the number of workers in the topology equal to the number of partitions. These are the results:
1) For an 8-node HDInsight Storm cluster, we can achieve 200K messages/s. In this case each node is responsible for processing 4 partitions. We observed that CPU utilization on the worker nodes of the Storm cluster is at 100%, so the Storm cluster is the bottleneck.
2) For a 32-node cluster, we can achieve about 350K messages/s, pretty close to the theoretical limit. The CPU utilization is at about 70%.
We also tested with a privately created 128-partition EventHubs with 100 TU. In this test we achieved 1M messages/s with a 100-node (default node size) HDInsight Storm cluster.