Incorporar o Flink DataStream à Tabela Delta Lake do Azure Databricks

Esse exemplo mostra como inserir dados de fluxo no Azure ADLS Gen2 do cluster do Apache Flink no HDInsight no AKS em tabelas Delta Lake usando o Carregador Automático do Azure Databricks.

Pré-requisitos

Carregador Automático do Azure Databricks

O Carregador Automático do Databricks facilita o fluxo de dados para o armazenamento de objetos de aplicativos Flink para tabelas delta lake. O Carregador Automático fornece uma fonte de Fluxo estruturado chamada cloudFiles.

Aqui estão as etapas que explicam como você pode usar dados do Flink em tabelas dinâmicas delta do Azure Databricks.

Nessa etapa, você pode criar a tabela Kafka e o ADLS Gen2 no Flink SQL. Neste documento, estamos usando um airplanes_state_real_time table. Você pode usar qualquer artigo de sua escolha.

Você precisa atualizar os IPs do agente com o cluster do Kafka no trecho de código.

CREATE TABLE kafka_airplanes_state_real_time (
   `date` STRING,
   `geo_altitude` FLOAT,
   `icao24` STRING,
   `latitude` FLOAT,
   `true_track` FLOAT,
   `velocity` FLOAT,
   `spi` BOOLEAN,
   `origin_country` STRING,
   `minute` STRING,
   `squawk` STRING,
   `sensors` STRING,
   `hour` STRING,
   `baro_altitude` FLOAT,
   `time_position` BIGINT,
   `last_contact` BIGINT,
   `callsign` STRING,
   `event_time` STRING,
   `on_ground` BOOLEAN,
   `category` STRING,
   `vertical_rate` FLOAT,
   `position_source` INT,
   `current_time` STRING,
   `longitude` FLOAT
 ) WITH (
    'connector' = 'kafka',  
    'topic' = 'airplanes_state_real_time',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092', 
    'format' = 'json' 
);

Em seguida, você pode criar a tabela ADLSgen2 no Flink SQL.

Atualize o nome do contêiner e o nome da conta de armazenamento no trecho código com os detalhes do ADLS Gen2.

CREATE TABLE adlsgen2_airplanes_state_real_time (
  `date` STRING,
  `geo_altitude` FLOAT,
  `icao24` STRING,
  `latitude` FLOAT,
  `true_track` FLOAT,
  `velocity` FLOAT,
  `spi` BOOLEAN,
  `origin_country` STRING,
  `minute` STRING,
  `squawk` STRING,
  `sensors` STRING,
  `hour` STRING,
  `baro_altitude` FLOAT,
  `time_position` BIGINT,
  `last_contact` BIGINT,
  `callsign` STRING,
  `event_time` STRING,
  `on_ground` BOOLEAN,
  `category` STRING,
  `vertical_rate` FLOAT,
  `position_source` INT,
  `current_time` STRING,
  `longitude` FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
    'format' = 'json'
);

Além disso, você pode inserir a tabela Kafka na tabela ADLSgen2 no SQL do Flink.

A captura de tela mostra a inserção da tabela Kafka na tabela ADLSgen2.

A captura de tela mostra a validação do trabalho de streaming no Flink.

Verificar o coletor de dados do Kafka no Armazenamento do Azure no portal do Azure

A captura de tela mostra o coletor de dados de verificação do Kafka no Armazenamento do Azure.

Autenticação do Armazenamento do Azure e do notebook do Azure Databricks

O ADLS Gen2 fornece o OAuth 2.0 com sua entidade de serviço de aplicativo do Microsoft Entra para autenticação de um notebook do Azure Databricks e, em seguida, a montagem no Azure Databricks DBFS.

Vamos obter a appid da entidade de serviço, a ID do locatário e a chave secreta.

A captura de tela mostra a appid da entidade de serviço, a ID do locatário e a chave secreta.

Conceder princípio de serviço ao Proprietário de Dados de Blob de Armazenamento no portal do Azure

A captura de tela mostra a entidade de serviço do Proprietário de Dados de Blob de Armazenamento no portal do Azure.

Montar o ADLS Gen2 no DBFS, no notebook do Azure Databricks

A captura de tela mostra a montagem do ADLS Gen2 no DBFS, no notebook do Azure Databricks.

Preparar bloco de anotações

Vamos escrever o seguinte código:

%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")

Definir o Pipeline de Tabela Dinâmica Delta e executar no Azure Databricks

Definir o Pipeline de Tabela Dinâmica Delta e a execução no Azure Databricks.

A captura de tela mostra o Pipeline da Tabela Dinâmica Delta e a execução no Azure Databricks.

Verificar a Tabela Dinâmica Delta no Notebook do Azure Databricks

A captura de tela mostra a verificação da Tabela Dinâmica Delta no Notebook do Azure Databricks.

Referência