Associação de saída do Apache Kafka para o Azure Functions

A associação de saída permite que um aplicativo Azure Functions para gravar mensagens em um tópico do Kafka.

Importante

As associações do Kafka só estão disponíveis para o Functions no plano Elastic Premium e plano Dedicado (Serviço de Aplicativo). São compatíveis apenas na versão 3.x e posterior do runtime do Functions.

Exemplo

O uso de associação depende da modalidade C# utilizada em seu aplicativo de funções, que pode ser uma das seguintes:

Uma função C# compilada da biblioteca de classes do processo de trabalho isolado é executada em um processo isolado do runtime.

Os atributos usados dependem do provedor de eventos específico.

O exemplo a seguir tem um tipo de retorno personalizado, que MultipleOutputTypeconsiste em uma resposta HTTP e uma saída do Kafka.

[Function("KafkaOutput")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();

    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = message,
        HttpResponse = response
    };
}

Na classe MultipleOutputType, Kevent está a variável de associação de saída para a associação Kafka.

public class MultipleOutputType
{
    [KafkaOutput("BrokerList",
                "topic",
                Username = "ConfluentCloudUserName",
                Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string Kevent { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Para enviar um lote de eventos, passe uma matriz de cadeia de caracteres para o tipo de saída, conforme mostrado no exemplo a seguir:

[Function("KafkaOutputMany")]

public static MultipleOutputTypeForBatch Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");
    var response = req.CreateResponse(HttpStatusCode.OK);

    string[] messages = new string[2];
    messages[0] = "one";
    messages[1] = "two";

    return new MultipleOutputTypeForBatch()
    {
        Kevents = messages,
        HttpResponse = response
    };
}

A matriz de cadeia de caracteres é definida como propriedade Kevents na classe, na qual a associação de saída é definida:

public class MultipleOutputTypeForBatch
{
    [KafkaOutput("BrokerList",
                 "topic",
                 Username = "ConfluentCloudUserName",
                 Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string[] Kevents { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

A função a seguir adiciona cabeçalhos aos dados de saída do Kafka:

[Function("KafkaOutputWithHeaders")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();
    string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = kevent,
        HttpResponse = response
    };
}

Para obter um conjunto completo de exemplos de .NET em funcionamento, consulte o repositório de extensão do Kafka.

Observação

Para obter um conjunto equivalente de exemplos de TypeScript, consulte o repositório de extensão do Kafka

As propriedades específicas do arquivo function.json dependem do provedor de eventos, que nestes exemplos são Confluent ou Hubs de Eventos do Azure. Os exemplos a seguir mostram uma associação de saída do Kafka para uma função que é disparada por uma solicitação HTTP e envia dados da solicitação para o tópico Kafka.

A seguinte função.json define o gatilho para o fornecedor específico nestes exemplos:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputKafkaMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "ConfluentCloudUsername",
      "password": "ConfluentCloudPassword",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    }
  ]
}

O código a seguir, envia uma mensagem para o tópico:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message);
    context.bindings.outputKafkaMessage = message;
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: 'Ok'
    };
}

O código a seguir envia várias mensagens como uma matriz para o mesmo tópico:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');
    
    context.bindings.outputKafkaMessages = ["one", "two"];
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

O exemplo a seguir mostra como enviar uma mensagem de evento com cabeçalhos para o mesmo tópico do Kafka:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message || (req.body && req.body.message));
    const responseMessage = message
        ? "Message received: " + message + ". The message transfered to the kafka broker."
        : "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
    context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

Para obter um conjunto completo de exemplos de JavaScript em funcionamento, consulte o repositório de extensão do Kafka.

As propriedades específicas do arquivo function.json dependem do provedor de eventos, que nestes exemplos são Confluent ou Hubs de Eventos do Azure. Os exemplos a seguir mostram uma associação de saída do Kafka para uma função que é disparada por uma solicitação HTTP e envia dados da solicitação para o tópico Kafka.

A seguinte função.json define o gatilho para o fornecedor específico nestes exemplos:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "Request",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username" : "%ConfluentCloudUserName%",
      "password" : "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "Response"
    }
  ]
}

O código a seguir, envia uma mensagem para o tópico:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message

$message

Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

O código a seguir envia várias mensagens como uma matriz para o mesmo tópico:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

O exemplo a seguir mostra como enviar uma mensagem de evento com cabeçalhos para o mesmo tópico do Kafka:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
    $message = $Request.Body.Message
}

$kevent = @{
    Offset = 364
    Partition = 0
    Topic = "kafkaeventhubtest1"
    Timestamp = "2022-04-09T03:20:06.591Z"
    Value = $message
    Headers= @(@{
        Key= "test"
        Value= "powershell"
    }
    )
}

