.NET Aspire Apache Kafka component

In this article, you learn how to use the .NET Aspire Apache Kafka client message-broker component. The Aspire.Confluent.Kafka library registers an IProducer<TKey, TValue> and an IConsumer<TKey, TValue> in the dependency injection (DI) container for connecting to an Apache Kafka server. It also enables the corresponding health checks, logging, and telemetry.

Get started

To get started with the .NET Aspire Apache Kafka component, install the Aspire.Confluent.Kafka NuGet package.

dotnet add package Aspire.Confluent.Kafka

For more information, see dotnet add package or Manage package dependencies in .NET applications.

Example usage

In the Program.cs file of your component-consuming project, call the AddKafkaProducer extension method to register an IProducer<TKey, TValue> for use via the dependency injection container. The method takes two generic parameters: the type of the key and the type of the message to send to the broker. These generic parameters are used to create an instance of ProducerBuilder<TKey, TValue>. The method also takes a connection name parameter.

builder.AddKafkaProducer<string, string>("messaging");

You can then retrieve the IProducer<TKey, TValue> instance using dependency injection. For example, to retrieve the producer from an IHostedService:

internal sealed class MyWorker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}
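As a fuller illustration of the "Use producer..." placeholder, the following is a minimal sketch of a worker that publishes one message per second. The class name MessageProducerWorker and the topic name my-topic are hypothetical and not part of the component; ProduceAsync and Message<TKey, TValue> come from the Confluent.Kafka client library.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

// Hypothetical worker: publishes one message per second to an assumed topic.
internal sealed class MessageProducerWorker(IProducer<string, string> producer) : BackgroundService
{
    // Placeholder topic name; replace it with a topic that exists on your broker.
    private const string Topic = "my-topic";

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        long count = 0;
        using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));

        // WaitForNextTickAsync throws OperationCanceledException when the host
        // stops, which ends the loop.
        while (await timer.WaitForNextTickAsync(stoppingToken))
        {
            var message = new Message<string, string>
            {
                Key = Guid.NewGuid().ToString(),
                Value = $"Hello, Kafka! Message #{++count}"
            };

            // The returned task completes when the delivery report arrives.
            await producer.ProduceAsync(Topic, message, stoppingToken);
        }
    }
}
```

A worker like this would typically be registered in Program.cs with builder.Services.AddHostedService<MessageProducerWorker>() after the AddKafkaProducer call.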

App host usage

To model the Kafka resource in the app host, install the Aspire.Hosting.Kafka NuGet package.

dotnet add package Aspire.Hosting.Kafka

In your app host project, register a Kafka container and consume the connection using the following methods:

var builder = DistributedApplication.CreateBuilder(args);

var messaging = builder.AddKafka("messaging");

var myService = builder.AddProject<Projects.MyService>()
                       .WithReference(messaging);

The WithReference method configures a connection in the MyService project named messaging. In the Program.cs file of MyService, the Apache Kafka broker connection can be consumed using:

builder.AddKafkaProducer<string, string>("messaging");

or

builder.AddKafkaConsumer<string, string>("messaging");
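Once a consumer is registered, it can be injected in the same way as a producer. The following is a rough sketch of a consuming worker, not a definitive implementation. The class name MessageConsumerWorker and the topic name my-topic are hypothetical, and the sketch assumes that a consumer group ID has already been configured for the consumer.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

// Hypothetical worker: logs every message received on an assumed topic.
internal sealed class MessageConsumerWorker(
    IConsumer<string, string> consumer,
    ILogger<MessageConsumerWorker> logger) : BackgroundService
{
    // Placeholder topic name; replace it with a topic that exists on your broker.
    private const string Topic = "my-topic";

    protected override Task ExecuteAsync(CancellationToken stoppingToken) =>
        // Consume blocks the calling thread, so the loop runs on a thread-pool
        // thread instead of holding up host startup.
        Task.Run(() => ConsumeLoop(stoppingToken), stoppingToken);

    private void ConsumeLoop(CancellationToken stoppingToken)
    {
        consumer.Subscribe(Topic);

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                // Blocks until a message arrives or the token is cancelled.
                ConsumeResult<string, string> result = consumer.Consume(stoppingToken);

                logger.LogInformation(
                    "Received {Key}: {Value}", result.Message.Key, result.Message.Value);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the host is shutting down.
        }
        finally
        {
            // Leave the consumer group cleanly before the container disposes the consumer.
            consumer.Close();
        }
    }
}
```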

Configuration

The .NET Aspire Apache Kafka component provides multiple options to configure the connection based on the requirements and conventions of your project.

Use a connection string

When using a connection string from the ConnectionStrings configuration section, you can provide the name of the connection string when calling builder.AddKafkaProducer() or builder.AddKafkaConsumer():

builder.AddKafkaProducer<string, string>("myConnection");

The connection string is then retrieved from the ConnectionStrings configuration section:

{
  "ConnectionStrings": {
    "myConnection": "broker:9092"
  }
}

The connection string value is assigned to the BootstrapServers property of the produced IProducer<TKey, TValue> or IConsumer<TKey, TValue> instance. Refer to BootstrapServers for more information.
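To make the mapping concrete, the following sketch shows roughly what the equivalent hand-written Confluent.Kafka setup looks like without the component. It is an illustration of where the connection string value ends up, under the assumption that no other settings are applied. It is not the component's actual implementation.

```csharp
using Confluent.Kafka;

// Assumption: the "broker:9092" connection string from the example above
// is used as the bootstrap server list.
var config = new ProducerConfig
{
    BootstrapServers = "broker:9092"
};

// Build a producer directly with the Confluent.Kafka builder API.
using IProducer<string, string> producer =
    new ProducerBuilder<string, string>(config).Build();
```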

Use configuration providers

The .NET Aspire Apache Kafka component supports Microsoft.Extensions.Configuration. It loads the KafkaProducerSettings or KafkaConsumerSettings from configuration by using the Aspire:Confluent:Kafka:Producer and Aspire:Confluent:Kafka:Consumer keys, respectively. This example appsettings.json configures some of the options:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

The Config properties of the Aspire:Confluent:Kafka:Producer and Aspire:Confluent:Kafka:Consumer configuration sections bind to instances of ProducerConfig and ConsumerConfig, respectively.

Confluent.Kafka.Consumer<TKey, TValue> requires the GroupId property to be set so that the broker can track consumed message offsets for the consumer group.

Use inline delegates

Configuring KafkaProducerSettings and KafkaConsumerSettings

You can pass the Action<KafkaProducerSettings> configureSettings delegate to set up some or all the options inline, for example to disable health checks from code:

builder.AddKafkaProducer<string, string>("messaging", settings => settings.DisableHealthChecks = true);

You can configure a consumer inline in the same way:

builder.AddKafkaConsumer<string, string>("messaging", settings => settings.DisableHealthChecks = true);
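The same delegate can also reach the underlying Confluent.Kafka configuration. Kafka consumers read as part of a consumer group, so a group ID generally has to be supplied somewhere. The sketch below sets it from code, assuming that KafkaConsumerSettings exposes the bound ConsumerConfig through its Config property. The group name my-consumer-group is a placeholder.

```csharp
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

builder.AddKafkaConsumer<string, string>("messaging", settings =>
{
    // Assumption: Config is the ConsumerConfig instance bound from configuration.
    // Placeholder group name; consumers sharing it split the topic's partitions.
    settings.Config.GroupId = "my-consumer-group";

    // Start from the earliest available offset when the group has no committed offset.
    settings.Config.AutoOffsetReset = AutoOffsetReset.Earliest;
});

builder.Build().Run();
```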

Configuring ProducerBuilder<TKey, TValue> and ConsumerBuilder<TKey, TValue>

To configure Confluent.Kafka builders, pass an Action<ProducerBuilder<TKey, TValue>> (or Action<ConsumerBuilder<TKey, TValue>>):

builder.AddKafkaProducer<string, MyMessage>("messaging", producerBuilder =>
{
    // MyMessage and MyMessageSerializer are application-defined types.
    producerBuilder.SetValueSerializer(new MyMessageSerializer());
});
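The MyMessage and MyMessageSerializer types are not provided by the component. One possible shape for them, sketched here with System.Text.Json and the Confluent.Kafka ISerializer<T> and IDeserializer<T> interfaces, is shown below. The record's properties and the MyMessageDeserializer class are hypothetical and only serve the illustration.

```csharp
using System;
using System.Text.Json;
using Confluent.Kafka;

// Hypothetical message type used only for illustration.
public sealed record MyMessage(string Id, string Text);

// Serializes MyMessage values to UTF-8 JSON bytes for the producer.
public sealed class MyMessageSerializer : ISerializer<MyMessage>
{
    public byte[] Serialize(MyMessage data, SerializationContext context) =>
        JsonSerializer.SerializeToUtf8Bytes(data);
}

// Matching deserializer for a consumer, passed to
// ConsumerBuilder<TKey, TValue>.SetValueDeserializer.
public sealed class MyMessageDeserializer : IDeserializer<MyMessage>
{
    public MyMessage Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        if (isNull)
        {
            throw new InvalidOperationException("Received a null message value.");
        }

        return JsonSerializer.Deserialize<MyMessage>(data)
            ?? throw new InvalidOperationException("Message value deserialized to null.");
    }
}
```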

Refer to ProducerBuilder<TKey, TValue> and ConsumerBuilder<TKey, TValue> API documentation for more information.

Health checks

By default, .NET Aspire components enable health checks for all services. For more information, see .NET Aspire components overview.

The .NET Aspire Apache Kafka component handles the following:

Observability and telemetry

.NET Aspire components automatically set up Logging, Tracing, and Metrics configurations, which are sometimes known as the pillars of observability. For more information about component observability and telemetry, see .NET Aspire components overview. Depending on the backing service, some components may only support some of these features. For example, some components support logging and tracing, but not metrics. Telemetry features can also be disabled using the techniques presented in the Configuration section.

Logging

The .NET Aspire Apache Kafka component uses the following log categories:

  • Aspire.Confluent.Kafka

Tracing

The .NET Aspire Apache Kafka component will emit the following tracing activities using OpenTelemetry:

  • Aspire.Confluent.Kafka

Metrics

The .NET Aspire Apache Kafka component will emit the following metrics using OpenTelemetry:

  • messaging.kafka.network.tx
  • messaging.kafka.network.transmitted
  • messaging.kafka.network.rx
  • messaging.kafka.network.received
  • messaging.publish.messages
  • messaging.kafka.message.transmitted
  • messaging.receive.messages
  • messaging.kafka.message.received

See also