Триггер Apache Kafka для Функций Azure

Триггер Apache Kafka можно использовать в Функциях Azure для запуска кода функции в ответ на сообщения в разделах Kafka. Вы также можете использовать выходную привязку Kafka для записи из функции в раздел. Сведения о настройке и конфигурации см. в статье Обзор привязок Apache Kafka для Функций Azure.

Внимание

Привязки Kafka доступны только для Функций в составе эластичного плана "Премиум" и плана "Выделенный (Служба приложений)". Они поддерживаются только в среде выполнения Функций версии 3.x и выше.

Пример

Использование триггера зависит от модальности C#, используемой в приложении-функции. Это может быть один из следующих вариантов:

Изолированная библиотека классов рабочих процессов, скомпилированная функция C# выполняется в процессе, изолированном от среды выполнения.

Используемые атрибуты зависят от конкретного поставщика событий.

В следующем примере показана функция C#, которая считывает и регистрирует сообщение Kafka как событие Kafka.

[Function("KafkaTrigger")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}

Чтобы получить события в пакете, используйте в качестве входных данных массив строк, как показано в следующем примере:

[Function("KafkaTriggerMany")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default",
                  IsBatched = true)] string[] events, FunctionContext context)
{
    foreach (var kevent in events)
    {
        var logger = context.GetLogger("KafkaFunction");
        logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
    }

Следующая функция регистрирует сообщение и заголовки для события Kafka.

[Function("KafkaTriggerWithHeaders")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var eventJsonObject = JObject.Parse(eventData);
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
    var headersJArr = eventJsonObject["Headers"] as JArray;
    logger.LogInformation("Headers for this event: ");
    foreach (JObject header in headersJArr)
    {
        logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");

    }
}

Полный набор рабочих примеров .NET см. в репозитории расширений Kafka.

Примечание.

Эквивалентный набор примеров TypeScript см. в репозитории расширений Kafka

Конкретные свойства файла function.json зависят от поставщика событий. В данных примерах это Confluent или Центры событий Azure. В следующих примерах показан триггер Kafka для функции, которая считывает и регистрирует сообщение Kafka.

Следующий файл function.json определяет триггер для конкретного поставщика.

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "topic": "topic",
            "brokerList": "%BrokerList%",
            "username": "%ConfluentCloudUserName%",
            "password": "%ConfluentCloudPassword%",
            "protocol": "saslSsl",
            "authenticationMode": "plain",
            "consumerGroup" : "$Default",
            "dataType": "string"
        }
    ]
}

Затем при активации функции выполняется следующий код:

module.exports = async function (context, event) {
    // context.log.info(event)
    context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};

Чтобы получить события в пакете, задайте для параметра cardinality в файле function.json значение many, как показано в приведенных ниже примерах.

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%"
        }
    ]
}

Затем следующий код анализирует массив событий и регистрирует данные события:

module.exports = async function (context, events) {
    function print(event) {
        var eventJson = JSON.parse(event)
        context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
    }
    events.map(print);
};

Следующий код также регистрирует данные заголовка:

