Azure Event Hubs
An Azure real-time data ingestion service.
Below are the Kafka consumer properties I am using while consuming from the Event Hub:
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaProps.put("security.protocol", "SASL_SSL");
kafkaProps.put("sasl.mechanism", "PLAIN");
kafkaProps.put("sasl.jaas.config", saslJaasConfig);
kafkaProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 104857600);
kafkaProps.put("poll.timeout.ms", 10000); // note: "poll.timeout.ms" is not a standard Kafka consumer property; the client ignores it
kafkaProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100000);
kafkaProps.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 180_000);
kafkaProps.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 180_000);
kafkaProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 90_000);
kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
kafkaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
kafkaProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 10485760);
kafkaProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000);
kafkaProps.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 20000000);
With this config we were able to poll Event Hub records successfully, but very slowly: with a Standard tier Event Hub we could poll at most 5 records per poll() call.
Below is the Java snippet we are using:
try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaProperties)) {
    kafkaConsumer.subscribe(Collections.singleton(eventhubName));
    while (!stopThread) {
        final ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(KAFKA_POLL_DURATION);
        for (ConsumerRecord<String, String> record : consumerRecords) {
            String valAsString = record.value();
            System.out.println("The value is " + valAsString);
        }
    }
}
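To verify how much data each poll() actually returns, one option is to log the record count and total byte size per batch inside the loop above. A minimal, self-contained sketch of the byte accounting (the helper name batchBytes is ours, and it assumes UTF-8 string values like those produced by StringDeserializer; in the real loop you would pass each record.value()):

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

public class PollStats {
    // Sum the UTF-8 byte size of the record values from one poll() batch.
    static long batchBytes(List<String> values) {
        long total = 0;
        for (String v : values) {
            total += v.getBytes(StandardCharsets.UTF_8).length;
        }
        return total;
    }

    public static void main(String[] args) {
        // Simulated batch of two record values: 3 bytes + 2 bytes.
        System.out.println(batchBytes(List.of("abc", "de"))); // prints 5
    }
}
```

Logging this per poll makes it easy to tell whether a config change actually moved the batch size, rather than judging from record counts alone.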
With this we could only poll at most 5 records per call, and the total byte size of all consumer records in a batch is under 2 MB, which is very low. We would like to consume more than 50 MB per poll, which would help us optimise cost in our downstream application. If you check the fetch-related property settings below:
kafkaProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 10485760);
kafkaProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000);
kafkaProps.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 20000000);
we should be able to consume at least 10 MB per poll, but that is not happening as expected. Could you please suggest or guide further?
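One detail worth noting: the Apache Kafka consumer also has a fetch.max.bytes setting, which caps the total data the broker returns per fetch and defaults to 52428800 (50 MB); it is not set in the properties above, so the default cap may be the limiting factor for a >50 MB target. A sketch of the fetch-related settings as plain properties (the 100 MB value is illustrative, and the Event Hubs Kafka endpoint may enforce its own service-side limits per the linked configuration doc):

```java
import java.util.Properties;

public class FetchTuning {
    // Build the fetch-related consumer properties as plain strings.
    static Properties tunedProps() {
        Properties p = new Properties();
        // fetch.max.bytes caps the total data returned per fetch; the Kafka
        // default is 52428800 (50 MB), so a >50 MB target requires raising it.
        p.put("fetch.max.bytes", "104857600");           // 100 MB (illustrative)
        p.put("max.partition.fetch.bytes", "104857600"); // per-partition cap, as already set above
        p.put("fetch.min.bytes", "10485760");            // broker waits for at least 10 MB...
        p.put("fetch.max.wait.ms", "10000");             // ...or 10 s, whichever comes first
        return p;
    }

    public static void main(String[] args) {
        System.out.println(tunedProps().getProperty("fetch.max.bytes")); // prints 104857600
    }
}
```

Note also that fetch.min.bytes and max.partition.fetch.bytes bound what a single fetch can carry only if fetch.max.bytes is at least as large, since the smallest of the applicable caps wins.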
References:
- https://video2.skills-academy.com/en-us/azure/event-hubs/apache-kafka-configurations#consumer-configurations-only-1
- https://video2.skills-academy.com/en-us/azure/event-hubs/apache-kafka-frequently-asked-questions?source=recommendations#does-azure-event-hubs-run-on-apache-kafka-