Push-OutputBinding -Name Message -Value $kevent

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body = 'ok'
})

Para obter um conjunto completo de exemplos de PowerShell em funcionamento, consulte o repositório de extensão do Kafka.

As propriedades específicas do arquivo function.json dependem do provedor de eventos, que nestes exemplos são Confluent ou Hubs de Eventos do Azure. Os exemplos a seguir mostram uma associação de saída do Kafka para uma função que é disparada por uma solicitação HTTP e envia dados da solicitação para o tópico Kafka.

A seguinte função.json define o gatilho para o fornecedor específico nestes exemplos:

{
  "scriptFile": "main.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "direction": "out",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "%ConfluentCloudUserName%",
      "password": "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

O código a seguir, envia uma mensagem para o tópico:

import logging

import azure.functions as func


def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'

O código a seguir envia várias mensagens como uma matriz para o mesmo tópico:

import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json

def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
    outputMessage.set(['one', 'two'])
    return 'OK'

O exemplo a seguir mostra como enviar uma mensagem de evento com cabeçalhos para o mesmo tópico do Kafka:

import logging

import azure.functions as func
import json

def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

Para obter um conjunto completo de exemplos de Python em funcionamento, consulte o repositório de extensão do Kafka.

As anotações que você usa para configurar a associação de saída dependem do provedor de eventos específico.

A função a seguir envia uma mensagem para o tópico do Kafka.

@FunctionName("KafkaOutput")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");

    // Parse query parameter
    String query = request.getQueryParameters().get("message");
    String message = request.getBody().orElse(query);
    context.getLogger().info("Message:" + message);
    output.setValue(message);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

O exemplo a seguir mostra como enviar várias mensagens para um tópico do Kafka.

@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String[]> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");
    String[] messages = new String[2];
    messages[0] = "one";
    messages[1] = "two";
    output.setValue(messages);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}

Neste exemplo, o parâmetro de associação de saída é alterado para matriz de cadeia de caracteres.

O último exemplo é usado para estas classes KafkaEntity e KafkaHeader:

public class KafkaEntity {
    public int Offset;
    public int Partition;
    public String Timestamp;
    public String Topic;
    public String Value;
    public KafkaHeaders Headers[];

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
    }
public class KafkaHeaders{
    public String Key;
    public String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;
    }

A função de exemplo a seguir envia uma mensagem com cabeçalhos para um tópico do Kafka.

@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<KafkaEntity> output,
        final ExecutionContext context) {
            context.getLogger().info("Java HTTP trigger processed a request.");
    
            // Parse query parameter
            String query = request.getQueryParameters().get("message");
            String message = request.getBody().orElse(query);
            KafkaHeaders[] headers = new KafkaHeaders[1];
            headers[0] = new KafkaHeaders("test", "java");
            KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
            output.setValue(kevent);
            return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
        }

Para obter um conjunto completo de exemplos de Java para Confluent em funcionamento, consulte o repositório de extensão do Kafka.

Atributos

As bibliotecas C# em processo e de processo de trabalho isolado usam o atributo Kafka para definir o gatilho da função.

A tabela a seguir explica as propriedades que você pode definir usando este atributo:

Parâmetro Descrição
BrokerList (Obrigatório) A lista de agentes Kafka para os quais a saída é enviada. Consulte Conexões para obter mais informações.
Tópico (Obrigatório) O tópico para o qual a saída é enviada.
AvroSchema (Opcional) Esquema de um registro genérico ao usar o protocolo Avro.
MaxMessageBytes (Opcional) O tamanho máximo da mensagem de saída que está sendo enviada (em MB), com um valor padrão de 1.
BatchSize (Opcional) Número máximo de mensagens em lote em um único conjunto de mensagens, com um valor padrão de 10000.
EnableIdempotence (Opcional) Quando definido como true, garante que as mensagens sejam produzidas com êxito exatamente uma vez e na ordem de produção original, com um valor padrão de false
MessageTimeoutMs (Opcional) O tempo limite da mensagem local, em milissegundos. Esse valor só é imposto localmente e limita o tempo em que uma mensagem produzida aguarda a entrega bem-sucedida, com um padrão 300000. Um tempo 0 é infinito. Esse valor é o tempo máximo usado para entregar uma mensagem (incluindo novas tentativas). O erro de entrega ocorre quando a contagem de repetições ou o tempo limite da mensagem são excedidos.
RequestTimeoutMs (Opcional) O tempo limite de confirmação da solicitação de saída, em milissegundos, com um padrão de 5000.
MaxRetries (Opcional) O número de vezes para tentar enviar novamente uma mensagem com falha, com um padrão de 2. A repetição pode causar reordenação, a menos que EnableIdempotence esteja definida como true.
AuthenticationMode (Opcional) O modo de autenticação ao usar a autenticação SASL (Simple Authentication and Security Layer). Os valores compatíveis são Gssapi, Plain (padrão), ScramSha256, ScramSha512.
Nome de usuário (Opcional) O nome de usuário para autenticação SASL. Não há suporte quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações.
Senha (Opcional) A senha para autenticação SASL. Não há suporte quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações.
Protocolo (Opcional) O protocolo de segurança usado ao se comunicar com agentes. Os valores compatíveis são plaintext (padrão), ssl, sasl_plaintext, sasl_ssl.
SslCaLocation (Opcional) Caminho para o arquivo de certificado da AC para verificar o certificado do agente.
SslCertificateLocation (Opcional) Caminho para o certificado do cliente.
SslKeyLocation (Opcional) Caminho para a chave privada do cliente (PEM) usada para autenticação.
SslKeyPassword (Opcional) Senha do certificado do cliente.

