教學課程:Delta Lake

本教學課程介紹 Azure Databricks 上的常見 Delta Lake 作業,包括下列各項:

您可以從連結至 Azure Databricks 計算資源的筆記本內,執行本文中的範例 Python、Scala 和 SQL 程式代碼,例如叢集。 您也可以從 Databricks SQL 中與 SQL 倉儲相關聯的查詢執行本文中的 SQL 程式代碼。

準備源數據

本教學課程依賴名為People 10 M的數據集。它包含 1000 萬筆虛構記錄,這些記錄保存著有關人們的事實,例如名字和姓氏、出生日期和工資。 本教學課程假設此數據集位於與您的目標 Azure Databricks 工作區相關聯的 Unity 目錄 磁碟 區中。

若要取得本教學課程的People 10 M數據集,請執行下列動作:

  1. 移至 Kaggle 中的 [人員 10 M ] 頁面。
  2. 按兩下 [ 下載 ] 以將名為 archive.zip 的檔案下載到本機電腦。
  3. 從檔案擷取名為 export.csvarchive.zip 檔案。 檔案 export.csv 包含本教學課程的數據。

若要將 export.csv 檔案上傳至磁碟區,請執行下列動作:

  1. 在提要欄中,按兩下 [目錄]。
  2. [目錄總管] 中,流覽至並開啟您要上傳檔案的 export.csv 磁碟區。
  3. 按兩下 [ 上傳至此磁碟區]。
  4. 拖放,或流覽至本機計算機上的檔案並加以選取 export.csv
  5. 按一下 [上傳] 。

在下列程式代碼範例中,將 取代 /Volumes/main/default/my-volume/export.csv 為您目標磁碟區中的檔案路徑 export.csv

建立數據表

根據預設,在 Azure Databricks 上建立的所有數據表都會使用 Delta Lake。 Databricks 建議使用 Unity 目錄受控數據表。

在上述程式代碼範例和下列程式代碼範例中,將數據表名稱 main.default.people_10m 取代為 Unity 目錄中的目標三部分目錄、架構和數據表名稱。

注意

Delta Lake 是 Azure Databricks 的所有讀取、寫入和數據表建立命令的預設值。

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")

SQL

CREATE OR REPLACE TABLE main.default.people_10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
);

COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );

上述作業會建立新的Managed數據表。 如需建立 Delta 數據表時可用選項的相關信息,請參閱 CREATE TABLE

在 Databricks Runtime 13.3 LTS 和更新版本中,您可以使用 CREATE TABLE LIKE 建立新的空白 Delta 數據表,以複製來源 Delta 數據表的架構和數據表屬性。 這在將數據表從開發環境升級為生產環境時特別有用,如下列程式代碼範例所示:

CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

若要建立空的數據表,您也可以在 Delta Lake for PythonScala 中使用 DeltaTableBuilder API。 相較於對等的 DataFrameWriter API,這些 API 可讓您更輕鬆地指定其他資訊,例如數據行批註、數據表屬性和 產生的數據行

重要

這項功能處於公開預覽狀態

Python