module.exports = async function (context, event) {
  function print(kevent) {
    var keventJson = JSON.parse(kevent)
    context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
    context.log.info(`Headers for this message:`)
    let headers =  keventJson.Headers;
    headers.forEach(element => {
        context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`) 
    });
  }
  event.map(print);
};

Вы можете определить универсальную схему Avro для события, переданного триггеру. Следующий файл function.json определяет триггер для конкретного поставщика, используя универсальную схему Avro.

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaAvroGenericSingle",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "authenticationMode" : "PLAIN",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

Затем при активации функции выполняется следующий код:

module.exports = async function (context, event) {
    context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};

Полный набор рабочих примеров JavaScript см. в репозитории расширений Kafka.

Конкретные свойства файла function.json зависят от поставщика событий. В данных примерах это Confluent или Центры событий Azure. В следующих примерах показан триггер Kafka для функции, которая считывает и регистрирует сообщение Kafka.

Следующий файл function.json определяет триггер для конкретного поставщика.

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

Затем при активации функции выполняется следующий код:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

Чтобы получить события в пакете, задайте для параметра cardinality в файле function.json значение many, как показано в приведенных ниже примерах.

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

Затем следующий код анализирует массив событий и регистрирует данные события:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
    $event = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $event.Value"
}

Следующий код также регистрирует данные заголовка:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

foreach ($kafkaEvent in $kafkaEvents) {
    $kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
    Write-Output "Headers for this message:"
    foreach ($header in $kevent.Headers) {
        $DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
        $Key = $header.Key
        Write-Output "Key: $Key Value: $DecodedValue"
    }
}

Вы можете определить универсальную схему Avro для события, переданного триггеру. Следующий файл function.json определяет триггер для конкретного поставщика, используя универсальную схему Avro.

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaEvent",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

Затем при активации функции выполняется следующий код:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

Полный набор рабочих примеров PowerShell см. в репозитории расширений Kafka.

Конкретные свойства файла function.json зависят от поставщика событий. В данных примерах это Confluent или Центры событий Azure. В следующих примерах показан триггер Kafka для функции, которая считывает и регистрирует сообщение Kafka.

Следующий файл function.json определяет триггер для конкретного поставщика.

{
      "scriptFile": "main.py",
      "bindings": [
        {
          "type": "kafkaTrigger",
          "name": "kevent",
          "topic": "topic",
          "brokerList": "%BrokerList%",
          "username": "%ConfluentCloudUserName%",
          "password": "%ConfluentCloudPassword%",
          "consumerGroup" : "functions",
          "protocol": "saslSsl",
          "authenticationMode": "plain"
        }
    ]
}

Затем при активации функции выполняется следующий код:

import logging
from azure.functions import KafkaEvent

def main(kevent : KafkaEvent):
    logging.info(kevent.get_body().decode('utf-8'))
    logging.info(kevent.metadata)

Чтобы получить события в пакете, задайте для параметра cardinality в файле function.json значение many, как показано в приведенных ниже примерах.

{
      "scriptFile": "main.py",
      "bindings": [
        {
            "type" : "kafkaTrigger",
            "direction": "in",
            "name" : "kevents",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "topic" : "message_python",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "dataType": "string",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "BrokerList" : "%BrokerList%"    
        }
    ]
}

Затем следующий код анализирует массив событий и регистрирует данные события:

import logging
import typing
from azure.functions import KafkaEvent

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())

Следующий код также регистрирует данные заголовка:

import logging
import typing
from azure.functions import KafkaEvent
import json
import base64

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        event_dec = event.get_body().decode('utf-8')
        event_json = json.loads(event_dec)
        logging.info("Python Kafka trigger function called for message " + event_json["Value"])
        headers = event_json["Headers"]
        for header in headers:
            logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))

Вы можете определить универсальную схему Avro для события, переданного триггеру. Следующий файл function.json определяет триггер для конкретного поставщика, используя универсальную схему Avro.

{
  "scriptFile": "main.py",
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaTriggerAvroGeneric",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

Затем при активации функции выполняется следующий код:

import logging
from azure.functions import KafkaEvent

def main(kafkaTriggerAvroGeneric : KafkaEvent):
    logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
    logging.info(kafkaTriggerAvroGeneric.metadata)

Полный набор рабочих примеров Python см. в репозитории расширений Kafka.

Заметки, используемые для настройки триггера, зависят от конкретного поставщика событий.

В следующем примере показана функция Java, которая считывает и регистрирует содержимое события Kafka.

@FunctionName("KafkaTrigger")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string"
         ) String kafkaEventData,
        final ExecutionContext context) {
        context.getLogger().info(kafkaEventData);
}

Чтобы получить события в пакете, используйте входную строку в качестве массива, как показано в следующем примере:

@FunctionName("KafkaTriggerMany")
public void runMany(
        @KafkaTrigger(
            name = "kafkaTriggerMany",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            cardinality = Cardinality.MANY,
            dataType = "string"
         ) String[] kafkaEvents,
        final ExecutionContext context) {
        for (String kevent: kafkaEvents) {
            context.getLogger().info(kevent);
        }    
}

Следующая функция регистрирует сообщение и заголовки для события Kafka.

@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string",
            cardinality = Cardinality.MANY
         ) List<String> kafkaEvents,
        final ExecutionContext context) {
            Gson gson = new Gson(); 
            for (String keventstr: kafkaEvents) {
                KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
                context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
                context.getLogger().info("Headers for the message:");
                for (KafkaHeaders header : kevent.Headers) {
                    String decodedValue = new String(Base64.getDecoder().decode(header.Value));
                    context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);                    
                }                
            }
        }

Вы можете определить универсальную схему Avro для события, переданного триггеру. Следующая функция определяет триггер для конкретного поставщика, используя универсальную схему Avro.

private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";

@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
        @KafkaTrigger(
                name = "kafkaAvroGenericSingle",
                topic = "topic",
                brokerList="%BrokerList%",
                consumerGroup="$Default",
                username = "ConfluentCloudUsername",
                password = "ConfluentCloudPassword",
                avroSchema = schema,
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL) Payment payment,
        final ExecutionContext context) {
    context.getLogger().info(payment.toString());
}

Полный набор рабочих примеров Java для Confluent см. в репозитории расширений Kafka.

Атрибуты

Библиотеки C# в процессе и изолированном рабочем процессе используются KafkaTriggerAttribute для определения триггера функции.

В следующей таблице описаны свойства, которые можно задать с помощью этого атрибута триггера:

Параметр Описание
BrokerList (Обязательно) Список брокеров Kafka, отслеживаемых триггером. Дополнительные сведения см. в разделе Подключения.
Раздел (Обязательно) Раздел, отслеживаемый триггером.
ConsumerGroup (Необязательно) Группа потребителей Kafka, используемая триггером.
AvroSchema (Необязательно) Схема универсальной записи при использовании протокола Avro.
AuthenticationMode (Необязательно) Режим, используемый при простой проверке подлинности и проверке подлинности уровня безопасности (SASL). Поддерживаемые значения: Gssapi, Plain (по умолчанию), ScramSha256, ScramSha512.
Username (Необязательно) Имя пользователя для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi. Дополнительные сведения см. в разделе Подключения.
Пароль (Необязательно) Пароль для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi. Дополнительные сведения см. в разделе Подключения.
Протокол (Необязательно) Протокол безопасности, используемый при взаимодействии с брокерами. Поддерживаемые значения: plaintext (по умолчанию), ssl, sasl_plaintext, sasl_ssl.
SslCaLocation (Необязательно) Путь к файлу сертификата ЦС для проверки сертификата брокера.
SslCertificateLocation (Необязательно) Путь к сертификату клиента.
SslKeyLocation (Необязательно) Путь к закрытому ключу клиента (PEM), используемому для проверки подлинности.
SslKeyPassword (Необязательно) Пароль для сертификата клиента.

Заметки

Заметка KafkaTrigger позволяет создать функцию, которая выполняется при получении раздела. Поддерживаемые варианты включают следующие элементы:

Элемент Description
name (Обязательно) Имя переменной, представляющей сообщение очереди или раздела в коде функции.
brokerList (Обязательно) Список брокеров Kafka, отслеживаемых триггером. Дополнительные сведения см. в разделе Подключения.
topic (Обязательно) Раздел, отслеживаемый триггером.
кратность (Необязательно) Указывает кратность входных данных триггера. Поддерживаемые значения: ONE (по умолчанию) и MANY. Используйте ONE, если входные данные являются одним сообщением, и MANY, если входные данные являются массивом сообщений. При использовании MANY необходимо также задать параметр dataType.
dataType Определяет, как Функции обрабатывают значение параметра. По умолчанию полученное значение представляет собой строку, и Функции пытаются десериализовать ее до объекта POJO. При string входные данные обрабатываются просто как строка. При binary сообщение приходит в формате двоичных данных и Функции пытаются десериализовать его до фактического типа параметра byte[].
consumerGroup (Необязательно) Группа потребителей Kafka, используемая триггером.
avroSchema (Необязательно) Схема универсальной записи при использовании протокола Avro.
authenticationMode (Необязательно) Режим, используемый при простой проверке подлинности и проверке подлинности уровня безопасности (SASL). Поддерживаемые значения: Gssapi, Plain (по умолчанию), ScramSha256, ScramSha512.
username (Необязательно) Имя пользователя для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi. Дополнительные сведения см. в разделе Подключения.
пароль (Необязательно) Пароль для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi. Дополнительные сведения см. в разделе Подключения.
protocol (Необязательно) Протокол безопасности, используемый при взаимодействии с брокерами. Поддерживаемые значения: plaintext (по умолчанию), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Необязательно) Путь к файлу сертификата ЦС для проверки сертификата брокера.
sslCertificateLocation (Необязательно) Путь к сертификату клиента.
sslKeyLocation (Необязательно) Путь к закрытому ключу клиента (PEM), используемому для проверки подлинности.
sslKeyPassword (Необязательно) Пароль для сертификата клиента.

Настройка

В следующей таблице описываются свойства конфигурации привязки, которые задаются в файле function.json.

Свойство в function.json Описание
type Обязательное. Необходимо задать значение kafkaTrigger.
direction Обязательное. Необходимо задать значение in.
name (Обязательно) Имя переменной, представляющей данные, полученные через брокер, в коде функции.
brokerList (Обязательно) Список брокеров Kafka, отслеживаемых триггером. Дополнительные сведения см. в разделе Подключения.
topic (Обязательно) Раздел, отслеживаемый триггером.
кратность (Необязательно) Указывает кратность входных данных триггера. Поддерживаемые значения: ONE (по умолчанию) и MANY. Используйте ONE, если входные данные являются одним сообщением, и MANY, если входные данные являются массивом сообщений. При использовании MANY необходимо также задать параметр dataType.
dataType Определяет, как Функции обрабатывают значение параметра. По умолчанию полученное значение представляет собой строку, и Функции пытаются десериализовать ее до объекта POJO. При string входные данные обрабатываются просто как строка. При binary сообщение приходит в формате двоичных данных и Функции пытаются десериализовать его до фактического типа параметра byte[].
consumerGroup (Необязательно) Группа потребителей Kafka, используемая триггером.
avroSchema (Необязательно) Схема универсальной записи при использовании протокола Avro.
authenticationMode (Необязательно) Режим, используемый при простой проверке подлинности и проверке подлинности уровня безопасности (SASL). Поддерживаемые значения: Gssapi, Plain (по умолчанию), ScramSha256, ScramSha512.
username (Необязательно) Имя пользователя для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi. Дополнительные сведения см. в разделе Подключения.
пароль (Необязательно) Пароль для проверки подлинности SASL. Не поддерживается, если AuthenticationMode имеет значение Gssapi. Дополнительные сведения см. в разделе Подключения.
protocol (Необязательно) Протокол безопасности, используемый при взаимодействии с брокерами. Поддерживаемые значения: plaintext (по умолчанию), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Необязательно) Путь к файлу сертификата ЦС для проверки сертификата брокера.
sslCertificateLocation (Необязательно) Путь к сертификату клиента.
sslKeyLocation (Необязательно) Путь к закрытому ключу клиента (PEM), используемому для проверки подлинности.
sslKeyPassword (Необязательно) Пароль для сертификата клиента.

Использование

События Kafka в настоящее время поддерживаются как строки и массивы строк, которые являются полезными данными JSON.

Сообщения Kafka передаются функции в виде строк и массивов строк, которые являются полезными данными JSON.

В плане "Премиум" необходимо включить мониторинг масштабирования среды выполнения для выходных данных Kafka, чтобы иметь возможность горизонтально увеличить масштаб до нескольких экземпляров. Дополнительные сведения см. в статье Включение масштабирования среды выполнения.

Для работы с триггерами Kafka нельзя использовать функцию "Тест и запуск" страницы "Код и тест" на портале Azure. Вместо этого необходимо отправлять тестовые события непосредственно в раздел, отслеживаемый триггером.

Полный набор поддерживаемых параметров host.json для триггера Kafka см. в статье Параметры host.json.

Связи

Все сведения о подключении, необходимые триггерам и привязкам, должны храниться в параметрах приложения, а не в определениях привязок в коде. Это верно для учетных данных, которые никогда не должны храниться в коде.

Внимание

Параметры учетных данных должны ссылаться на параметр приложения. Не прописывайте учетные данные в файлах кода или конфигурации. При локальном запуске используйте файл local.settings.json для учетных данных и не публикуйте его.

При подключении к управляемому кластеру Kafka, предоставляемому Confluent в Azure, убедитесь, что в триггере или привязке заданы следующие учетные данные проверки подлинности для среды Confluent Cloud:

Параметр Рекомендуемое значение Description
BrokerList BootstrapServer Параметр приложения с именем BootstrapServer содержит значение сервера начальной загрузки, найденное на странице параметров Confluent Cloud. Значение напоминает xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Username ConfluentCloudUsername Параметр приложения с именем ConfluentCloudUsername содержит ключ доступа API, полученный от веб-сайта Confluent Cloud.
Пароль ConfluentCloudPassword Параметр приложения с именем ConfluentCloudPassword содержит секрет API, полученный от веб-сайта Confluent Cloud.

Строковые значения, используемые для этих параметров, должны присутствовать в качестве параметров приложения в Azure или коллекции Values в файле local.settings.json во время локальной разработки.

Необходимо также задать Protocol, AuthenticationMode и SslCaLocation в определениях привязок.

Следующие шаги