在 Azure Databricks 上執行您的第一個 ETL 工作負載

瞭解如何使用 Azure Databricks 的生產就緒工具,開發及部署資料協調流程的第一個擷取、轉換和載入 (ETL) 管線。

本文結束時,您會熟練掌握:

  1. 啟動 Databricks 全用途計算叢集
  2. 建立 Databricks 筆記本
  3. 使用自動載入器設定 Delta Lake 的累加式資料擷取
  4. 執行筆記本儲存格來處理、查詢和預覽資料
  5. 將筆記本排程為 Databricks 作業

本教學課程使用互動式筆記本來完成 Python 或 Scala 中的常見 ETL 工作。

您也可以使用差異即時資料表來建置 ETL 管線。 Databricks 建立了差異即時資料表,以減少建置、部署和維護生產 ETL 管線的複雜性。 請參閱教學課程:執行您的第一個差異即時資料表管線

您也可以使用 Databricks Terraform 提供者來建立本文的資源。 請參閱使用 Terraform 建立叢集、筆記本和作業

需求

注意

如果您沒有叢集控制權限,只要您可以存取叢集,就仍然可以完成下列大多數步驟。

步驟 1:建立叢集

若要進行探勘資料分析和資料工程,請建立叢集以提供執行命令所需的計算資源。

  1. 在側邊欄中按下 計算圖示 [計算]
  2. 在 [計算] 頁面上,按一下 [建立叢集]。 這會開啟 [新增叢集] 頁面。
  3. 指定叢集的唯一名稱,保留其餘值的預設狀態,然後按一下 [建立叢集]。

若要深入瞭解 Databricks 叢集,請參閱計算

步驟 2:建立 Databricks 筆記本

若要在工作區中建立筆記本,請按一下提要欄位的 新增圖示 新增 ,然後按一下 筆記本。 空白筆記本會在工作區中開啟。

若要深入瞭解如何建立並管理筆記本,請參閱 管理筆記本

步驟 3:設定自動載入器將資料內嵌至 Delta Lake

Databricks 建議使用自動載入器進行累加式資料擷取。 自動載入器會在新檔案抵達雲端物件儲存體時自動偵測並處理。

Databricks 建議使用 Delta Lake 儲存資料。 Delta Lake 是一個開放原始碼儲存體層,可提供 ACID 交易並啟用資料湖存放庫。 Delta Lake 是在 Databricks 中建立的資料表預設格式。

若要將自動載入器設定為將資料內嵌至 Delta Lake 資料表,請將下列程式碼複製並貼到筆記本中的空白儲存格:

Python

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

Scala

// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

注意

此程式碼中定義的變數應該可讓您安全地執行它,而不會有與現有工作區資產或其他使用者衝突的風險。 執行此程式碼時,限制的網路或儲存體權限會引發錯誤;請連絡您的工作區系統管理員,以針對這些限制進行疑難排解。

若要深入了解自動載入器,請參閱什麼是自動載入器?

步驟 4:處理資料並與之互動

筆記本會逐個儲存格執行邏輯。 若要執行儲存格中的邏輯:

  1. 若要執行您在上一個步驟中完成的儲存格,請選取儲存格,然後按 SHIFT+ENTER

  2. 若要查詢您剛建立的資料表,請將下列程式碼複製並貼入空白儲存格,然後按 SHIFT+ENTER 以執行儲存格。

    Python

    df = spark.read.table(table_name)
    

    Scala

    val df = spark.read.table(table_name)
    
  3. 若要預覽 DataFrame 中的資料,請將下列程式碼複製並貼到空白儲存格中,然後按 SHIFT+ENTER 以執行儲存格。

    Python

    display(df)
    

    Scala

    display(df)
    

若要深入瞭解可視化資料的互動式選項,請參閱 Databricks 筆記本中的視覺效果

步驟 5:排程作業

您可以透過將 Databricks 筆記本新增為 Databricks 作業中的工作,來執行 Databricks 筆記本作為生產指令碼。 在此步驟中,您將建立可手動觸發的新作業。

若要將筆記本排程為工作:

  1. 按一下標題列右側的 [排程]。
  2. 針對 [作業名稱] 輸入唯一名稱。
  3. 按一下 [手動]。
  4. 在 [叢集] 下拉式清單中,選取您在步驟 1 中建立的叢集。
  5. 按一下 [建立]。
  6. 在出現的視窗中,按一下 [立即執行]
  7. 若要查看作業執行結果,請按一下 [上次執行] 時間戳旁的外部連結圖示。

如需作業的詳細資訊,請參閱什麼是 Databricks 作業?

其他整合

深入瞭解使用 Azure Databricks 進行資料工程的整合和工具: