Ingerir dados do Apache Kafka no Azure Data Explorer

O Apache Kafka é uma plataforma de streaming distribuída para criar pipelines de dados de streaming em tempo real que movem dados de forma confiável entre sistemas ou aplicativos. O Kafka Connect é uma ferramenta para streaming de dados escalonável e confiável entre o Apache Kafka e outros sistemas. O Coletor Kafka do Kusto serve como o conector do Kafka e não requer o uso de código. Baixe o jar do conector do coletor no repositório Git ou no Hub do Conector do Confluent.

Este artigo mostra como ingerir dados com o Kafka, usando uma configuração autossuficiente do Docker para simplificar o cluster Kafka e a configuração do cluster do conector kafka.

Para obter mais informações, consulte o Repositório git do conector e as especificações de versão.

Pré-requisitos

Criar uma entidade de serviço Microsoft Entra

A entidade de serviço Microsoft Entra pode ser criada por meio do portal do Azure ou programaticamente, como no exemplo a seguir.

Essa entidade de serviço será a identidade usada pelo conector para gravar dados da tabela no Kusto. Posteriormente, você concederá permissões para que essa entidade de serviço acesse os recursos do Kusto.

  1. Entre em sua assinatura do Azure por meio da CLI do Azure. Em seguida, autentique no navegador.

    az login
    
  2. Escolha a assinatura para hospedar a entidade de segurança. Essa etapa é necessária quando você tem várias assinaturas.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Crie a entidade de serviço. Neste exemplo, a entidade de serviço é chamada my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Dos dados JSON retornados, copie o appId, passworde tenant para uso futuro.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Você criou o aplicativo do Microsoft Entra e a entidade de serviço.

Criar uma tabela de destino

  1. No ambiente de consulta, crie uma tabela chamada Storms usando o seguinte comando:

    .create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
    
  2. Crie o mapeamento de tabela correspondente Storms_CSV_Mapping para dados ingeridos usando o seguinte comando:

    .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
    
  3. Crie uma política de envio em lote de ingestão na tabela para latência de ingestão enfileirada configurável.

    Dica

    A política de envio em lote de ingestão é um otimizador de desempenho e inclui três parâmetros. A primeira condição satisfeita dispara a ingestão na tabela do Azure Data Explorer.

    .alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
    
  4. Use a entidade de serviço de Criar uma entidade de serviço Microsoft Entra para conceder permissão para trabalhar com o banco de dados.

    .add database YOUR_DATABASE_NAME admins  ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
    

Executar o laboratório

O laboratório a seguir foi projetado para oferecer a você a experiência de começar a criar dados, configurar o conector do Kafka e transmitir esses dados para o Azure Data Explorer com o conector. Em seguida, você pode examinar os dados ingeridos.

Definir o repositório Git

Clone o repositório git do laboratório.

  1. Crie um diretório local em seu computador.

    mkdir ~/kafka-kusto-hol
    cd ~/kafka-kusto-hol
    
  2. Clonar o repositório.

    cd ~/kafka-kusto-hol
    git clone https://github.com/Azure/azure-kusto-labs
    cd azure-kusto-labs/kafka-integration/dockerized-quickstart
    

Conteúdo do repositório clonado

Execute o seguinte comando para listar o conteúdo do repositório clonado:

cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree

Esse resultado dessa pesquisa é:

├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go

Examinar os arquivos no repositório clonado

As seções a seguir explicam as partes importantes dos arquivos na árvore de arquivos acima.

adx-sink-config.json

Esse arquivo contém o arquivo de propriedades do coletor Kusto, no qual você atualizará detalhes de configuração específicos:

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

Substitua os valores dos seguintes atributos de acordo com a configuração do Azure Data Explorer: aad.auth.authority, aad.auth.appid, aad.auth.appkey, kusto.tables.topics.mapping (o nome do banco de dados), kusto.ingestion.url e kusto.query.url.

Conector – Dockerfile

Esse arquivo tem os comandos para gerar a imagem do Docker para a instância do conector. Ele inclui o download do conector do diretório de liberação do repositório git.

Diretório Storm-events-producer

Esse diretório tem um programa Go que lê um arquivo local "StormEvents.csv" e publica os dados em um tópico Kafka.

