Tutorial: Delta Lake

In diesem Tutorial werden allgemeine Delta Lake-Vorgänge in Azure Databricks vorgestellt, einschließlich der folgenden:

Sie können den Python-, Scala- und SQL-Beispielcode in diesem Artikel in einem Notebook ausführen, das an eine Computeressource von Azure Databricks, wie z. B. ein Cluster, angefügt ist. Sie können den SQL-Code in diesem Artikel auch in einer Abfrage ausführen, die einem SQL-Warehouse in Databricks SQL zugeordnet ist.

Vorbereiten der Quelldaten

Dieses Tutorial basiert auf einem Dataset namens „Personen 10 M“. Es enthält 10 Millionen fiktive Datensätze, die Angaben zu Personen enthalten, wie Vorname und Nachnamen, Geburtsdatum und Gehalt. In diesem Tutorial wird davon ausgegangen, dass sich dieses Dataset in einem Unity-Katalogvolume befindet, das Ihrem Azure Databricks-Zielarbeitsbereich zugeordnet ist.

Gehen Sie wie folgt vor, um das Dataset „Personen 10 M“ für dieses Tutorial abzurufen:

  1. Wechseln Sie zur Seite Personen 10 M in Kaggle.
  2. Klicken Sie auf Download, um eine Datei namens archive.zip auf Ihren lokalen Computer herunterzuladen.
  3. Extrahieren Sie die Datei namens export.csv aus der archive.zip-Datei. Die export.csv-Datei enthält die Daten für dieses Tutorial.

Gehen Sie wie folgt vor, um die export.csv-Datei in das Volume hochzuladen:

  1. Klicken Sie auf der Seitenleiste auf Katalog.
  2. Navigieren Sie im Katalog-Explorer zu dem Volume, in das Sie die Datei hochladen möchten, und öffnen Sie sie die export.csv-Datei.
  3. Klicken Sie auf die Schaltfläche Upload to this volume (In dieses Volume hochladen).
  4. Ziehen sie die export.csv-Datei und legen Sie sie ab, oder navigieren Sie zu der Datei auf Ihrem lokalen Computer und wählen Sie sie aus.
  5. Klicken Sie auf Hochladen.

Ersetzen Sie in den folgenden Codebeispielen /Volumes/main/default/my-volume/export.csv durch den Pfad zur Datei export.csv in Ihrem Zielvolume.

Erstellen einer Tabelle

In Azure Databricks erstellte Tabellen verwenden standardmäßig das Delta Lake-Protokoll. Databricks empfiehlt die Verwendung von verwalteten Unity Catalog-Tabellen.

Ersetzen Sie im vorherigen Codebeispiel und den folgenden Codebeispielen den Tabellennamen main.default.people_10m durch ihren dreiteiligen Zielkatalog, Schema und Tabellennamen im Unity-Katalog.

Hinweis

Delta Lake ist die Standardeinstellung für alle Lese-, Schreib- und Tabellenerstellungsbefehle für Azure Databricks.

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")

SQL

CREATE OR REPLACE TABLE main.default.people_10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
);

COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );

Die vorherigen Vorgänge erstellen eine neue verwaltete Tabelle. Informationen zu den verfügbaren Optionen beim Erstellen einer Delta-Tabelle finden Sie unter CREATE TABLE.

In Databricks Runtime 13.3 LTS und höher können Sie mit CREATE TABLE LIKE eine neue leere Delta-Tabelle erstellen, welche die Schema- und Tabelleneigenschaften einer Delta-Quelltabelle dupliziert. Dies kann besonders nützlich sein, wenn Tabellen aus einer Entwicklungsumgebung in die Produktion hochgestuft werden, wie im folgenden Codebeispiel gezeigt:

CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

Um eine leere Tabelle zu erstellen, können Sie auch die DeltaTableBuilder -API in Delta Lake für Python und Scala verwenden. Im Vergleich zu entsprechenden DataFrameWriter-APIs erleichtern diese APIs die Angabe zusätzlicher Informationen, darunter Spaltenkommentare, Tabelleneigenschaften und generierte Spalten.

