Inserire dati da Apache Kafka in Azure Esplora dati

Apache Kafka è una piattaforma di streaming distribuita per la creazione di pipeline di dati di streaming in tempo reale che spostano in modo affidabile i dati tra sistemi o applicazioni. Kafka Connect è uno strumento per lo streaming scalabile e affidabile dei dati tra Apache Kafka e altri sistemi di dati. Il sink Kusto Kafka funge da connettore da Kafka e non richiede l'uso del codice. Scaricare il file jar del connettore sink dal repository Git o dall'hub del connettore Confluent.

Questo articolo illustra come inserire dati con Kafka usando una configurazione docker autonoma per semplificare la configurazione del cluster Kafka e del cluster Kafka.

Per altre informazioni, vedere il repository Git del connettore e le specifiche della versione.

Prerequisiti

Creare un'entità servizio Microsoft Entra

L'Microsoft Entra'entità servizio può essere creata tramite il portale di Azure o a livello di codice, come nell'esempio seguente.

Questa entità servizio sarà l'identità usata dal connettore per scrivere dati nella tabella in Kusto. In seguito si concedono le autorizzazioni per questa entità servizio per accedere alle risorse Kusto.

  1. Accedere alla sottoscrizione di Azure tramite l'interfaccia della riga di comando di Azure. Eseguire quindi l'autenticazione nel browser.

    az login
    
  2. Scegliere la sottoscrizione per ospitare l'entità. Questo passaggio è necessario quando si hanno più sottoscrizioni.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Creare l'entità servizio. In questo esempio l'entità servizio è denominata my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Dai dati JSON restituiti copiare , appIdpassworde tenant per un uso futuro.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

È stata creata l'applicazione Microsoft Entra e l'entità servizio.

Creare una tabella di destinazione

  1. Dall'ambiente di query creare una tabella denominata Storms usando il comando seguente:

    .create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
    
  2. Creare il mapping Storms_CSV_Mapping della tabella corrispondente per i dati inseriti usando il comando seguente:

    .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
    
  3. Creare un criterio di inserimento in batch nella tabella per la latenza di inserimento in coda configurabile.

    Suggerimento

    I criteri di inserimento in batch sono un'utilità di ottimizzazione delle prestazioni e includono tre parametri. La prima condizione soddisfatta attiva l'inserimento nella tabella Esplora dati di Azure.

    .alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
    
  4. Usare l'entità servizio da Creare un'entità servizio Microsoft Entra per concedere l'autorizzazione per lavorare con il database.

    .add database YOUR_DATABASE_NAME admins  ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
    

Eseguire il lab

Il lab seguente è progettato per offrire l'esperienza di iniziare a creare dati, configurare il connettore Kafka e trasmettere questi dati in Azure Esplora dati con il connettore. È quindi possibile esaminare i dati inseriti.

Clonare il repository Git

Clonare il repository Git del lab.

  1. Creare una directory locale nel computer.

    mkdir ~/kafka-kusto-hol
    cd ~/kafka-kusto-hol
    
  2. Clonare il repository.

    cd ~/kafka-kusto-hol
    git clone https://github.com/Azure/azure-kusto-labs
    cd azure-kusto-labs/kafka-integration/dockerized-quickstart
    

Contenuto del repository clonato

Eseguire il comando seguente per elencare il contenuto del repository clonato:

cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree

Questo risultato della ricerca è:

├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go

Esaminare i file nel repository clonato

Le sezioni seguenti illustrano le parti importanti dei file nell'albero dei file precedente.

adx-sink-config.json

Questo file contiene il file delle proprietà del sink Kusto in cui verranno aggiornati dettagli di configurazione specifici:

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

Sostituire i valori per gli attributi seguenti in base all'installazione di Azure Esplora dati: , , (il nome del database), kusto.ingestion.urle kusto.query.url. kusto.tables.topics.mappingaad.auth.appkeyaad.auth.appidaad.auth.authority

Connettore - Dockerfile

Questo file include i comandi per generare l'immagine Docker per l'istanza del connettore. Include il download del connettore dalla directory della versione del repository Git.

Directory Storm-events-producer

Questa directory include un programma Go che legge un file "StormEvents.csv" locale e pubblica i dati in un argomento Kafka.

docker-compose.yaml

version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