DeltaTable.createIfNotExists(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Scala

DeltaTable.createOrReplace(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

向上插入數據表

若要將一組更新和插入合併至現有的 Delta 數據表,您可以使用 Python 和 Scala 的 方法,以及 SQL 的 MERGE INTO 語句。 DeltaTable.merge 例如,下列範例會從源數據表取得數據,並將它合併至目標 Delta 數據表。 當這兩個數據表中有相符的數據列時,Delta Lake 會使用指定的表達式來更新數據行。 當沒有相符的數據列時,Delta Lake 會加入新的數據列。 這項作業稱為 upsert

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val data = Seq(
  Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
  Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
  Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
  Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
  Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
  Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)

val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")

// ...

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

deltaTable.as("people_10m")
  .merge(
    people_10m_updates.as("people_10m_updates"),
    "people_10m.id = people_10m_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

SQL

CREATE OR REPLACE TEMP VIEW people_10m_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

在 SQL 中,如果您指定 *,這會更新或插入目標數據表中的所有資料行,假設源數據表的數據行與目標數據表相同。 如果目標數據表沒有相同的數據行,查詢會擲回分析錯誤。

當您執行插入作業時,您必須為資料表中的每個資料行指定值(例如,當現有數據集中沒有相符的數據列時)。 不過,您不需要更新所有值。

若要查看結果,請查詢數據表。

Python

df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)

Scala

val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)

SQL

SELECT * FROM main.default.people_10m WHERE id >= 9999998

讀取數據表

您可以依資料表名稱或資料表路徑存取 Delta 資料表中的數據,如下列範例所示:

Python

people_df = spark.read.table("main.default.people_10m")
display(people_df)

Scala

val people_df = spark.read.table("main.default.people_10m")
display(people_df)

SQL

SELECT * FROM main.default.people_10m;

寫入數據表

Delta Lake 使用標準語法將數據寫入數據表。

若要以不可部分完成的方式將新數據新增至現有的 Delta 數據表,請使用附加模式,如下列範例所示:

Python

df.write.mode("append").saveAsTable("main.default.people_10m")

Scala

df.write.mode("append").saveAsTable("main.default.people_10m")

SQL

INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people

若要取代數據表中的所有數據,請使用覆寫模式,如下列範例所示:

Python

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

Scala

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

SQL

INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people

更新數據表

您可以更新符合 Delta 數據表中述詞的數據。 例如,在範例people_10m數據表中,若要從 或變更 或 F 資料行M中的gender縮寫,MaleFemale您可以執行下列命令:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")
)

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

SQL

UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';

從資料表中刪除

您可以從 Delta 數據表移除符合述詞的數據。 例如,在範例 people_10m 數據表中,若要刪除與資料行中 birthDate 值相對應的所有資料列 1955,您可以執行下列命令:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

SQL

DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'

重要

刪除會從最新版的 Delta 數據表中移除數據,但在明確清除舊版之前,不會從實體記憶體中移除它。 如需詳細資訊,請參閱 真空

顯示數據表歷程記錄

若要檢視數據表的歷程記錄,您可以使用 DeltaTable.history PythonScala 的方法,以及 SQL 中的 DESCRIBE HISTORY 語句,其會提供數據表版本、作業、使用者等,以針對每個寫入數據表提供證明資訊。

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

SQL

DESCRIBE HISTORY main.default.people_10m

查詢舊版的資料表 (時程移動)

Delta Lake 時間移動可讓您查詢差異數據表的較舊快照集。

若要查詢舊版的數據表,請指定數據表的版本或時間戳。 例如,若要從上述歷程記錄查詢第0版或時間戳 2024-05-15T22:43:15.000+00:00Z ,請使用下列專案:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

對於時間戳,只接受日期或時間戳字串,例如 "2024-05-15T22:43:15.000+00:00""2024-05-15 22:43:15"

DataFrameReader 選項可讓您從已修正為數據表特定版本或時間戳的 Delta 數據表建立 DataFrame,例如:

Python

df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)

Scala

val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")

display(df)

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'

如需詳細資訊,請參閱 使用 Delta Lake 數據表歷程記錄

優化數據表

對數據表執行多個變更之後,您可能有許多小型檔案。 若要改善讀取查詢的速度,您可以使用優化作業將小型檔案折疊成較大的檔案:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

SQL

OPTIMIZE main.default.people_10m

依數據行排序 Z 順序

若要進一步改善讀取效能,您可以依 z 順序將同一組檔案中的相關信息共置。 Delta Lake 數據略過演算法會使用此組合來大幅減少需要讀取的數據量。 若為迭置順序數據,您可以指定要依作業以迭置順序排序的數據行。 例如,若要由 gender共置 ,請執行:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

SQL

OPTIMIZE main.default.people_10m
ZORDER BY (gender)

如需執行優化作業時可用的一組完整選項,請參閱 優化數據檔配置

使用清除快照集 VACUUM

Delta Lake 提供讀取的快照集隔離,這表示即使其他使用者或作業正在查詢數據表,也能安全地執行優化作業。 不過,您最終應該清除舊的快照集。 您可以執行真空作業來執行這項操作:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

SQL

VACUUM main.default.people_10m

如需有效使用真空作業的詳細資訊,請參閱 使用真空移除未使用的數據檔。