Erfassen von Daten mit Apache Flink in Azure Data Explorer

Apache Flink ist ein Framework und verteiltes Verarbeitungsmodul für zustandsbehaftete Berechnungen über ungebundene und gebundene Datenströme.

Der Flink-Connector ist ein Open-Source-Projekt, das auf jedem Flink-Cluster ausgeführt werden kann. Es implementiert eine Datensenke zum Verschieben von Daten aus einem Flink-Cluster. Wenn Sie den Konnektor zu Apache Flink verwenden, können Sie schnelle und skalierbare Anwendungen für datengesteuerte Szenarien erstellen. Beispiele dafür sind maschinelles Lernen (Machine Learning, ML), Extrahieren, Transformieren und Laden (Extract-Transform-Load, ETL) und Log Analytics.

In diesem Artikel erfahren Sie, wie Sie den Flink-Connector verwenden, um Daten von Flink an Ihre Tabelle zu senden. Sie erstellen eine Tabelle und eine Datenzuordnung und weisen Flink an, Daten an die Tabelle zu senden und die Ergebnisse zu überprüfen.

Voraussetzungen

Integrieren Sie für Flink-Projekte, die Maven zum Verwalten von Abhängigkeiten verwenden, Flink Connector Core Sink For Azure Data Explorer, indem Sie es als Abhängigkeit hinzufügen:

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

Für Projekte, die Maven nicht zum Verwalten von Abhängigkeiten verwenden, klonen Sie das Repository für Azure Data Explorer Connector For Apache Flink und erstellen Sie es lokal. So können Sie den Konnektor mithilfe des Befehls mvn clean install -DskipTests manuell zu Ihrem lokalen Maven-Repository hinzufügen.

Sie können sich von Flink aus entweder über eine Microsoft Entra-ID-Anwendung oder eine verwaltete Identität authentifizieren.

Dieser Dienstprinzipal ist die Identität, die vom Connector zum Schreiben von Daten in die Ihre Tabelle in Kusto genutzt wird. Sie gewähren diesem Dienstprinzipal später noch Berechtigungen für den Zugriff auf Kusto-Ressourcen.

  1. Melden Sie sich per Azure CLI an Ihrem Azure-Abonnement an. Führen Sie anschließend im Browser die Authentifizierung durch.

    az login
    
  2. Wählen Sie das Abonnement aus, um den Prinzipal zu hosten. Dieser Schritt ist erforderlich, wenn Sie über mehrere Abonnements verfügen.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Erstellen Sie den Dienstprinzipal. In diesem Beispiel wird der Dienstprinzipal als my-service-principal bezeichnet.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Kopieren Sie aus den zurückgegebenen JSON-Daten appId, password und tenant für die zukünftige Verwendung.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Sie haben Ihre Microsoft Entra-Anwendung und den Dienstprinzipal erstellt.

  1. Erteilen Sie Anwendungsbenutzer-Berechtigungen für die Datenbank:

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. Erteilen Sie der Anwendung entweder Ingestor- oder Administratorberechtigungen für die Tabelle. Die erforderlichen Berechtigungen hängen von der ausgewählten Datenschreibmethode ab. Ingestor-Berechtigungen sind für SinkV2 ausreichend, während WriteAndSink Administratorberechtigungen benötigt.

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

Weitere Informationen zur Autorisierung finden Sie unter Kusto rollenbasierte Zugriffssteuerung.

So schreiben Sie Daten aus Flink:

  1. Importieren Sie die erforderlichen Optionen:

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. Verwenden Sie Ihre Anwendung oder verwaltete Identität, um sich zu authentifizieren.

    Für die Anwendungsauthentifizierung:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    

    Für die Authentifizierung mit verwalteten Identitäten:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setManagedIdentityAppId("<Object ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. Konfigurieren Sie die Senkenparameter wie Datenbank und Tabelle:

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    Sie können weitere Optionen hinzufügen, wie in der folgenden Tabelle beschrieben:

    Option Beschreibung Standardwert
    IngestionMappingRef Verweist auf eine vorhandene Ingestion-Zuordnung.
    FlushImmediately Leert Daten sofort und kann Leistungsprobleme verursachen. Diese Methode wird nicht empfohlen.
    BatchIntervalMs Steuert, wie oft Daten geleert werden. 30 Sekunden
    BatchSize Legt die Batchgröße für das Puffern von Datensätzen vor dem Leeren fest. 1.000 Datensätze
    ClientBatchSizeLimit Gibt die Größe von aggregierten Daten in MB vor der Erfassung an. 300MB
    PollForIngestionStatus Wenn true, fragt der Konnektor den Erfassungsstatus nach dem Leeren von Daten ab. false
    DeliveryGuarantee Bestimmt die Zustellungs-Garantie-Semantik. Verwenden Sie WriteAheadSink, um genau einmal Semantik zu erreichen. AT_LEAST_ONCE
  2. Schreiben Sie Streamingdaten mit einer der folgenden Methoden:

    • SinkV2: Dies ist eine zustandslose Option, mit der Daten am Prüfpunkt geleert werden, um mindestens einmal Konsistenz sicherzustellen. Wir empfehlen diese Option für Datenerfassung mit hohem Volumen.
    • WriteAheadSink: Mit dieser Methode werden Daten an einen KustoSink ausgegeben. Es ist in das Prüfpunktsystem von Flink integriert und bietet genau einmal Garantien. Daten werden in einem AbstractStateBackend gespeichert und erst nach Abschluss eines Prüfpunkts zugesichert.

    Im folgenden Beispiel wird SinkV2 verwendet. Nutzen Sie Methode buildWriteAheadSink statt build, um WriteAheadSink zu verwenden:

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

Der vollständige Code sollte in etwa wie folgt aussehen:

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

Überprüfen, dass Daten erfasst werden

Nachdem die Verbindung konfiguriert wurde, werden Die Daten an Ihre Tabelle gesendet. Sie können überprüfen, ob die Daten aufgenommen werden, indem Sie eine KQL-Abfrage ausführen.

  1. Führen Sie die folgende Abfrage aus, um zu überprüfen, ob Daten in die Tabelle aufgenommen werden:

    <MyTable>
    | count
    
  2. Führen Sie die folgende Abfrage aus, um die Daten anzuzeigen:

    <MyTable>
    | take 100