Vinculação de saída do Apache Kafka para o Azure Functions

A associação de saída permite que um aplicativo do Azure Functions escreva mensagens em um tópico do Kafka.

Importante

As ligações Kafka só estão disponíveis para Funções no plano Elastic Premium e no plano Dedicado (Serviço de Aplicativo). Eles só são suportados na versão 3.x e versão posterior do tempo de execução do Functions.

Exemplo

O uso da associação depende da modalidade C# usada em seu aplicativo de função, que pode ser uma das seguintes:

Uma biblioteca de classes de processo de trabalho isolada compilada função C# é executada em um processo isolado do tempo de execução.

Os atributos que você usa dependem do provedor de eventos específico.

O exemplo a seguir tem um tipo de retorno personalizado que é MultipleOutputType, que consiste em uma resposta HTTP e uma saída Kafka.

[Function("KafkaOutput")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();

    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = message,
        HttpResponse = response
    };
}

Na classe MultipleOutputType, Kevent é a variável de ligação de saída para a ligação de Kafka.

public class MultipleOutputType
{
    [KafkaOutput("BrokerList",
                "topic",
                Username = "ConfluentCloudUserName",
                Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string Kevent { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Para enviar um lote de eventos, passe uma matriz de cadeia de caracteres para o tipo de saída, conforme mostrado no exemplo a seguir:

[Function("KafkaOutputMany")]

public static MultipleOutputTypeForBatch Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");
    var response = req.CreateResponse(HttpStatusCode.OK);

    string[] messages = new string[2];
    messages[0] = "one";
    messages[1] = "two";

    return new MultipleOutputTypeForBatch()
    {
        Kevents = messages,
        HttpResponse = response
    };
}

A matriz de cadeia de caracteres é definida como Kevents propriedade na classe, na qual a ligação de saída é definida:

public class MultipleOutputTypeForBatch
{
    [KafkaOutput("BrokerList",
                 "topic",
                 Username = "ConfluentCloudUserName",
                 Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string[] Kevents { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

A função a seguir adiciona cabeçalhos aos dados de saída de Kafka:

[Function("KafkaOutputWithHeaders")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();
    string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = kevent,
        HttpResponse = response
    };
}

Para obter um conjunto completo de exemplos .NET de trabalho, consulte o repositório de extensão Kafka.

Nota

Para obter um conjunto equivalente de exemplos de TypeScript, consulte o repositório de extensão Kafka

As propriedades específicas do arquivo function.json dependem do seu provedor de eventos, que nesses exemplos são Confluent ou Hubs de Eventos do Azure. Os exemplos a seguir mostram uma ligação de saída Kafka para uma função que é acionada por uma solicitação HTTP e envia dados da solicitação para o tópico Kafka.

A function.json a seguir define o gatilho para o provedor específico nesses exemplos:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputKafkaMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "ConfluentCloudUsername",
      "password": "ConfluentCloudPassword",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    }
  ]
}

O código a seguir envia uma mensagem para o tópico:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message);
    context.bindings.outputKafkaMessage = message;
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: 'Ok'
    };
}

O código a seguir envia várias mensagens como uma matriz para o mesmo tópico:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');
    
    context.bindings.outputKafkaMessages = ["one", "two"];
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

O exemplo a seguir mostra como enviar uma mensagem de evento com cabeçalhos para o mesmo tópico Kafka:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message || (req.body && req.body.message));
    const responseMessage = message
        ? "Message received: " + message + ". The message transfered to the kafka broker."
        : "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
    context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

Para obter um conjunto completo de exemplos JavaScript de trabalho, consulte o repositório de extensões Kafka.

As propriedades específicas do arquivo function.json dependem do seu provedor de eventos, que nesses exemplos são Confluent ou Hubs de Eventos do Azure. Os exemplos a seguir mostram uma ligação de saída Kafka para uma função que é acionada por uma solicitação HTTP e envia dados da solicitação para o tópico Kafka.

A function.json a seguir define o gatilho para o provedor específico nesses exemplos:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "Request",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username" : "%ConfluentCloudUserName%",
      "password" : "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "Response"
    }
  ]
}

O código a seguir envia uma mensagem para o tópico:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message

$message

Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

O código a seguir envia várias mensagens como uma matriz para o mesmo tópico:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

O exemplo a seguir mostra como enviar uma mensagem de evento com cabeçalhos para o mesmo tópico Kafka:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
    $message = $Request.Body.Message
}

$kevent = @{
    Offset = 364
    Partition = 0
    Topic = "kafkaeventhubtest1"
    Timestamp = "2022-04-09T03:20:06.591Z"
    Value = $message
    Headers= @(@{
        Key= "test"
        Value= "powershell"
    }
    )
}

