Integration von Azure Data Explorer und Apache Flink®
Azure Data Explorer ist eine vollständig verwaltete, leistungsstarke Big Data-Analyseplattform, die das Analysieren großer Datenmengen nahezu in Echtzeit vereinfacht.
ADX hilft Benutzer*innen bei der Analyse großer Datenmengen von Streaminganwendungen, Websites, IoT-Geräten usw. Durch die Integration von Apache Flink mit ADX können Sie Echtzeitdaten verarbeiten und in ADX analysieren.
Voraussetzungen
Schritte zum Verwenden von Azure Data Explorer als Senke in Flink
Erstellen Sie ADX mit Datenbank und Tabelle nach Bedarf.
Fügen Sie Erfasserberechtigungen für die verwaltete Identität in Kusto hinzu.
.add database <DATABASE_NAME> ingestors ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY')
Führen Sie ein Beispielprogramm aus, und definieren Sie dabei den Kusto-Cluster-URI (Uniform Resource Identifier), die verwendete Datenbank und die verwendete verwaltete Identität sowie die Tabelle, in die geschrieben werden muss.
Klonen Sie das Projekt „flink-connector-kusto“: https://github.com/Azure/flink-connector-kusto.git
Erstellen Sie die Tabelle in ADX mithilfe des folgenden Befehls:
.create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime)
Aktualisieren Sie die Datei „FlinkKustoSinkSample.java“ mit dem richtigen Kusto-Cluster-URI, der Datenbank und der verwendeten verwalteten Identität.
String database = "sdktests"; //ADX database name String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000)
Erstellen Sie das Projekt später mithilfe von „mvn clean package“.
Suchen Sie im Ordner „sample-java/target“ nach der JAR-Datei „samples-java-1.0-SNAPSHOT-shaded.jar“, laden Sie sie in die Flink-Benutzeroberfläche hoch, und übermitteln Sie den Auftrag.
Fragen Sie die Kusto-Tabelle ab, um die Ausgabe zu überprüfen.
Die Daten werden aus Flink ohne Verzögerung in die Kusto-Tabelle geschrieben.
Verweis
- Apache Flink-Website
- Apache, Apache Flink, Flink und zugehörige Open-Source-Projektnamen sind Marken der Apache Software Foundation (ASF).