Anotações

A anotação KafkaOutput permite que você crie uma função que grava em um tópico específico. As opções compatíveis incluem os seguintes elementos:

Elemento Descrição
name (Obrigatório) O nome da variável que representa os dados agenciados no código de função.
brokerList (Obrigatório) A lista de agentes Kafka para os quais a saída é enviada. Consulte Conexões para obter mais informações.
topic (Obrigatório) O tópico para o qual a saída é enviada.
dataType Define como o Functions lida com o valor do parâmetro. Por padrão, o valor é obtido como uma cadeia de caracteres, e o Functions tenta desserializá-la para o POJO (objeto Java). Quando string, a entrada é tratada apenas como uma cadeia de caracteres. Quando binary, a mensagem é recebida como dados binários, e o Functions tenta desserializá-los para um byte de tipo de parâmetro real[].
avroSchema (Opcional) Esquema de um registro genérico ao usar o protocolo Avro. (Atualmente não suportado para Java.)
maxMessageBytes (Opcional) O tamanho máximo da mensagem de saída que está sendo enviada (em MB), com um valor padrão de 1.
batchSize (Opcional) Número máximo de mensagens em lote em um único conjunto de mensagens, com um valor padrão de 10000.
enableIdempotence (Opcional) Quando definido como true, garante que as mensagens sejam produzidas com êxito exatamente uma vez e na ordem de produção original, com um valor padrão de false
messageTimeoutMs (Opcional) O tempo limite da mensagem local, em milissegundos. Esse valor só é imposto localmente e limita o tempo em que uma mensagem produzida aguarda a entrega bem-sucedida, com um padrão 300000. Um tempo 0 é infinito. Esse é o tempo máximo usado para entregar uma mensagem (incluindo novas tentativas). O erro de entrega ocorre quando a contagem de repetições ou o tempo limite da mensagem são excedidos.
requestTimeoutMs (Opcional) O tempo limite de confirmação da solicitação de saída, em milissegundos, com um padrão de 5000.
maxRetries (Opcional) O número de vezes para tentar enviar novamente uma mensagem com falha, com um padrão de 2. A repetição pode causar reordenação, a menos que EnableIdempotence esteja definida como true.
authenticationMode (Opcional) O modo de autenticação ao usar a autenticação SASL (Simple Authentication and Security Layer). Os valores compatíveis são Gssapi, Plain (padrão), ScramSha256, ScramSha512.
username (Opcional) O nome de usuário para autenticação SASL. Não há suporte quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações.
password (Opcional) A senha para autenticação SASL. Não há suporte quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações.
protocol (Opcional) O protocolo de segurança usado ao se comunicar com agentes. Os valores compatíveis são plaintext (padrão), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Opcional) Caminho para o arquivo de certificado da AC para verificar o certificado do agente.
sslCertificateLocation (Opcional) Caminho para o certificado do cliente.
sslKeyLocation (Opcional) Caminho para a chave privada do cliente (PEM) usada para autenticação.
sslKeyPassword (Opcional) Senha do certificado do cliente.

Configuração

A tabela a seguir explica as propriedades de configuração de associação que você define no arquivo function.json.

