Ingerir dados do Apache Kafka no Azure Data Explorer
O Apache Kafka é uma plataforma de streaming distribuída para criar pipelines de dados de streaming em tempo real que movem dados de forma confiável entre sistemas ou aplicativos. O Kafka Connect é uma ferramenta para streaming de dados escalonável e confiável entre o Apache Kafka e outros sistemas. O Coletor Kafka do Kusto serve como o conector do Kafka e não requer o uso de código. Baixe o jar do conector do coletor no repositório Git ou no Hub do Conector do Confluent.
Este artigo mostra como ingerir dados com o Kafka, usando uma configuração autossuficiente do Docker para simplificar o cluster Kafka e a configuração do cluster do conector kafka.
Para obter mais informações, consulte o Repositório git do conector e as especificações de versão.
Pré-requisitos
- Uma assinatura do Azure. Criar uma conta gratuita do Azure.
- Um cluster e um banco de dados Data Explorer do Azure com as políticas de cache e retenção padrão ou um banco de dados KQL no Microsoft Fabric.
- CLI do Azure.
- Docker e Docker Compose.
Criar uma entidade de serviço Microsoft Entra
A entidade de serviço Microsoft Entra pode ser criada por meio do portal do Azure ou programaticamente, como no exemplo a seguir.
Essa entidade de serviço será a identidade usada pelo conector para gravar dados da tabela no Kusto. Posteriormente, você concederá permissões para que essa entidade de serviço acesse os recursos do Kusto.
Entre em sua assinatura do Azure por meio da CLI do Azure. Em seguida, autentique no navegador.
az login
Escolha a assinatura para hospedar a entidade de segurança. Essa etapa é necessária quando você tem várias assinaturas.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Crie a entidade de serviço. Neste exemplo, a entidade de serviço é chamada
my-service-principal
.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Dos dados JSON retornados, copie o
appId
,password
etenant
para uso futuro.{ "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "displayName": "my-service-principal", "name": "my-service-principal", "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn" }
Você criou o aplicativo do Microsoft Entra e a entidade de serviço.
Criar uma tabela de destino
No ambiente de consulta, crie uma tabela chamada
Storms
usando o seguinte comando:.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
Crie o mapeamento de tabela correspondente
Storms_CSV_Mapping
para dados ingeridos usando o seguinte comando:.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
Crie uma política de envio em lote de ingestão na tabela para latência de ingestão enfileirada configurável.
Dica
A política de envio em lote de ingestão é um otimizador de desempenho e inclui três parâmetros. A primeira condição satisfeita dispara a ingestão na tabela do Azure Data Explorer.
.alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
Use a entidade de serviço de Criar uma entidade de serviço Microsoft Entra para conceder permissão para trabalhar com o banco de dados.
.add database YOUR_DATABASE_NAME admins ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
Executar o laboratório
O laboratório a seguir foi projetado para oferecer a você a experiência de começar a criar dados, configurar o conector do Kafka e transmitir esses dados para o Azure Data Explorer com o conector. Em seguida, você pode examinar os dados ingeridos.
Definir o repositório Git
Clone o repositório git do laboratório.
Crie um diretório local em seu computador.
mkdir ~/kafka-kusto-hol cd ~/kafka-kusto-hol
Clonar o repositório.
cd ~/kafka-kusto-hol git clone https://github.com/Azure/azure-kusto-labs cd azure-kusto-labs/kafka-integration/dockerized-quickstart
Conteúdo do repositório clonado
Execute o seguinte comando para listar o conteúdo do repositório clonado:
cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree
Esse resultado dessa pesquisa é:
├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│ └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
├── Dockerfile
├── StormEvents.csv
├── go.mod
├── go.sum
├── kafka
│ └── kafka.go
└── main.go
Examinar os arquivos no repositório clonado
As seções a seguir explicam as partes importantes dos arquivos na árvore de arquivos acima.
adx-sink-config.json
Esse arquivo contém o arquivo de propriedades do coletor Kusto, no qual você atualizará detalhes de configuração específicos:
{
"name": "storm",
"config": {
"connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
"flush.size.bytes": 10000,
"flush.interval.ms": 10000,
"tasks.max": 1,
"topics": "storm-events",
"kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
"aad.auth.authority": "<enter tenant ID>",
"aad.auth.appid": "<enter application ID>",
"aad.auth.appkey": "<enter client secret>",
"kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
"kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
Substitua os valores dos seguintes atributos de acordo com a configuração do Azure Data Explorer: aad.auth.authority
, aad.auth.appid
, aad.auth.appkey
, kusto.tables.topics.mapping
(o nome do banco de dados), kusto.ingestion.url
e kusto.query.url
.
Conector – Dockerfile
Esse arquivo tem os comandos para gerar a imagem do Docker para a instância do conector. Ele inclui o download do conector do diretório de liberação do repositório git.
Diretório Storm-events-producer
Esse diretório tem um programa Go que lê um arquivo local "StormEvents.csv" e publica os dados em um tópico Kafka.
docker-compose.yaml
version: "2"
services:
zookeeper:
image: debezium/zookeeper:1.2
ports:
- 2181:2181
kafka:
image: debezium/kafka:1.2
ports:
- 9092:9092
links:
- zookeeper
depends_on:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
kusto-connect:
build:
context: ./connector
args:
KUSTO_KAFKA_SINK_VERSION: 1.0.1
ports:
- 8083:8083
links:
- kafka
depends_on:
- kafka
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=adx
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
events-producer:
build:
context: ./storm-events-producer
links:
- kafka
depends_on:
- kafka
environment:
- KAFKA_BOOTSTRAP_SERVER=kafka:9092
- KAFKA_TOPIC=storm-events
- SOURCE_FILE=StormEvents.csv
Iniciar os contêineres
Em um terminal, inicie os contêineres:
docker-compose up
O aplicativo produtor começará a enviar eventos para o tópico
storm-events
. Você deverá ver registros semelhantes aos seguintes:.... events-producer_1 | sent message to partition 0 offset 0 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public events-producer_1 | events-producer_1 | sent message to partition 0 offset 1 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer ....
Para verificar os registros, execute o seguinte comando em um terminal separado:
docker-compose logs -f | grep kusto-connect
Iniciar o conector
Use uma chamada Kafka Connect REST para iniciar o conector.
Em um terminal separado, inicie a tarefa do coletor com o seguinte comando:
curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
Para verificar o status, execute o seguinte comando em um terminal separado:
curl http://localhost:8083/connectors/storm/status
O conector começará a colocar os processos de ingestão na fila para o Azure Data Explorer.
Observação
Se você tiver problemas com o conector de registros, crie um problema.
Consultar e examinar dados
Confirmar ingestão de dados
Aguarde até que os dados cheguem à tabela
Storms
. Para confirmar a transferência de dados, verifique a contagem de linhas:Storms | count
Confirme se não há falhas no processo de ingestão:
.show ingestion failures
Depois de ver os dados, experimente algumas consultas.
Consultar os dados
Para ver todos os registros, execute a seguinte consulta:
Storms
Use
where
eproject
para filtrar dados específicos:Storms | where EventType == 'Drought' and State == 'TEXAS' | project StartTime, EndTime, Source, EventId
Use o operador
summarize
:Storms | summarize event_count=count() by State | where event_count > 10 | project State, event_count | render columnchart
Para obter mais exemplos de consulta e diretrizes, consulte Escrever consultas no KQL e Linguagem de Consulta Kusto documentação.
Redefinir
Para reiniciar, execute as etapas a seguir:
- Parar os contêineres (
docker-compose down -v
) - Excluir (
drop table Storms
) - Recriar a tabela
Storms
- Recriar mapeamento de tabela
- Reiniciar os contêineres (
docker-compose up
)
Limpar os recursos
Para excluir os recursos do Azure Data Explorer, use az cluster delete ou az Kusto database delete:
az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>
Ajustando o conector do coletor Kafka
Ajuste o conector do coletor Kafka para trabalhar com a política de envio em lote de ingestão:
- Ajuste o limite de tamanho de
flush.size.bytes
do coletor Kafka começando em 1 MB, aumentando em incrementos de 10 MB ou 100 MB. - Ao usar o coletor Kafka, os dados são agregados duas vezes. Os dados do lado do conector são agregados de acordo com as configurações de liberação e no lado do serviço de Azure Data Explorer, de acordo com a política de envio em lote. Se o tempo de envio em lote for muito curto e nenhum dado puder ser ingerido pelo conector e pelo serviço, o tempo de envio em lote deverá ser aumentado. Defina o tamanho do lote em 1 GB e aumente ou diminua em incrementos de 100 MB, conforme necessário. Por exemplo, se o tamanho de liberação for de 1 MB e o tamanho da política de envio em lote for de 100 MB, depois que um lote de 100 MB for agregado pelo conector do Coletor kafka, um lote de 100 MB será ingerido pelo serviço Data Explorer do Azure. Se o tempo de política de envio em lote for de 20 segundos e o conector do Coletor kafka liberar 50 MB em um período de 20 segundos, o serviço ingerirá um lote de 50 MB.
- Você pode dimensionar adicionando instâncias e partições Kafka. Aumente
tasks.max
para o número de partições. Crie uma partição se você tiver dados suficientes para produzir um blob do tamanho da configuraçãoflush.size.bytes
. Se o blob for menor, o lote será processado quando atingir o limite de tempo, portanto, a partição não receberá taxa de transferência suficiente. Um grande número de partições significa mais sobrecarga de processamento.
Conteúdo relacionado
- Saiba mais sobre Arquiteturas de big data.
- Saiba como ingerir dados de amostra formatados em JSON no Azure Data Explorer.
- Para Kafka Labs adicionais: