Получение данных из Apache Kafka в Azure Data Explorer
Apache Kafka — это распределенная платформа потоковой передачи для создания конвейеров потоковой передачи данных в режиме реального времени, которые надежно перемещают данные между системами или приложениями. Kafka Connect — это инструмент для масштабируемой и надежной потоковой передачи данных между Apache Kafka и другими системами данных. Kusto Kafka Sink служит соединителем из Kafka и не требует использования кода. Скачайте JAR-файл соединителя приемника из репозитория Git или концентратора соединителя Confluent.
В этой статье показано, как принимать данные с помощью Kafka с помощью автономной установки Docker, чтобы упростить настройку кластера Kafka и кластера соединителя Kafka.
Дополнительные сведения см. в разделе Репозиторий Git и Сведения о версиях для соединителя.
Предварительные требования
- Подписка Azure. Создайте бесплатную учетную запись Azure.
- Azure Data Explorer кластер и базу данных с кэшем и политиками хранения по умолчанию илибазу данных KQL в Microsoft Fabric.
- Azure CLI.
- Docker и Docker Compose.
Создание субъекта-службы Microsoft Entra
Субъект-службу Microsoft Entra можно создать с помощью портал Azure или программно, как показано в следующем примере.
Этот субъект-служба будет идентификатором, используемым соединителем для записи данных таблицы в Kusto. Позже вы предоставите этому субъекту-службе разрешения на доступ к ресурсам Kusto.
Войдите в подписку Azure с помощью Azure CLI. Затем авторизуйтесь в браузере.
az login
Выберите подписку для размещения субъекта. Этот шаг необходим, если у вас несколько подписок.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Создайте субъект-службу. В этом примере принципал службы называется
my-service-principal
.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Из возвращенных данных JSON скопируйте
appId
,password
иtenant
для использования в будущем.{ "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "displayName": "my-service-principal", "name": "my-service-principal", "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn" }
Вы создали приложение Microsoft Entra и субъект-службу.
Создание целевой таблицы
В среде запросов создайте таблицу с именем
Storms
с помощью следующей команды:.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
Создайте соответствующее сопоставление таблицы
Storms_CSV_Mapping
для загруженных данных с помощью следующей команды:.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
Создайте политику пакетной обработки приема в таблице для настраиваемой задержки приема в очереди.
Совет
Политика пакетной обработки приема — это оптимизатор производительности, включающий три параметра. Первое удовлетворенное условие запускает прием данных в таблицу Azure Data Explorer.
.alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
Используйте субъект-службу из раздела Создание Microsoft Entra субъекта-службы, чтобы предоставить разрешение на работу с базой данных.
.add database YOUR_DATABASE_NAME admins ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
Запуск лаборатории
Следующая лабораторная работа предназначена для того, чтобы дать вам опыт создания данных, настройки соединителя Kafka и потоковой передачи этих данных в Azure Data Explorer с помощью соединителя. Затем вы можете просмотреть полученные данные.
Клонировать репозиторий git
Клонировать репозиторий git лаборатории.
Создайте локальный каталог на вашем компьютере.
mkdir ~/kafka-kusto-hol cd ~/kafka-kusto-hol
Клонирование репозитория.
cd ~/kafka-kusto-hol git clone https://github.com/Azure/azure-kusto-labs cd azure-kusto-labs/kafka-integration/dockerized-quickstart
Содержание клонированного репозитория
Выполните следующую команду, чтобы вывести список содержимого клонированного репозитория:
cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree
Вот результат этого поиска:
├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│ └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
├── Dockerfile
├── StormEvents.csv
├── go.mod
├── go.sum
├── kafka
│ └── kafka.go
└── main.go
Просмотрите файлы в клонированном репозитория
В следующих разделах объясняются важные части файлов в дереве файлов выше.
adx-sink-config.json
Этот файл содержит файл свойств приемника Kusto, в котором вы обновляете определенные детали конфигурации:
{
"name": "storm",
"config": {
"connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
"flush.size.bytes": 10000,
"flush.interval.ms": 10000,
"tasks.max": 1,
"topics": "storm-events",
"kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
"aad.auth.authority": "<enter tenant ID>",
"aad.auth.appid": "<enter application ID>",
"aad.auth.appkey": "<enter client secret>",
"kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
"kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
Замените значения следующих атрибутов в соответствии с настройкой Azure Data Explorer: aad.auth.authority
, aad.auth.appid
, aad.auth.appkey
, kusto.tables.topics.mapping
(имя базы данных) kusto.ingestion.url
и kusto.query.url
.
Соединитель — Dockerfile
В этом файле есть команды для создания образа докера для экземпляра соединителя. Он включает загрузку соединителя из каталога выпуска репозитория git.
Каталог Storm-events-producer
В этом каталоге находится программа Go, которая считывает локальный файл StormEvents.csv и публикует данные в теме Kafka.
docker-compose.yaml
version: "2"
services:
zookeeper:
image: debezium/zookeeper:1.2
ports:
- 2181:2181
kafka:
image: debezium/kafka:1.2
ports:
- 9092:9092
links:
- zookeeper
depends_on:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
kusto-connect:
build:
context: ./connector
args:
KUSTO_KAFKA_SINK_VERSION: 1.0.1
ports:
- 8083:8083
links:
- kafka
depends_on:
- kafka
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=adx
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
events-producer:
build:
context: ./storm-events-producer
links:
- kafka
depends_on:
- kafka
environment:
- KAFKA_BOOTSTRAP_SERVER=kafka:9092
- KAFKA_TOPIC=storm-events
- SOURCE_FILE=StormEvents.csv
Запуск контейнеров
В терминале запустите контейнеры:
docker-compose up
Приложение-производитель начнет отправлять события в тему
storm-events
. Вы должны увидеть журналы, похожие на следующие журналы:.... events-producer_1 | sent message to partition 0 offset 0 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public events-producer_1 | events-producer_1 | sent message to partition 0 offset 1 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer ....
Чтобы проверить журналы, выполните следующую команду в отдельном терминале:
docker-compose logs -f | grep kusto-connect
Запуск соединителя
Используйте REST-вызов Kafka Connect для запуска соединителя.
В отдельном терминале запустите задачу приемника с помощью следующей команды:
curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
Чтобы проверить статус, запустите следующую команду в отдельном терминале:
curl http://localhost:8083/connectors/storm/status
Соединитель начнет ставить в очередь процессы приема в Azure Data Explorer.
Примечание
Если у вас возникли проблемы с подключением к соединителю создайте вопрос.
Запросить и просмотреть данные
Подтвердите получение данных
Подождите, пока данные поступят в таблицу
Storms
. Чтобы подтвердить передачу данных, проверьте количество строк:Storms | count
Убедитесь, что в процессе приема нет сбоев:
.show ingestion failures
Как только вы увидите данные, попробуйте выполнить несколько запросов.
Запрос данных
Чтобы увидеть все записи, выполните следующий запрос:
Storms
Используйте
where
иproject
для фильтрации определенных данных:Storms | where EventType == 'Drought' and State == 'TEXAS' | project StartTime, EndTime, Source, EventId
Используйте оператор
summarize
:Storms | summarize event_count=count() by State | where event_count > 10 | project State, event_count | render columnchart
Дополнительные примеры запросов и рекомендации см. в статье Написание запросов в KQL и язык запросов Kusto документации.
Reset
Для сброса выполните следующие действия.
- Остановка контейнеров (
docker-compose down -v
) - Удалить (
drop table Storms
) - Повторное создание таблицы
Storms
- Повторное создание сопоставления таблиц
- Перезапуск контейнеров (
docker-compose up
)
Очистка ресурсов
Чтобы удалить ресурсы Azure Data Explorer, используйте az cluster delete или az Kusto database delete:
az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>
Настройка соединителя Kafka Sink
Настройте соединитель Kafka Sink для работы с политикой пакетной обработки приема данных:
- Настройте предельный размер
flush.size.bytes
для Kafka Sink начиная с 1 МБ с шагом приращения 10 МБ или 100 МБ. - При использовании Kafka Sink данные агрегируются дважды. Данные на стороне соединителя агрегируются в соответствии с параметрами сброса, а на стороне службы Azure Data Explorer — в соответствии с политикой пакетной обработки. Если время пакетной обработки слишком мало и данные не могут быть приняты ни соединителем, ни службой, необходимо увеличить время пакетной обработки. Установите размер пакетной обработки в 1 ГБ. При необходимости его можно увеличить или уменьшить с шагом 100 МБ. Например, если размер очистки равен 1 МБ, а размер политики пакетной обработки — 100 МБ, после агрегирования пакета размером 100 МБ соединителем приемника Kafka пакет размером 100 МБ будет приниматься службой Azure Data Explorer. Если время политики пакетной обработки составляет 20 секунд, а соединитель Kafka Sink сбрасывает 50 МБ за 20-секундный период, служба примет пакет размером 50 МБ.
- Вы можете изменять масштаб, добавляя экземпляры и разделы Kafka. Увеличьте значение
tasks.max
до нужного числа разделов. Создайте раздел, если у вас достаточно данных для создания большого двоичного объекта, размер которого равен значению параметраflush.size.bytes
. Если большой двоичный объект меньше, пакет обрабатывается по достижении предельного времени, поэтому секция не получит достаточной пропускной способности. Большое количество разделов приводит к увеличению времени на обработку.
См. также
- Узнайте больше об Архитектуре больших данных.
- Узнайте, как загрузить образцы данных в формате JSON в Azure Data Explorer.
- Для дополнительных лабораторий Kafka