Push-OutputBinding -Name Message -Value $kevent

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body = 'ok'
})

Para obter um conjunto completo de exemplos de PowerShell em funcionamento, consulte o repositório de extensão Kafka.

As propriedades específicas do arquivo function.json dependem do seu provedor de eventos, que nesses exemplos são Confluent ou Hubs de Eventos do Azure. Os exemplos a seguir mostram uma ligação de saída Kafka para uma função que é acionada por uma solicitação HTTP e envia dados da solicitação para o tópico Kafka.

A function.json a seguir define o gatilho para o provedor específico nesses exemplos:

{
  "scriptFile": "main.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "direction": "out",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "%ConfluentCloudUserName%",
      "password": "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

O código a seguir envia uma mensagem para o tópico:

import logging

import azure.functions as func


def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'

O código a seguir envia várias mensagens como uma matriz para o mesmo tópico:

import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json

def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
    outputMessage.set(['one', 'two'])
    return 'OK'

O exemplo a seguir mostra como enviar uma mensagem de evento com cabeçalhos para o mesmo tópico Kafka:

import logging

import azure.functions as func
import json

def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

Para obter um conjunto completo de exemplos Python de trabalho, consulte o repositório de extensão Kafka.

As anotações que você usa para configurar a associação de saída dependem do provedor de eventos específico.

A função a seguir envia uma mensagem para o tópico Kafka.

@FunctionName("KafkaOutput")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");

    // Parse query parameter
    String query = request.getQueryParameters().get("message");
    String message = request.getBody().orElse(query);
    context.getLogger().info("Message:" + message);
    output.setValue(message);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

O exemplo a seguir mostra como enviar várias mensagens para um tópico Kafka.

@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String[]> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");
    String[] messages = new String[2];
    messages[0] = "one";
    messages[1] = "two";
    output.setValue(messages);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}

Neste exemplo, o parâmetro de vinculação de saída é alterado para matriz de cadeia de caracteres.

O último exemplo usa para estas KafkaEntity e KafkaHeader classes:

public class KafkaEntity {
    public int Offset;
    public int Partition;
    public String Timestamp;
    public String Topic;
    public String Value;
    public KafkaHeaders Headers[];

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
    }
public class KafkaHeaders{
    public String Key;
    public String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;
    }

A função de exemplo a seguir envia uma mensagem com cabeçalhos para um tópico Kafka.

@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<KafkaEntity> output,
        final ExecutionContext context) {
            context.getLogger().info("Java HTTP trigger processed a request.");
    
            // Parse query parameter
            String query = request.getQueryParameters().get("message");
            String message = request.getBody().orElse(query);
            KafkaHeaders[] headers = new KafkaHeaders[1];
            headers[0] = new KafkaHeaders("test", "java");
            KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
            output.setValue(kevent);
            return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
        }

Para obter um conjunto completo de exemplos Java de trabalho para Confluent, consulte o repositório de extensão Kafka.

Atributos

As bibliotecas C# do processo de trabalho em processo e isoladas usam o Kafka atributo para definir o gatilho de função.

A tabela a seguir explica as propriedades que você pode definir usando esse atributo:

Parâmetro Description
Lista de Corretores (Obrigatório) A lista de corretores Kafka para os quais a saída é enviada. Consulte Conexões para obter mais informações.
Tópico (Obrigatório) O tópico para o qual a saída é enviada.
AvroSchema (Opcional) Esquema de um registro genérico ao usar o protocolo Avro.
MaxMessageBytes (Opcional) O tamanho máximo da mensagem de saída que está sendo enviada (em MB), com um valor padrão de 1.
Tamanho do lote (Opcional) Número máximo de mensagens em lote em um único conjunto de mensagens, com um valor padrão de 10000.
CapacitaçãoIdempotência (Opcional) Quando definido como true, garante que as mensagens sejam produzidas com êxito exatamente uma vez e na ordem de produção original, com um valor padrão de false
MessageTimeoutMs (Opcional) O tempo limite da mensagem local, em milissegundos. Esse valor só é imposto localmente e limita o tempo que uma mensagem produzida aguarda a entrega bem-sucedida, com um padrão 300000. Um tempo de 0 é infinito. Esse valor é o tempo máximo usado para entregar uma mensagem (incluindo tentativas). O erro de entrega ocorre quando a contagem de novas tentativas ou o tempo limite da mensagem são excedidos.
RequestTimeoutMs (Opcional) O tempo limite de confirmação da solicitação de saída, em milissegundos, com um padrão de 5000.
MaxRetries (Opcional) O número de vezes que tentar enviar novamente uma mensagem com falha, com um padrão de 2. Tentar novamente pode causar reordenação, a menos que EnableIdempotence esteja definido como true.
Modo de autenticação (Opcional) O modo de autenticação ao usar a autenticação SASL (Simple Authentication and Security Layer). Os valores suportados são Gssapi, Plain (padrão), ScramSha256, ScramSha512.
Nome de utilizador (Opcional) O nome de usuário para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações.
Palavra-passe (Opcional) A senha para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações.
Protocolo (Opcional) O protocolo de segurança usado na comunicação com corretores. Os valores suportados são plaintext (padrão), ssl, , sasl_plaintextsasl_ssl.
Localização de SslCa. (Opcional) Caminho para o arquivo de certificado da autoridade de certificação para verificar o certificado do corretor.
SslCertificateLocation (Opcional) Caminho para o certificado do cliente.
SslKeyLocalização (Opcional) Caminho para a chave privada do cliente (PEM) usada para autenticação.
SslKeyPassword (Opcional) Senha para o certificado do cliente.

