Information on role of consumer-group in EventHub

Mahendra Sawarkar 40 Reputation points
2024-01-05T07:04:17.7066667+00:00

Hi

I am using standard Azure Library to consume the EventHub.

In our case, a single Java application consumes multiple EventHubs at the same time, meaning every instance of this application consumes all of the EventHubs. The EventHub count is currently more than 2K, and I expect it to grow beyond that as we onboard more customers (one customer maps to one EventHub).

Horizontal scaling for this application is based on CPU and memory utilisation.

I am using the snippet below to consume data from an EventHub:

EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
        .connectionString("Endpoint=******/;SharedAccessKeyName=policy;SharedAccessKey=key;EntityPath=eh")
        .consumerGroup("consumergroup")
        // earliestPosition is a Map<String, EventPosition>, e.g. partition id -> EventPosition.earliest();
        // it applies only to partitions that have no checkpoint yet.
        .initialPartitionEventPosition(earliestPosition)
        .checkpointStore(new SampleCheckpointStore())
        .processEvent(eventContext -> {
            System.out.printf("Partition id = %s, sequence number = %s, offset = %s%n%s%n",
                    eventContext.getPartitionContext().getPartitionId(),
                    eventContext.getEventData().getSequenceNumber(),
                    eventContext.getEventData().getOffset(),
                    eventContext.getEventData().getBodyAsString());
        })
        .processError(errorContext -> {
            System.out.printf("Error occurred in partition processor for partition %s, %s%n",
                    errorContext.getPartitionContext().getPartitionId(),
                    errorContext.getThrowable());
        })
        .buildEventProcessorClient();

eventProcessorClient.start();

Questions:

  • We are using the in-memory CheckpointStore. When we start multiple clients for the same EventHub/consumer-group pair, the PartitionLoadBalancer keeps only one client active per partition, i.e. out of 10 partitions, 5 are consumed by the first instance and the other 5 by the second. We want at most 32 active clients per EventHub, and if a new client is instantiated beyond that limit we want an exception to be thrown. Since the PartitionLoadBalancer already has information about the other active clients, I think this should be feasible. I know it can be done at our level, but can APIs from this library be used for such a requirement?
  • A newly started client always begins consuming from the latest message by default. When I provided the event position as earliest, a new client started consuming from the very beginning. I would expect the offset to be managed at the consumer-group level, i.e. a new client should check the offset committed for the consumer group and start from that point. We are seeing a lot of duplicate alerts because, when the application scales up, the new clients start polling from the earliest point. Since EventHub is Kafka-like, there should be an offset committed at the consumer-group level; I am not sure why the offset has to be managed by the library's end user. Please suggest a solution for this.

Accepted answer
  1. PRADEEPCHEEKATLA-MSFT 84,456 Reputation points Microsoft Employee
    2024-01-09T04:28:30.3433333+00:00

    @Mahendra Sawarkar - I apologize for the confusion. Here are the answers to your questions using the Java API:

    The Event Hubs SDK does not expose a setting that caps the number of active clients per EventHub or throws an exception when an extra client joins. The PartitionLoadBalancer simply distributes an EventHub's partitions across all EventProcessorClient instances that share the same checkpoint store and consumer group; once there are more clients than partitions, the surplus clients own no partitions and sit idle. A hard limit of 32 clients therefore has to be enforced at the application level, for example by inspecting the partition-ownership records the load balancer writes to the CheckpointStore (via listOwnership) before starting a new client.
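
    For illustration, here is a minimal sketch of such an application-level guard, not an official API. It assumes all instances share one CheckpointStore; the MAX_CLIENTS constant and the namespace/hub/group parameters are placeholders. Note that ownership records expire after the load balancer's ownership interval, so the count is only a best-effort snapshot.

    import com.azure.messaging.eventhubs.CheckpointStore;

    public final class ClientLimitGuard {

        private static final int MAX_CLIENTS = 32;

        // Counts the distinct owners currently holding partitions of this
        // EventHub/consumer group, and refuses to start another processor
        // once the cap is reached. Expired or not-yet-claimed owners may
        // be missed, so this is advisory rather than exact.
        public static void ensureCapacity(CheckpointStore store,
                String namespace, String eventHubName, String consumerGroup) {
            long activeOwners = store
                    .listOwnership(namespace, eventHubName, consumerGroup)
                    .map(ownership -> ownership.getOwnerId())
                    .distinct()
                    .count()
                    .block();
            if (activeOwners >= MAX_CLIENTS) {
                throw new IllegalStateException(
                        "EventHub " + eventHubName + " already has " + activeOwners
                        + " active clients; refusing to start another.");
            }
        }
    }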

    By default, when a new client starts and no checkpoint exists, it begins consuming from the latest event. You can change this with the initialPartitionEventPosition setting on EventProcessorClientBuilder, which maps each partition id to an EventPosition such as EventPosition.earliest(); this position is only used for partitions that have no checkpoint. The offset at the consumer-group level is managed through checkpoints: after a client has processed an event, it can call updateCheckpoint() on the EventContext, which records that event's offset and sequence number for the partition in the CheckpointStore. The next client that picks up the partition then resumes from the last checkpointed event.
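
    As a sketch of that flow, the processEvent handler below checkpoints every 50th event per partition, so the consumer group's position keeps advancing without a checkpoint-store write for every single event. The threshold of 50 and the counters map are illustrative choices, not library requirements.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Consumer;
    import com.azure.messaging.eventhubs.models.EventContext;

    // One counter per partition id, shared across handler invocations.
    Map<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    Consumer<EventContext> processEvent = eventContext -> {
        String partitionId = eventContext.getPartitionContext().getPartitionId();
        // ... handle the event here ...
        int seen = counters
                .computeIfAbsent(partitionId, id -> new AtomicInteger())
                .incrementAndGet();
        if (seen % 50 == 0) {
            // Persists this event's offset and sequence number to the
            // CheckpointStore for this partition and consumer group.
            eventContext.updateCheckpoint();
        }
    };

    // Pass the handler to EventProcessorClientBuilder.processEvent(processEvent).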

    The duplicate alerts occur because SampleCheckpointStore keeps checkpoints in memory: every application instance has its own private copy, and all checkpoints are lost on restart, so a newly started client falls back to the configured initial position (earliest, in your case) and re-reads the stream. To avoid this, use a durable checkpoint store that all instances share. The library defines a CheckpointStore interface for this purpose, and a production-ready implementation backed by Azure Blob Storage is available in the azure-messaging-eventhubs-checkpointstore-blob package. With it, each client reads the last checkpoint per partition on startup and resumes from there, and the load balancer can coordinate partition ownership across all instances through the same store.
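
    Here is a minimal sketch of wiring up the Blob Storage checkpoint store, assuming the azure-messaging-eventhubs-checkpointstore-blob dependency is on the classpath; the storage connection string and container name are placeholders.

    import com.azure.messaging.eventhubs.EventProcessorClient;
    import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.storage.blob.BlobContainerAsyncClient;
    import com.azure.storage.blob.BlobContainerClientBuilder;

    // A blob container that all application instances can reach; checkpoints
    // and partition-ownership records are stored as blobs inside it.
    BlobContainerAsyncClient blobContainer = new BlobContainerClientBuilder()
            .connectionString("<storage-connection-string>")
            .containerName("eventhub-checkpoints")
            .buildAsyncClient();

    EventProcessorClient processor = new EventProcessorClientBuilder()
            .connectionString("Endpoint=******/;SharedAccessKeyName=policy;SharedAccessKey=key;EntityPath=eh")
            .consumerGroup("consumergroup")
            // Checkpoints now survive restarts and are shared by every
            // instance, so a new client resumes from the last checkpoint
            // instead of re-reading from the earliest event.
            .checkpointStore(new BlobCheckpointStore(blobContainer))
            .processEvent(eventContext -> { /* handle event */ })
            .processError(errorContext -> { /* handle error */ })
            .buildEventProcessorClient();

    processor.start();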

    I hope this helps. Let me know if you have any further questions.

