Incorporate Apache Flink® DataStream into Azure Databricks Delta Lake Tables
This example shows how to sink streaming data from an Apache Flink® cluster on HDInsight on AKS into Azure ADLS Gen2, and then load it into Delta Lake tables by using Azure Databricks Auto Loader.
Prerequisites
- Apache Flink 1.17.0 on HDInsight on AKS
- Apache Kafka 3.2 on HDInsight
- Azure Databricks in the same virtual network as HDInsight on AKS
- An ADLS Gen2 storage account and a service principal
Azure Databricks Auto Loader
Databricks Auto Loader makes it easy to stream the data that Flink applications land in object storage into Delta Lake tables. Auto Loader provides a Structured Streaming source called cloudFiles.
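As a minimal sketch of what that means in a Databricks notebook (where spark is predefined), the cloudFiles source is read like any other Structured Streaming source; <input-path> and <schema-location> are placeholders, not values from this walkthrough:

# Read JSON files incrementally as they land, using the cloudFiles source.
# <input-path> and <schema-location> are placeholders you supply.
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "<schema-location>")
    .load("<input-path>"))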
Here are the steps to use data from Flink in Azure Databricks Delta Live Tables.
Create Apache Kafka® table on Apache Flink® SQL
In this step, you create a Kafka table and an ADLS Gen2 table in Flink SQL. This document uses an airplanes_state_real_time table; you can use any topic of your choice.
Update the broker IPs in the code snippet with your Kafka cluster's broker addresses.
CREATE TABLE kafka_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'airplanes_state_real_time',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '10.0.0.38:9092,10.0.0.39:9092,10.0.0.40:9092',
'format' = 'json'
);
Next, create an ADLS Gen2 table in Flink SQL.
Update <container-name> and <storage-account-name> in the code snippet with your ADLS Gen2 details.
CREATE TABLE adlsgen2_airplanes_state_real_time (
`date` STRING,
`geo_altitude` FLOAT,
`icao24` STRING,
`latitude` FLOAT,
`true_track` FLOAT,
`velocity` FLOAT,
`spi` BOOLEAN,
`origin_country` STRING,
`minute` STRING,
`squawk` STRING,
`sensors` STRING,
`hour` STRING,
`baro_altitude` FLOAT,
`time_position` BIGINT,
`last_contact` BIGINT,
`callsign` STRING,
`event_time` STRING,
`on_ground` BOOLEAN,
`category` STRING,
`vertical_rate` FLOAT,
`position_source` INT,
`current_time` STRING,
`longitude` FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/data/airplanes_state_real_time/flink/airplanes_state_real_time/',
'format' = 'json'
);
Finally, insert the Kafka table into the ADLS Gen2 table in Flink SQL, as shown below.
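The following statement starts a continuous Flink job that streams every row from the Kafka table into the ADLS Gen2 table:

INSERT INTO adlsgen2_airplanes_state_real_time
SELECT * FROM kafka_airplanes_state_real_time;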
Validate the streaming job on Flink, for example with the statement below or in the Flink dashboard.
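A quick check from the Flink SQL client (job management statements such as SHOW JOBS are available in the SQL client as of Flink 1.17):

-- List jobs and confirm the INSERT job's status is RUNNING.
SHOW JOBS;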
Check in the Azure portal that the data from Kafka lands in Azure Storage.
Azure Storage and Azure Databricks notebook authentication
ADLS Gen2 supports OAuth 2.0 authentication with a Microsoft Entra application service principal, which lets you authenticate from an Azure Databricks notebook and then mount the storage into Azure Databricks DBFS.
Let's get the service principal's application (client) ID, tenant ID, and client secret.
Grant the service principal the Storage Blob Data Owner role in the Azure portal.
Mount ADLS Gen2 into DBFS in an Azure Databricks notebook, for example with the sketch below.
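A minimal mount sketch using OAuth 2.0 with the service principal. The application ID, tenant ID, secret scope, container, and storage account names are placeholders you replace; the mount point is assumed to match the dbfs:/mnt/contosoflinkgen2 path used later in this article:

# Mount ADLS Gen2 into DBFS with OAuth 2.0 service-principal authentication.
# <application-id>, <tenant-id>, <scope-name>, <secret-key-name>,
# <container-name>, and <storage-account-name> are placeholders.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type":
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "<application-id>",
    "fs.azure.account.oauth2.client.secret":
        dbutils.secrets.get(scope="<scope-name>", key="<secret-key-name>"),
    "fs.azure.account.oauth2.client.endpoint":
        "https://login.microsoftonline.com/<tenant-id>/oauth2/token",
}

dbutils.fs.mount(
    source="abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/",
    mount_point="/mnt/contosoflinkgen2",
    extra_configs=configs,
)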
Prepare notebook
Let's write the following code:
%sql
CREATE OR REFRESH STREAMING TABLE airplanes_state_real_time2
AS SELECT * FROM cloud_files("dbfs:/mnt/contosoflinkgen2/flink/airplanes_state_real_time/", "json")
Define the Delta Live Tables pipeline and run it on Azure Databricks.
Check the Delta Live Table in the Azure Databricks notebook, for example with the query below.
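A quick row count confirms that data is arriving, assuming the pipeline publishes the airplanes_state_real_time2 table to a target you can query from the notebook:

%sql
SELECT COUNT(*) FROM airplanes_state_real_time2;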
Reference
- Apache, Apache Kafka, Kafka, Apache Flink, Flink, and associated open source project names are trademarks of the Apache Software Foundation (ASF).