Unable to poll more than 5 consumer records at once from Azure Event Hubs using KafkaConsumer

Mahendra Sawarkar 40 Reputation points
2024-01-30T13:53:03.09+00:00
Below are the Kafka consumer properties I am using while consuming from the Event Hub:

kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaProps.put("security.protocol", "SASL_SSL");
kafkaProps.put("sasl.mechanism", "PLAIN");
kafkaProps.put("sasl.jaas.config", saslJaasConfig);
kafkaProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 104857600);
kafkaProps.put("poll.timeout.ms", 10000);

kafkaProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100000);
kafkaProps.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 180_000);
kafkaProps.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 180_000);
kafkaProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 90_000);

kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
kafkaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);

kafkaProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 10485760);
kafkaProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000);
kafkaProps.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 20000000);

With this configuration we are able to poll records from the Event Hub successfully, but very slowly: with a Standard tier Event Hubs namespace we get at most 5 records per poll.

Below is the Java snippet we are using:

try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaProperties)) {
    kafkaConsumer.subscribe(Collections.singleton(eventhubName));
    while (!stopThread) {
        final ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(KAFKA_POLL_DURATION);
        for (ConsumerRecord<String, String> record : consumerRecords) {
            String valAsString = record.value();
            System.out.println("The value is " + valAsString);
        }
    }
}
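To confirm exactly how much data each poll returns, it can help to total up the batch size per `poll()` call. With kafka-clients you could sum `record.serializedKeySize() + record.serializedValueSize()` inside the loop; the standalone sketch below uses plain strings in place of records so it runs without a broker (the helper and sample values are illustrative, not part of the original code):

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

public class BatchSize {
    // Sum of UTF-8 byte lengths, standing in for the per-record
    // serializedValueSize() you would read off each ConsumerRecord.
    static long batchBytes(List<String> values) {
        long total = 0;
        for (String v : values) {
            total += v.getBytes(StandardCharsets.UTF_8).length;
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> batch = List.of("event-1", "event-2", "event-3");
        // prints: records = 3, bytes = 21
        System.out.println("records = " + batch.size() + ", bytes = " + batchBytes(batch));
    }
}
```

Logging these two numbers per poll makes it easy to see whether the bottleneck is record count or record size.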

With this we poll at most 5 records per call, and the total size of all records in a batch is under 2 MB, which is very low.

We would like to consume more than 50 MB per poll, which would help us optimize cost in our downstream application. Given the fetch settings below:

kafkaProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 10485760);
kafkaProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000);
kafkaProps.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 20000000);

we should be able to consume at least 10 MB per poll (fetch.min.bytes is set to 10 MiB), but that is not happening. Could you please suggest or guide further?
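As a sanity check on whether the namespace's egress allowance could be the limit, here is a back-of-the-envelope sketch. It assumes a Standard tier namespace with 1 throughput unit (roughly 2 MB/s egress per Azure's documented quotas); the actual TU count of the namespace in question is an assumption, as is the ~2 MB observed batch size taken from the description above:

```java
public class FetchMath {
    // Maximum bytes the broker could hand back within one fetch wait window,
    // given a per-second egress allowance.
    static long egressCapBytes(long egressBytesPerSec, long waitMs) {
        return egressBytesPerSec * waitMs / 1000L;
    }

    public static void main(String[] args) {
        long fetchMinBytes = 10_485_760L;       // fetch.min.bytes as configured (10 MiB)
        long fetchMaxWaitMs = 10_000L;          // fetch.max.wait.ms as configured
        long egressBytesPerSec = 2_000_000L;    // assumed: 1 TU ~ 2 MB/s egress (Standard tier)
        long observedBytesPerPoll = 2_000_000L; // ~2 MB per poll, as observed

        long cap = egressCapBytes(egressBytesPerSec, fetchMaxWaitMs);
        double observedMBps = observedBytesPerPoll / (double) fetchMaxWaitMs * 1000.0 / 1_000_000.0;

        System.out.println("requested per fetch : " + fetchMinBytes + " bytes");
        System.out.println("egress cap per wait : " + cap + " bytes");
        System.out.println("observed throughput : " + observedMBps + " MB/s");
    }
}
```

Under these assumptions one fetch wait window could carry ~20 MB, yet the observed rate is only ~0.2 MB/s, i.e. well below both the 10 MiB fetch.min.bytes target and the tier's egress allowance, which is why the small batches are surprising.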

References:
- https://video2.skills-academy.com/en-us/azure/event-hubs/apache-kafka-configurations#consumer-configurations-only-1
- https://video2.skills-academy.com/en-us/azure/event-hubs/apache-kafka-frequently-asked-questions?source=recommendations#does-azure-event-hubs-run-on-apache-kafka-
Azure Event Hubs
An Azure real-time data ingestion service.