Wichtig

Dieses Feature befindet sich in der Public Preview.

Python

DeltaTable.createIfNotExists(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Scala

DeltaTable.createOrReplace(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Upsert in eine Tabelle

Um eine Reihe von Updates und Einfügungen in einer vorhandene Delta-Tabelle zusammenzuführen, verwenden Sie die DeltaTable.merge-Methode für Python und Scala sowie die MERGE INTO-Anweisung für SQL. Mit der folgenden Anweisung werden beispielsweise Daten aus der Quelltabelle übernommen und mit der Delta-Zieltabelle zusammengeführt. Wenn in beiden Tabellen eine übereinstimmende Zeile vorhanden ist, wird die Datenspalte in Delta Lake mithilfe des angegebenen Ausdrucks aktualisiert. Wenn keine übereinstimmende Zeile vorhanden ist, wird in Delta Lake eine neue Zeile hinzugefügt. Dieser Vorgang wird als Upsert bezeichnet.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val data = Seq(
  Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
  Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
  Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
  Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
  Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
  Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)

val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")

// ...

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

deltaTable.as("people_10m")
  .merge(
    people_10m_updates.as("people_10m_updates"),
    "people_10m.id = people_10m_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

SQL

CREATE OR REPLACE TEMP VIEW people_10m_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Wenn Sie in SQL * angeben, werden alle Spalten in der Zieltabelle aktualisiert oder eingefügt, vorausgesetzt, die Quelltabelle enthält die gleichen Spalten wie die Zieltabelle. Wenn die Zieltabelle nicht über dieselben Spalten verfügt, löst die Abfrage einen Analysefehler aus.

Beim Ausführen eines Einfügevorgangs muss für jede Spalte in der Tabelle ein Wert angegeben werden (z. B. wenn im vorhandenen Dataset keine übereinstimmende Zeile vorhanden ist). Es müssen jedoch nicht alle Werte aktualisiert werden.

Fragen Sie die Tabelle ab, um die Ergebnisse anzuzeigen.

Python

df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)

Scala

val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)

SQL

SELECT * FROM main.default.people_10m WHERE id >= 9999998

Lesen einer Tabelle

Sie greifen auf Daten in Delta-Tabellen über den Tabellennamen oder -pfad zu, wie in den folgenden Beispielen gezeigt:

Python

people_df = spark.read.table("main.default.people_10m")
display(people_df)

Scala

val people_df = spark.read.table("main.default.people_10m")
display(people_df)

SQL

SELECT * FROM main.default.people_10m;

Schreiben in eine Tabelle

Delta Lake verwendet Standardsyntax zum Schreiben von Daten in Tabellen.

Verwenden Sie den Anfügemodus, wie in den folgenden Beispielen gezeigt, um einer vorhandenen Delta-Tabelle atomisch neue Daten hinzuzufügen:

Python

df.write.mode("append").saveAsTable("main.default.people_10m")

Scala

df.write.mode("append").saveAsTable("main.default.people_10m")

SQL

INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people

Um alle Daten in einer Tabelle zu ersetzen, verwenden Sie den Überschreibmodus wie in den folgenden Beispielen:

Python

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

Scala

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

SQL

INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people

Aktualisieren einer Tabelle

Sie können Daten aktualisieren, die einem Prädikat in einer Delta-Tabelle entsprechen. Beispielsweise können Sie in der Beispieltabelle namens people_10m eine Abkürzung in der gender-Spalte von M oder F zu Male oder Female zu ändern:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")
)

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

SQL

UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';

Löschen aus einer Tabelle

Sie können Daten, die einem Prädikat entsprechen, aus einer Delta-Tabelle entfernen. In der Beispieltabelle namens people_10m können Sie beispielsweise Folgendes ausführen, um alle Zeilen zu löschen, die Personen mit einem Wert in der Spalte birthDate vor 1955 entsprechen:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