Anotações

A KafkaOutput anotação permite que você crie uma função que grava em um tópico específico. As opções suportadas incluem os seguintes elementos:

Elemento Description
Designação O nome da variável que representa os dados intermediados no código da função.
Lista de corretores (Obrigatório) A lista de corretores Kafka para os quais a saída é enviada. Consulte Conexões para obter mais informações.
topic (Obrigatório) O tópico para o qual a saída é enviada.
Tipo de dados Define como Functions lida com o valor do parâmetro. Por padrão, o valor é obtido como uma string e Functions tenta desserializar a string para o objeto Java plain-old real (POJO). Quando string, a entrada é tratada como apenas uma cadeia de caracteres. Quando binary, a mensagem é recebida como dados binários e o Functions tenta desserializá-la para um byte do tipo parâmetro real[].
avroSchema (Opcional) Esquema de um registro genérico ao usar o protocolo Avro. (Atualmente não suportado para Java.)
maxMessageBytes (Opcional) O tamanho máximo da mensagem de saída que está sendo enviada (em MB), com um valor padrão de 1.
tamanho do lote (Opcional) Número máximo de mensagens em lote em um único conjunto de mensagens, com um valor padrão de 10000.
enableIdempotence (Opcional) Quando definido como true, garante que as mensagens sejam produzidas com êxito exatamente uma vez e na ordem de produção original, com um valor padrão de false
messageTimeoutMs (Opcional) O tempo limite da mensagem local, em milissegundos. Esse valor só é imposto localmente e limita o tempo que uma mensagem produzida aguarda a entrega bem-sucedida, com um padrão 300000. Um tempo de 0 é infinito. Este é o tempo máximo usado para entregar uma mensagem (incluindo tentativas). O erro de entrega ocorre quando a contagem de novas tentativas ou o tempo limite da mensagem são excedidos.
requestTimeoutMs (Opcional) O tempo limite de confirmação da solicitação de saída, em milissegundos, com um padrão de 5000.
maxTentativas (Opcional) O número de vezes que tentar enviar novamente uma mensagem com falha, com um padrão de 2. Tentar novamente pode causar reordenação, a menos que EnableIdempotence esteja definido como true.
authenticationMode (Opcional) O modo de autenticação ao usar a autenticação SASL (Simple Authentication and Security Layer). Os valores suportados são Gssapi, Plain (padrão), ScramSha256, ScramSha512.
nome de utilizador (Opcional) O nome de usuário para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações.
palavra-passe (Opcional) A senha para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações.
protocolo (Opcional) O protocolo de segurança usado na comunicação com corretores. Os valores suportados são plaintext (padrão), ssl, , sasl_plaintextsasl_ssl.
sslCaLocalização (Opcional) Caminho para o arquivo de certificado da autoridade de certificação para verificar o certificado do corretor.
sslCertificateLocation (Opcional) Caminho para o certificado do cliente.
sslKeyLocalização (Opcional) Caminho para a chave privada do cliente (PEM) usada para autenticação.
sslKeyPassword (Opcional) Senha para o certificado do cliente.

Configuração

A tabela a seguir explica as propriedades de configuração de associação definidas no arquivo function.json .

