Verwenden des Hive-Katalogs mit Apache Flink® in HDInsight on AKS
Hinweis
Azure HDInsight on AKS wird am 31. Januar 2025 eingestellt. Vor dem 31. Januar 2025 müssen Sie Ihre Workloads zu Microsoft Fabric oder einem gleichwertigen Azure-Produkt migrieren, um eine abruptes Beendigung Ihrer Workloads zu vermeiden. Die verbleibenden Cluster in Ihrem Abonnement werden beendet und vom Host entfernt.
Bis zum Einstellungsdatum ist nur grundlegende Unterstützung verfügbar.
Wichtig
Diese Funktion steht derzeit als Vorschau zur Verfügung. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure-Vorschauen enthalten weitere rechtliche Bestimmungen, die für Azure-Features in Betaversionen, in Vorschauversionen oder anderen Versionen gelten, die noch nicht allgemein verfügbar gemacht wurden. Informationen zu dieser spezifischen Vorschau finden Sie unter Informationen zur Vorschau von Azure HDInsight on AKS. Bei Fragen oder Funktionsvorschlägen senden Sie eine Anfrage an AskHDInsight mit den entsprechenden Details, und folgen Sie uns für weitere Updates in der Azure HDInsight-Community.
In diesem Beispiel wird der Metastore von Hive als persistenter Katalog mit dem Hive-Katalog von Apache Flink verwendet. Diese Funktion dient hier dazu, Kafka- und MySQL-Tabellenmetadaten sitzungsübergreifend in Flink zu speichern. Flink verwendet eine Kafka-Tabelle, die im Hive-Katalog als Quelle registriert ist, führt einige Lookupvorgänge aus und platziert das Ergebnis in einer MySQL-Datenbank, die hier als Senke fungiert.
Voraussetzungen
- Apache Flink-Cluster in HDInsight on AKS mit Hive-Metastore 3.1.2
- Apache Kafka-Cluster in HDInsight
- Sie müssen sicherstellen, dass die Netzwerkeinstellungen wie unter Verwendung von Kafka beschrieben vorgenommen wurden. Damit wird sichergestellt, dass sich HDInsight on AKS- und HDInsight-Cluster im gleichen VNet befinden.
- MySQL 8.0.33
Apache Hive in Apache Flink
Flink bietet eine zweifache Hive-Integration.
- Der erste Schritt besteht darin, Hive Metastore (HMS) als persistenten Katalog mit HiveCatalog von Flink zu verwenden, um Flink-spezifische Metadaten sitzungsübergreifend zu speichern.
- Beispielsweise können Benutzer*innen ihre Kafka- oder ElasticSearch-Tabellen mithilfe von HiveCatalog in Hive Metastore speichern und sie später in SQL-Abfragen wiederverwenden.
- Der zweite Schritt besteht darin, Flink als alternative Engine zum Lesen und Schreiben von Hive-Tabellen bereitzustellen.
- Der Hive-Katalog ist standardmäßig mit bereits vorhandenen Hive-Installationen kompatibel. Sie müssen weder Ihre vorhandene Hive Metastore-Instanz noch die Datenplatzierung oder die Partitionierung Ihrer Tabellen ändern.
Weitere Informationen finden Sie unter Apache Hive.
Umgebungsvorbereitung
Erstellen eines Apache Flink-Clusters mit HMS
Hier wird ein Apache Flink-Cluster mit HMS im Azure-Portal erstellt. Eine ausführliche Anleitung zum Erstellen eines Flink-Clusters finden Sie hier.
Überprüfen Sie nach der Clustererstellung auf der AKS-Seite, ob HMS ausgeführt wird.
Vorbereiten des Kafka-Themas für Transaktionsdaten von Benutzerbestellungen in HDInsight
Laden Sie die JAR-Datei des Kafka-Clients mithilfe des folgenden Befehls herunter:
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
Entpacken Sie die TAR-Datei wie folgt:
tar -xvf kafka_2.12-3.2.0.tgz
Generieren Sie die Meldungen für das Kafka-Thema.
Weitere Befehle:
Hinweis
Ersetzen Sie „bootstrap-server“ durch den Hostnamen oder die IP-Adresse Ihrer eigenen Kafka-Broker.
--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092
--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders --bootstrap-server wn0-contsk:9092
--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning
Vorbereiten von Masterdaten für Benutzerbestellungen in MySQL in Azure
Testen der Datenbank:
Bereiten Sie die Bestelltabelle vor:
mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
mysql> CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_id INTEGER NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
(default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
(default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
(default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
(default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
(default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);
mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_id | customer_name | price | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-16 10:08:22 | 1 | Jark | 50.00000 | 102 | 0 |
| 10002 | 2023-07-16 10:11:09 | 2 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-16 10:11:09 | 3 | Sally | 25.00000 | 105 | 0 |
| 10004 | 2023-07-16 10:11:09 | 4 | Sally | 45.00000 | 105 | 0 |
| 10005 | 2023-07-16 10:11:09 | 5 | Sally | 35.00000 | 105 | 0 |
| 10006 | 2023-07-16 12:00:30 | 6 | Edward | 90.00000 | 106 | 0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)
mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------------+------+-----+---------+----------------+
| order_id | int | NO | PRI | NULL | auto_increment |
| order_date | datetime | NO | | NULL | |
| customer_id | int | NO | | NULL | |
| customer_name | varchar(255) | NO | | NULL | |
| price | decimal(10,5) | NO | | NULL | |
| product_id | int | NO | | NULL | |
| order_status | tinyint(1) | NO | | NULL | |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)
Verwenden der für SSH-Downloads erforderlichen JAR-Dateien für Kafka-Connector und MySQL-Datenbank
Hinweis
Laden Sie die richtige JAR-Version für die HDInsight-Kafka-Version und die MySQL-Version herunter.
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
Verschieben der JAR-Datei für den Planer
Verschieben Sie die JAR-Datei „flink-table-planner_2.12-1.17.0-....jar“ im Verzeichnis „/opt“ des WebSSH-Pods nach „/lib“, und verschieben Sie die JAR-Datei „flink-table-planner-loader1.17.0-....jar“ von „/lib“ nach „/opt/flink-webssh/opt/“. Ausführlichere Informationen finden Sie hier. Gehen Sie wie folgt vor, um die JAR-Datei für den Planer zu verschieben:
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
Hinweis
Die Verschiebung einer weiteren JAR-Datei für den Planer ist nur erforderlich, wenn Sie einen Hive-Dialekt oder einen HiveServer2-Endpunkt verwenden. Dies ist jedoch das empfohlene Setup für die Hive-Integration.
Überprüfen
Herstellen einer Verbindung mit Flink SQL mithilfe von „bin/sql-client.sh“
bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
Erstellen des Hive-Katalogs und Herstellen einer Verbindung mit dem Hive-Katalog in Flink SQL
Hinweis
Da wir bereits einen Flink-Cluster mit Hive Metastore verwenden, sind keine weiteren Konfigurationsschritte erforderlich.
CREATE CATALOG myhive WITH (
'type' = 'hive'
);
USE CATALOG myhive;
Erstellen der Kafka-Tabelle in Apache Flink SQL
CREATE TABLE kafka_user_orders (
`user_id` BIGINT,
`user_name` STRING,
`user_email` STRING,
`order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
`price` DECIMAL(10,5),
`product_id` BIGINT,
`order_status` BOOLEAN
) WITH (
'connector' = 'kafka',
'topic' = 'user_orders',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
select * from kafka_user_orders;
Erstellen der MySQL-Tabelle in Apache Flink SQL
CREATE TABLE mysql_user_orders (
`order_id` INT,
`order_date` TIMESTAMP,
`customer_id` INT,
`customer_name` STRING,
`price` DECIMAL(10,5),
`product_id` INT,
`order_status` BOOLEAN
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
'table-name' = 'orders',
'username' = '<username>',
'password' = '<password>'
);
select * from mysql_user_orders;
Überprüfen der im obigen Hive-Katalog registrierten Tabellen in Flink SQL
Verwenden der Masterbestellungstabelle als Senke für Informationen zu Benutzerbestelltransaktionen in MySQL in Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders;
Überprüfen, ob die Daten zu Benutzerbestelltransaktionen in Kafka der Masterbestellungstabelle in MySQL in Azure Cloud Shell hinzugefügt wurden
Erstellen drei weiterer Benutzerbestellungen in Kafka
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Überprüfen der Kafka-Tabellendaten in Flink SQL
Flink SQL> select * from kafka_user_orders;
Einfügen von product_id=104
in die Bestellungstabelle in MySQL in Flink SQL
INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;
Überprüfen, ob der Datensatz product_id = 104
in der Bestellungstabelle in MySQL in Azure Cloud Shell hinzugefügt wurde
Verweis
- Apache Hive
- Apache, Apache Hive, Hive, Apache Flink, Flink und zugehörige Open Source-Projektnamen sind Marken der Apache Software Foundation (ASF).