docker-compose.yaml

version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

Iniciar os contêineres

  1. Em um terminal, inicie os contêineres:

    docker-compose up
    

    O aplicativo produtor começará a enviar eventos para o tópico storm-events. Você deverá ver registros semelhantes aos seguintes:

    ....
    events-producer_1  | sent message to partition 0 offset 0
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
    events-producer_1  |
    events-producer_1  | sent message to partition 0 offset 1
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
    ....
    
  2. Para verificar os registros, execute o seguinte comando em um terminal separado:

    docker-compose logs -f | grep kusto-connect
    

Iniciar o conector

Use uma chamada Kafka Connect REST para iniciar o conector.

  1. Em um terminal separado, inicie a tarefa do coletor com o seguinte comando:

    curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
    
  2. Para verificar o status, execute o seguinte comando em um terminal separado:

    curl http://localhost:8083/connectors/storm/status
    

O conector começará a colocar os processos de ingestão na fila para o Azure Data Explorer.

Observação

Se você tiver problemas com o conector de registros, crie um problema.

Consultar e examinar dados

Confirmar ingestão de dados

  1. Aguarde até que os dados cheguem à tabela Storms. Para confirmar a transferência de dados, verifique a contagem de linhas:

    Storms | count
    
  2. Confirme se não há falhas no processo de ingestão:

    .show ingestion failures
    

    Depois de ver os dados, experimente algumas consultas.

Consultar os dados

  1. Para ver todos os registros, execute a seguinte consulta:

    Storms
    
  2. Use where e project para filtrar dados específicos:

    Storms
    | where EventType == 'Drought' and State == 'TEXAS'
    | project StartTime, EndTime, Source, EventId
    
  3. Use o operador summarize:

    Storms
    | summarize event_count=count() by State
    | where event_count > 10
    | project State, event_count
    | render columnchart
    

    Captura de tela dos resultados do gráfico de colunas de consulta kafka no Data Explorer do Azure.

Para obter mais exemplos de consulta e diretrizes, consulte Escrever consultas no KQL e Linguagem de Consulta Kusto documentação.

Redefinir

Para reiniciar, execute as etapas a seguir:

  1. Parar os contêineres (docker-compose down -v)
  2. Excluir (drop table Storms)
  3. Recriar a tabela Storms
  4. Recriar mapeamento de tabela
  5. Reiniciar os contêineres (docker-compose up)

Limpar os recursos

Para excluir os recursos do Azure Data Explorer, use az cluster delete ou az Kusto database delete:

az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>

Ajustando o conector do coletor Kafka

Ajuste o conector do coletor Kafka para trabalhar com a política de envio em lote de ingestão:

  • Ajuste o limite de tamanho de flush.size.bytes do coletor Kafka começando em 1 MB, aumentando em incrementos de 10 MB ou 100 MB.
  • Ao usar o coletor Kafka, os dados são agregados duas vezes. Os dados do lado do conector são agregados de acordo com as configurações de liberação e no lado do serviço de Azure Data Explorer, de acordo com a política de envio em lote. Se o tempo de envio em lote for muito curto e nenhum dado puder ser ingerido pelo conector e pelo serviço, o tempo de envio em lote deverá ser aumentado. Defina o tamanho do lote em 1 GB e aumente ou diminua em incrementos de 100 MB, conforme necessário. Por exemplo, se o tamanho de liberação for de 1 MB e o tamanho da política de envio em lote for de 100 MB, depois que um lote de 100 MB for agregado pelo conector do Coletor kafka, um lote de 100 MB será ingerido pelo serviço Data Explorer do Azure. Se o tempo de política de envio em lote for de 20 segundos e o conector do Coletor kafka liberar 50 MB em um período de 20 segundos, o serviço ingerirá um lote de 50 MB.
  • Você pode dimensionar adicionando instâncias e partições Kafka. Aumente tasks.max para o número de partições. Crie uma partição se você tiver dados suficientes para produzir um blob do tamanho da configuração flush.size.bytes. Se o blob for menor, o lote será processado quando atingir o limite de tempo, portanto, a partição não receberá taxa de transferência suficiente. Um grande número de partições significa mais sobrecarga de processamento.