SQL

DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'

Wichtig

Durch das Löschen werden die Daten aus der neuesten Version der Delta-Tabelle, aber nicht aus dem physischen Speicher entfernt, bis die alten Versionen explizit abgesaugt werden. Details finden Sie unter Vacuum.

Anzeigen des Tabellenverlaufs

Um den Verlauf einer Tabelle anzuzeigen, verwenden Sie die DeltaTable.history-Methode für Python und Scala, sowie die DESCRIBE HISTORY-Anweisung in SQL, die Herkunftsinformationen bereitstellt, einschließlich der Tabellenversion, des Vorgangs, des Benutzers usw. für jeden Schreibvorgang in einer Tabelle.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

SQL

DESCRIBE HISTORY main.default.people_10m

Abfragen einer früheren Version der Tabelle (Zeitreise)

Mit einer Delta Lake-Zeitreise können Sie eine ältere Momentaufnahme einer Delta-Tabelle abfragen.

Um eine ältere Version einer Tabelle abzufragen, geben Sie die Version oder den Zeitstempel der Tabelle an. Wenn Sie beispielsweise Version 0 oder Zeitstempel 2024-05-15T22:43:15.000+00:00Z aus dem vorherigen Verlauf abfragen möchten, verwenden Sie Folgendes:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Als Zeitstempel werden nur Datums- oder Zeitstempelzeichenfolgen akzeptiert, z. B. "2024-05-15T22:43:15.000+00:00" oder "2024-05-15 22:43:15".

Mit den DataFrameReader-Optionen können Sie beispielsweise in Python einen DataFrame aus einer Delta-Tabelle erstellen, der auf eine bestimmte Version oder einen Zeitstempel der Tabelle festgelegt ist:

Python

df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)

Scala

val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")

display(df)

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'

Weitere Informationen finden Sie unter Arbeiten mit dem Delta Lake-Tabellenverlauf.

Optimieren einer Tabelle

Nachdem Sie mehrere Änderungen an einer Tabelle vorgenommen haben, verfügen Sie möglicherweise über viele kleine Dateien. Zur Beschleunigung von Leseabfragen können Sie „Vorgang optimieren“ verwenden, um kleine Dateien zu größeren zusammenzufassen:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

SQL

OPTIMIZE main.default.people_10m

Z-Reihenfolge nach Spalten

Zur Verbesserung der Leseleistung können Sie verwandte Informationen in einer Dateiengruppe entsprechend der Z-Reihenfolge zusammenstellen. Delta Lake-Datensprungalgorithmen verwenden diese Kollokation, um die Menge der zu lesenden Daten erheblich zu reduzieren. Wenn Daten in der Z-Reihenfolge sortiert werden sollen, geben Sie die Spalten, nach denen sortiert werden soll, in der Z-Reihenfolge nach Vorgang an. Führen Sie beispielsweise zum Sortieren nach gender den folgenden Code aus:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

SQL

OPTIMIZE main.default.people_10m
ZORDER BY (gender)

Die vollständigen Optionen, die beim Ausführen des Optimierungsvorgangs verfügbar sind, finden Sie unter Optimieren des Datendateilayouts.

Bereinigen von Momentaufnahmen VACUUM

Delta Lake ermöglicht die Momentaufnahmenisolation für Lesevorgänge, was bedeutet, dass ein Optimierungsvorgang auch dann sicher ausgeführt werden kann, wenn die Tabelle von anderen Benutzern oder Aufträgen abgefragt wird. Irgendwann sollten Sie alte Momentaufnahmen jedoch bereinigen. Sie können dies tun, indem Sie den Vakuumbetrieb ausführen:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

SQL

VACUUM main.default.people_10m

Ausführliche Informationen zur effektiven Verwendung des Vakuumbetriebs finden Sie unter Entfernen nicht verwendeter Datendateien mit Vakuum.