function.json propriedade Description
type Deve ser definido como kafka.
direção Deve ser definido como out.
Designação O nome da variável que representa os dados intermediados no código da função.
Lista de corretores (Obrigatório) A lista de corretores Kafka para os quais a saída é enviada. Consulte Conexões para obter mais informações.
topic (Obrigatório) O tópico para o qual a saída é enviada.
avroSchema (Opcional) Esquema de um registro genérico ao usar o protocolo Avro.
maxMessageBytes (Opcional) O tamanho máximo da mensagem de saída que está sendo enviada (em MB), com um valor padrão de 1.
tamanho do lote (Opcional) Número máximo de mensagens em lote em um único conjunto de mensagens, com um valor padrão de 10000.
enableIdempotence (Opcional) Quando definido como true, garante que as mensagens sejam produzidas com êxito exatamente uma vez e na ordem de produção original, com um valor padrão de false
messageTimeoutMs (Opcional) O tempo limite da mensagem local, em milissegundos. Esse valor só é imposto localmente e limita o tempo que uma mensagem produzida aguarda a entrega bem-sucedida, com um padrão 300000. Um tempo de 0 é infinito. Este é o tempo máximo usado para entregar uma mensagem (incluindo tentativas). O erro de entrega ocorre quando a contagem de novas tentativas ou o tempo limite da mensagem são excedidos.
requestTimeoutMs (Opcional) O tempo limite de confirmação da solicitação de saída, em milissegundos, com um padrão de 5000.
maxTentativas (Opcional) O número de vezes que tentar enviar novamente uma mensagem com falha, com um padrão de 2. Tentar novamente pode causar reordenação, a menos que EnableIdempotence esteja definido como true.
authenticationMode (Opcional) O modo de autenticação ao usar a autenticação SASL (Simple Authentication and Security Layer). Os valores suportados são Gssapi, Plain (padrão), ScramSha256, ScramSha512.
nome de utilizador (Opcional) O nome de usuário para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações.
palavra-passe (Opcional) A senha para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações.
protocolo (Opcional) O protocolo de segurança usado na comunicação com corretores. Os valores suportados são plaintext (padrão), ssl, , sasl_plaintextsasl_ssl.
sslCaLocalização (Opcional) Caminho para o arquivo de certificado da autoridade de certificação para verificar o certificado do corretor.
sslCertificateLocation (Opcional) Caminho para o certificado do cliente.
sslKeyLocalização (Opcional) Caminho para a chave privada do cliente (PEM) usada para autenticação.
sslKeyPassword (Opcional) Senha para o certificado do cliente.

Utilização

Ambos os tipos de chaves e valores são suportados com a serialização Avro e Protobuf integrada.

O deslocamento, a partição e o carimbo de data/hora do evento são gerados em tempo de execução. Apenas o valor e os cabeçalhos podem ser definidos dentro da função. O tópico é definido no function.json.

Por favor, certifique-se de ter acesso ao tópico Kafka para o qual você está tentando escrever. Você configura a associação com credenciais de acesso e conexão para o tópico Kafka.

Em um plano Premium, você deve habilitar o monitoramento de escala de tempo de execução para que a saída Kafka possa ser dimensionada para várias instâncias. Para saber mais, consulte Habilitar o dimensionamento em tempo de execução.

Para obter um conjunto completo de configurações de host.json suportadas para o gatilho Kafka, consulte host.json configurações.

Ligações

Todas as informações de conexão exigidas por seus gatilhos e ligações devem ser mantidas nas configurações do aplicativo e não nas definições de vinculação em seu código. Isso é verdade para credenciais, que nunca devem ser armazenadas em seu código.

Importante

As configurações de credenciais devem fazer referência a uma configuração de aplicativo. Não codifice credenciais em seu código ou arquivos de configuração. Ao executar localmente, use o arquivo local.settings.json para suas credenciais e não publique o arquivo local.settings.json.

Ao conectar-se a um cluster Kafka gerenciado fornecido pelo Confluent no Azure, certifique-se de que as seguintes credenciais de autenticação para seu ambiente Confluent Cloud estejam definidas em seu gatilho ou associação:

Definição Valor recomendado Description
Lista de Corretores BootstrapServer A configuração do aplicativo nomeada BootstrapServer contém o valor do servidor de inicialização encontrado na página Configurações do Confluent Cloud. O valor é semelhante a xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Nome de utilizador ConfluentCloudUsername A configuração do aplicativo nomeada ConfluentCloudUsername contém a chave de acesso da API do site Confluent Cloud.
Palavra-passe ConfluentCloudPassword A configuração do aplicativo nomeada ConfluentCloudPassword contém o segredo da API obtido do site Confluent Cloud.

Os valores de cadeia de caracteres que você usa para essas configurações devem estar presentes como configurações de Values aplicativo no Azure ou na coleção no arquivo local.settings.json durante o desenvolvimento local.

Você também deve definir o Protocol, AuthenticationModee SslCaLocation em suas definições de vinculação.

Próximos passos