Avviare i contenitori

  1. In un terminale avviare i contenitori:

    docker-compose up
    

    L'applicazione producer inizierà a inviare eventi all'argomento storm-events . Verranno visualizzati log simili ai log seguenti:

    ....
    events-producer_1  | sent message to partition 0 offset 0
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
    events-producer_1  |
    events-producer_1  | sent message to partition 0 offset 1
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
    ....
    
  2. Per controllare i log, eseguire il comando seguente in un terminale separato:

    docker-compose logs -f | grep kusto-connect
    

Avviare il connettore

Usare una chiamata REST Kafka Connect per avviare il connettore.

  1. In un terminale separato avviare l'attività sink con il comando seguente:

    curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
    
  2. Per controllare lo stato, eseguire il comando seguente in un terminale separato:

    curl http://localhost:8083/connectors/storm/status
    

Il connettore avvierà l'accodamento dei processi di inserimento in Azure Esplora dati.

Nota

In caso di problemi del connettore di log, creare un problema.

Eseguire query ed esaminare i dati

Confermare l'inserimento dati

  1. Attendere che i dati arrivino nella Storms tabella. Per confermare il trasferimento dei dati, controllare il numero di righe:

    Storms | count
    
  2. Verificare che non siano presenti errori nel processo di inserimento:

    .show ingestion failures
    

    Dopo aver visualizzato i dati, provare alcune query.

Eseguire una query sui dati

  1. Per visualizzare tutti i record, eseguire la query seguente:

    Storms
    
  2. Usare where e project per filtrare dati specifici:

    Storms
    | where EventType == 'Drought' and State == 'TEXAS'
    | project StartTime, EndTime, Source, EventId
    
  3. Usare l'operatore summarize :

    Storms
    | summarize event_count=count() by State
    | where event_count > 10
    | project State, event_count
    | render columnchart
    

    Screenshot del grafico a colonne di query Kafka in Azure Esplora dati.

Per altri esempi di query e linee guida, vedere Scrivere query in KQL e Linguaggio di query Kusto documentazione.

Reset

Per reimpostare, seguire questa procedura:

  1. Arrestare i contenitori (docker-compose down -v)
  2. Elimina (drop table Storms)
  3. Ricreare la Storms tabella
  4. Ricreare il mapping delle tabelle
  5. Riavviare i contenitori (docker-compose up)

Pulire le risorse

Per eliminare le risorse di Azure Esplora dati, usare az cluster delete o az Kusto database delete:

az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>

Ottimizzazione del connettore Sink Kafka

Ottimizzare il connettore Sink Kafka per usare i criteri di invio in batch:

  • Ottimizzare il limite di dimensioni sink Kafka flush.size.bytes a partire da 1 MB, aumentando con incrementi di 10 MB o 100 MB.
  • Quando si usa il sink Kafka, i dati vengono aggregati due volte. I dati sul lato connettore vengono aggregati in base alle impostazioni di scaricamento e sul lato del servizio Esplora dati di Azure in base ai criteri di invio in batch. Se il tempo di invio in batch è troppo breve e non è possibile inserire dati sia dal connettore che dal servizio, è necessario aumentare il tempo di invio in batch. Impostare le dimensioni di invio in batch a 1 GB e aumentare o diminuire di 100 MB incrementi in base alle esigenze. Ad esempio, se le dimensioni dello scaricamento sono pari a 1 MB e le dimensioni dei criteri di invio in batch sono pari a 100 MB, dopo che un batch di 100 MB viene aggregato dal connettore Sink Kafka, un batch di 100 MB verrà inserito dal servizio Azure Esplora dati. Se il tempo dei criteri di invio in batch è di 20 secondi e il connettore Sink Kafka scarica 50 MB in un periodo di 20 secondi, il servizio inserisce un batch di 50 MB.
  • È possibile ridimensionare aggiungendo istanze e partizioni Kafka. Aumentare tasks.max al numero di partizioni. Creare una partizione se sono presenti dati sufficienti per produrre un BLOB le dimensioni dell'impostazione flush.size.bytes . Se il BLOB è più piccolo, il batch viene elaborato quando raggiunge il limite di tempo, quindi la partizione non riceverà una velocità effettiva sufficiente. Un numero elevato di partizioni significa un sovraccarico di elaborazione maggiore.