Como usar o Catálogo do Hive com o Apache Flink® no Azure HDInsight no AKS

Observação

Desativaremos o Microsoft Azure HDInsight no AKS em 31 de janeiro de 2025. Para evitar o encerramento abrupto das suas cargas de trabalho, você precisará migrá-las para o Microsoft Fabric ou para um produto equivalente do Azure antes de 31 de janeiro de 2025. Os clusters restantes em sua assinatura serão interrompidos e removidos do host.

Somente um suporte básico estará disponível até a data da desativação.

Importante

Esse recurso está atualmente na visualização. Os Termos de uso complementares para versões prévias do Microsoft Azure incluem mais termos legais que se aplicam aos recursos do Azure que estão em versão beta, em versão prévia ou ainda não lançados em disponibilidade geral. Para obter informações sobre essa versão prévia específica, confira Informações sobre a versão prévia do Azure HDInsight no AKS. Caso tenha perguntas ou sugestões de recursos, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para ver mais atualizações sobre a Comunidade do Azure HDInsight.

Este exemplo usa o Metastore do Hive como um catálogo persistente com o Catálogo d Hive do Apache Flink. Usamos essa funcionalidade para armazenar metadados de tabelas do Kafka e do MySQL no Flink entre sessões. O Flink usa a tabela Kafka registrada no Hive Catalog como fonte, realiza alguma pesquisa e coleta de resultados para o banco de dados MySQL

Pré-requisitos

O Flink oferece uma integração dupla com o Hive.

  • A primeira etapa é usar o Metastore do Hive (HMS) como um catálogo persistente com o HiveCatalog da Flink para armazenar metadados específicos do Flink entre sessões.
    • Por exemplo, os usuários podem armazenar suas tabelas Kafka ou ElasticSearch no Metastore do Hive usando o HiveCatalog e reutilizá-las posteriormente em consultas SQL.
  • A segunda é oferecer o Flink como um mecanismo alternativo para ler e escrever tabelas do Hive.
  • O HiveCatalog foi projetado para ser "pronto para uso" compatível com instalações existentes do Hive. Você não precisa modificar seu Metastore do Hive existente ou alterar o posicionamento de dados ou o particionamento de suas tabelas.

Para obter mais informações, consulte Apache Hive

Preparação do ambiente

Permite criar um cluster Apache Flink com o HMS no portal do Azure, você pode consultar as instruções detalhadas sobre criação de cluster Flink.

Captura de tela mostrando como criar um cluster Flink.

Após a criação do cluster, verifique se o HMS está em execução ou não no lado do AKS.

Captura de tela mostrando como verificar o status do HMS no cluster Flink.

Preparar dados de transação de pedido do usuário tópico Kafka no HDInsight

Baixe o jar do cliente kafka usando o seguinte comando:

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

Descompacte o arquivo tar com

tar -xvf kafka_2.12-3.2.0.tgz

Produza as mensagens para o tópico Kafka.

Captura de tela mostrando como produzir mensagens para o tópico kafka.

Outros comandos:

Observação

Você é obrigado a substituir o servidor de inicialização por seu próprio nome de host ou IP dos corretores kafka

--- delete topic
./kafka-topics.sh --delete --topic user_orders --bootstrap-server wn0-contsk:9092

--- create topic
./kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user_orders  --bootstrap-server wn0-contsk:9092

--- produce topic
./kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders

--- consumer topic
./kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders --from-beginning

Preparar dados mestre de pedidos do usuário no MySQL no Azure

Banco de dados de teste:

Captura de tela mostrando como testar o banco de dados no Kafka.

Captura de tela mostrando como executar o Cloud Shell no portal.

Prepare a tabela de pedidos:

mysql> use mydb
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

mysql> CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_id INTEGER NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;


mysql> INSERT INTO orders
VALUES (default, '2023-07-16 10:08:22','0001', 'Jark', 50.00, 102, false),
       (default, '2023-07-16 10:11:09','0002', 'Sally', 15.00, 105, false),
       (default, '2023-07-16 10:11:09','000', 'Sally', 25.00, 105, false),
       (default, '2023-07-16 10:11:09','0004', 'Sally', 45.00, 105, false),
       (default, '2023-07-16 10:11:09','0005', 'Sally', 35.00, 105, false),
       (default, '2023-07-16 12:00:30','0006', 'Edward', 90.00, 106, false);

mysql> select * from orders;
+----------+---------------------+-------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_id | customer_name | price    | product_id | order_status |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-16 10:08:22 |           1 | Jark          | 50.00000 |        102 |            0 |
|    10002 | 2023-07-16 10:11:09 |           2 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-16 10:11:09 |           3 | Sally         | 25.00000 |        105 |            0 |
|    10004 | 2023-07-16 10:11:09 |           4 | Sally         | 45.00000 |        105 |            0 |
|    10005 | 2023-07-16 10:11:09 |           5 | Sally         | 35.00000 |        105 |            0 |
|    10006 | 2023-07-16 12:00:30 |           6 | Edward        | 90.00000 |        106 |            0 |
+----------+---------------------+-------------+---------------+----------+------------+--------------+
6 rows in set (0.22 sec)

mysql> desc orders;
+---------------+---------------+------+-----+---------+----------------+
| Field         | Type          | Null | Key | Default | Extra          |
+---------------+---------------+------+-----+---------+----------------+
| order_id      | int           | NO   | PRI | NULL    | auto_increment |
| order_date    | datetime      | NO   |     | NULL    |                |
| customer_id   | int           | NO   |     | NULL    |                |
| customer_name | varchar(255)  | NO   |     | NULL    |                |
| price         | decimal(10,5) | NO   |     | NULL    |                |
| product_id    | int           | NO   |     | NULL    |                |
| order_status  | tinyint(1)    | NO   |     | NULL    |                |
+---------------+---------------+------+-----+---------+----------------+
7 rows in set (0.22 sec)

Usando SSH download necessário conector Kafka e jars de banco de dados MySQL

Observação

Baixe o jar da versão correta de acordo com nossa versão do HDInsight kafka e a versão do MySQL.

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

Movendo o frasco do planejador

Mova o jar flink-table-planner_2.12-1.17.0-....jar localizado no pod webssh /opt to /lib e move out the jar flink-table-planner-loader1.17.0-...jar /opt/flink-webssh/opt/ de /lib. Consulte problema para obter mais detalhes. Execute as etapas a seguir para mover o jar do planejador.

mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/

Observação

Uma movimentação de jar de planejador extra só é necessária ao usar o dialeto do Hive ou o ponto de extremidade HiveServer2. No entanto, essa é a configuração recomendada para a integração do Hive.

Validação

bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar

Observação

Como já usamos o cluster Flink com o Metastore do Hive, não há necessidade de executar nenhuma configuração adicional.

CREATE CATALOG myhive WITH (
    'type' = 'hive'
);

USE CATALOG myhive;
CREATE TABLE kafka_user_orders (
  `user_id` BIGINT,
  `user_name` STRING,
  `user_email` STRING,
  `order_date` TIMESTAMP(3) METADATA FROM 'timestamp',
  `price` DECIMAL(10,5),
  `product_id` BIGINT,
  `order_status` BOOLEAN
) WITH (
    'connector' = 'kafka',  
    'topic' = 'user_orders',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

select * from kafka_user_orders;

Captura de tela mostrando como criar a tabela Kafka.

CREATE TABLE mysql_user_orders (
  `order_id` INT,
  `order_date` TIMESTAMP,
  `customer_id` INT,
  `customer_name` STRING,
  `price` DECIMAL(10,5),
  `product_id` INT,
  `order_status` BOOLEAN
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<servername>.mysql.database.azure.com/mydb',
  'table-name' = 'orders',
  'username' = '<username>',
  'password' = '<password>'
);

select * from mysql_user_orders;

Captura de tela mostrando como criar uma tabela mysql.

Captura de tela mostrando a saída da tabela.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
 SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
 FROM kafka_user_orders;

Captura de tela mostrando como afundar a transação do usuário.

Captura de tela mostrando a interface do usuário do Flink.

Verificar se os dados de ordem de transação do usuário no Kafka são adicionados na ordem da tabela mestra no MySQL no Azure Cloud Shell

Captura de tela mostrando como verificar a transação do usuário.

Criando mais três ordens de usuário no Kafka

sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic user_orders
>{"user_id": null,"user_name": "Lucy","user_email": "user8@example.com","order_date": "07/17/2023 21:33:44","price": "90.00000","product_id": "102","order_status": false}
>{"user_id": "0009","user_name": "Zark","user_email": "user9@example.com","order_date": "07/17/2023 21:52:07","price": "80.00000","product_id": "103","order_status": true}
>{"user_id": "0010","user_name": "Alex","user_email": "user10@example.com","order_date": "07/17/2023 21:52:07","price": "70.00000","product_id": "104","order_status": true}
Flink SQL> select * from kafka_user_orders;

Captura de tela mostrando como verificar os dados da tabela kafka.

INSERT INTO mysql_user_orders (order_date, customer_id, customer_name, price, product_id, order_status)
SELECT order_date, CAST(user_id AS INT), user_name, price, CAST(product_id AS INT), order_status
FROM kafka_user_orders where product_id = 104;

Captura de tela mostrando como verificar a tabela de pedidos.

Verificar product_id = 104 registro é adicionado na tabela de ordem no MySQL no Azure Cloud Shell

Captura de tela mostrando os registros adicionados à tabela de pedidos.

Referência