Propriedade function.json Descrição
tipo Deve ser definido como kafka.
direction Deve ser definido como out.
name (Obrigatório) O nome da variável que representa os dados agenciados no código de função.
brokerList (Obrigatório) A lista de agentes Kafka para os quais a saída é enviada. Consulte Conexões para obter mais informações.
topic (Obrigatório) O tópico para o qual a saída é enviada.
avroSchema (Opcional) Esquema de um registro genérico ao usar o protocolo Avro.
maxMessageBytes (Opcional) O tamanho máximo da mensagem de saída que está sendo enviada (em MB), com um valor padrão de 1.
batchSize (Opcional) Número máximo de mensagens em lote em um único conjunto de mensagens, com um valor padrão de 10000.
enableIdempotence (Opcional) Quando definido como true, garante que as mensagens sejam produzidas com êxito exatamente uma vez e na ordem de produção original, com um valor padrão de false
messageTimeoutMs (Opcional) O tempo limite da mensagem local, em milissegundos. Esse valor só é imposto localmente e limita o tempo em que uma mensagem produzida aguarda a entrega bem-sucedida, com um padrão 300000. Um tempo 0 é infinito. Esse é o tempo máximo usado para entregar uma mensagem (incluindo novas tentativas). O erro de entrega ocorre quando a contagem de repetições ou o tempo limite da mensagem são excedidos.
requestTimeoutMs (Opcional) O tempo limite de confirmação da solicitação de saída, em milissegundos, com um padrão de 5000.
maxRetries (Opcional) O número de vezes para tentar enviar novamente uma mensagem com falha, com um padrão de 2. A repetição pode causar reordenação, a menos que EnableIdempotence esteja definida como true.
authenticationMode (Opcional) O modo de autenticação ao usar a autenticação SASL (Simple Authentication and Security Layer). Os valores compatíveis são Gssapi, Plain (padrão), ScramSha256, ScramSha512.
username (Opcional) O nome de usuário para autenticação SASL. Não há suporte quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações.
password (Opcional) A senha para autenticação SASL. Não há suporte quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações.
protocol (Opcional) O protocolo de segurança usado ao se comunicar com agentes. Os valores compatíveis são plaintext (padrão), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Opcional) Caminho para o arquivo de certificado da AC para verificar o certificado do agente.
sslCertificateLocation (Opcional) Caminho para o certificado do cliente.
sslKeyLocation (Opcional) Caminho para a chave privada do cliente (PEM) usada para autenticação.
sslKeyPassword (Opcional) Senha do certificado do cliente.

Uso

Os tipos de chaves e valores têm suporte com serialização interna do Avro e do Protobuf.

O deslocamento, a partição e o carimbo de data/hora do evento são gerados em runtime. Somente o valor e os cabeçalhos podem ser definidos dentro da função. O tópico é definido no function.json.

Certifique-se de ter acesso ao tópico Kafka ao qual você está tentando escrever. Configure a associação com credenciais de acesso e conexão para o tópico Kafka.

Em um plano Premium, você deve habilitar o monitoramento de escala de runtime para que a saída do Kafka possa ser dimensionada para várias instâncias. Para saber mais, confira Habilitar o dimensionamento de runtime.

Para obter um conjunto completo de configurações de host.json compatíveis para o gatilho do Kafka, consulte as configurações de host.json.

conexões

Todas as informações obrigatórias exigidas por seus gatilhos e associações devem ser mantidas nas configurações do aplicativo, não nas definições obrigatórias em seu código. Isso é verdadeiro para credenciais, que nunca devem ser armazenadas em seu código.

Importante

As configurações de credencial devem fazer referência a uma configuração de aplicativo. Não embutir credenciais em seu código ou nos arquivos de configuração. Ao executar localmente, use o arquivo local.settings.json para suas credenciais e não publique o arquivo local.settings.json.

Ao se conectar a um cluster do Kafka gerenciado fornecido pelo Confluent no Azure, verifique se as seguintes credenciais de autenticação para seu ambiente do Confluent Cloud estão definidas no gatilho ou na associação:

Setting Valor recomendado Descrição
BrokerList BootstrapServer A configuração de aplicativo nomeada BootstrapServer contém o valor do servidor bootstrap encontrado na página de configurações do Confluent Cloud. O valor é semelhante a xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Nome de usuário ConfluentCloudUsername A configuração de aplicativo nomeada ConfluentCloudUsername contém a chave de acesso da API no site do Confluent Cloud.
Senha ConfluentCloudPassword A configuração de aplicativo nomeada ConfluentCloudPassword contém o segredo da API obtido no site do Confluent Cloud.

Os valores de cadeia de caracteres usados para essas configurações devem estar presentes como configurações de aplicativo no Azure ou na coleção Values no arquivo local.settings.json durante o desenvolvimento local.

Você também deve determinar o Protocol, AuthenticationMode e SslCaLocation em suas definições de associação.

Próximas etapas