Esercitazione: Delta Lake

Questa esercitazione introduce le operazioni Delta Lake comuni in Azure Databricks, incluse le seguenti:

È possibile eseguire l'esempio di codice Python, Scala e SQL in questo articolo da un notebook collegato a una risorsa di calcolo di Azure Databricks, ad esempio un cluster. È anche possibile eseguire il codice SQL in questo articolo dall'interno di una query associata a sql warehouse in Databricks SQL.

Preparare i dati di origine

Questa esercitazione si basa su un set di dati denominato People 10 M. Contiene 10 milioni di record fittizi che contengono fatti su persone, come nome e cognome, data di nascita e stipendio. Questa esercitazione presuppone che questo set di dati si trova in un volume di Catalogo Unity associato all'area di lavoro di Azure Databricks di destinazione.

Per ottenere il set di dati People 10 M per questa esercitazione, eseguire le operazioni seguenti:

  1. Vai alla pagina Persone 10 M in Kaggle.
  2. Fare clic su Scarica per scaricare un file denominato archive.zip nel computer locale.
  3. Estrarre il file denominato export.csv dal archive.zip file. Il export.csv file contiene i dati per questa esercitazione.

Per caricare il export.csv file nel volume, eseguire le operazioni seguenti:

  1. Sulla barra laterale fare clic su Catalogo.
  2. In Esplora cataloghi passare e aprire il volume in cui si vuole caricare il export.csv file.
  3. Fare clic su Carica in questo volume.
  4. Trascinare e rilasciare o passare a e selezionare il file nel export.csv computer locale.
  5. Fare clic su Carica.

Negli esempi di codice seguenti sostituire /Volumes/main/default/my-volume/export.csv con il percorso del export.csv file nel volume di destinazione.

Creare una tabella

Per impostazione predefinita, tutte le tabelle create in Azure Databricks usano Delta Lake. Databricks consiglia di usare le tabelle gestite di Unity Catalog.

Nell'esempio di codice precedente e negli esempi di codice seguenti sostituire il nome main.default.people_10m della tabella con il catalogo, lo schema e il nome della tabella di destinazione nel catalogo unity.

Nota

Delta Lake è l'impostazione predefinita per tutti i comandi di lettura, scrittura e creazione di tabelle in Azure Databricks.

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")

SQL

CREATE OR REPLACE TABLE main.default.people_10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
);

COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );

Le operazioni precedenti creano una nuova tabella gestita. Per informazioni sulle opzioni disponibili quando si crea una tabella Delta, vedere CREATE TABLE.

In Databricks Runtime 13.3 LTS e versioni successive è possibile usare CREATE TABLE LIKE per creare una nuova tabella Delta vuota che duplica lo schema e le proprietà della tabella per una tabella Delta di origine. Ciò può essere particolarmente utile quando si promuovono tabelle da un ambiente di sviluppo all'ambiente di produzione, come illustrato nell'esempio di codice seguente:

CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

Per creare una tabella vuota, è anche possibile usare l'API DeltaTableBuilder in Delta Lake per Python e Scala. Rispetto alle API DataFrameWriter equivalenti, queste API semplificano la specifica di informazioni aggiuntive, ad esempio commenti di colonna, proprietà di tabella e colonne generate.

Importante

Questa funzionalità è disponibile in anteprima pubblica.

Python

DeltaTable.createIfNotExists(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Scala

DeltaTable.createOrReplace(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Upsert in una tabella

Per unire un set di aggiornamenti e inserimenti in una tabella Delta esistente, usare il DeltaTable.merge metodo per Python e Scala e l'istruzione MERGE INTO per SQL. L'esempio seguente, ad esempio, accetta i dati dalla tabella di origine e lo unisce alla tabella Delta di destinazione. Quando è presente una riga corrispondente in entrambe le tabelle, Delta Lake aggiorna la colonna di dati usando l'espressione specificata. Quando non è presente alcuna riga corrispondente, Delta Lake aggiunge una nuova riga. Questa operazione è nota come upsert.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val data = Seq(
  Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
  Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
  Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
  Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
  Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
  Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)

val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")

// ...

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

deltaTable.as("people_10m")
  .merge(
    people_10m_updates.as("people_10m_updates"),
    "people_10m.id = people_10m_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

SQL

CREATE OR REPLACE TEMP VIEW people_10m_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

In SQL, se si specifica *, questo aggiorna o inserisce tutte le colonne nella tabella di destinazione, presupponendo che la tabella di origine abbia le stesse colonne della tabella di destinazione. Se la tabella di destinazione non ha le stesse colonne, la query genera un errore di analisi.

È necessario specificare un valore per ogni colonna della tabella quando si esegue un'operazione di inserimento, ad esempio quando non è presente alcuna riga corrispondente nel set di dati esistente. Non è tuttavia necessario aggiornare tutti i valori.

Per visualizzare i risultati, eseguire una query sulla tabella.

Python

df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)

Scala

val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)

SQL

SELECT * FROM main.default.people_10m WHERE id >= 9999998

Leggere una tabella

È possibile accedere ai dati nelle tabelle Delta in base al nome della tabella o al percorso della tabella, come illustrato negli esempi seguenti:

Python

people_df = spark.read.table("main.default.people_10m")
display(people_df)

Scala

val people_df = spark.read.table("main.default.people_10m")
display(people_df)

SQL

SELECT * FROM main.default.people_10m;

Scrivere in una tabella

Delta Lake usa la sintassi standard per la scrittura di dati nelle tabelle.

Per aggiungere in modo atomico nuovi dati a una tabella Delta esistente, usare la modalità di accodamento come illustrato negli esempi seguenti:

Python

df.write.mode("append").saveAsTable("main.default.people_10m")

Scala

df.write.mode("append").saveAsTable("main.default.people_10m")

SQL

INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people

Per sostituire tutti i dati in una tabella, usare la modalità di sovrascrittura come negli esempi seguenti:

Python

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

Scala

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

SQL

INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people

Aggiornare una tabella

È possibile aggiornare i dati corrispondenti a un predicato in una tabella Delta. Nella tabella di esempio people_10m , ad esempio, per modificare un'abbreviazione nella gender colonna da M o F a Male o Female, è possibile eseguire quanto segue:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")
)

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

SQL

UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';

Eliminare da una tabella

È possibile rimuovere dati corrispondenti a un predicato da una tabella Delta. Ad esempio, nella tabella di esempio people_10m , per eliminare tutte le righe corrispondenti alle persone con un valore nella birthDate colonna da prima 1955di , è possibile eseguire quanto segue:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

SQL

DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'

Importante

L'eliminazione rimuove i dati dalla versione più recente della tabella Delta, ma non lo rimuove dallo spazio di archiviazione fisico fino a quando le versioni precedenti non vengono cancellate in modo esplicito. Per informazioni dettagliate, vedere vacuum .

Visualizzare la cronologia delle tabelle

Per visualizzare la cronologia di una tabella, usare il DeltaTable.history metodo per Python e Scala e l'istruzione DESCRIBE HISTORY in SQL, che fornisce informazioni sulla provenienza, tra cui la versione della tabella, l'operazione, l'utente e così via, per ogni scrittura in una tabella.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

SQL

DESCRIBE HISTORY main.default.people_10m

Eseguire una query su una versione precedente della tabella (tempo di spostamento)

Il viaggio in tempo delta Lake consente di eseguire query su uno snapshot precedente di una tabella Delta.

Per eseguire una query su una versione precedente di una tabella, specificare la versione o il timestamp della tabella. Ad esempio, per eseguire una query sulla versione 0 o sul timestamp 2024-05-15T22:43:15.000+00:00Z della cronologia precedente, usare quanto segue:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Per i timestamp, vengono accettate solo stringhe di data o timestamp, ad esempio "2024-05-15T22:43:15.000+00:00" o "2024-05-15 22:43:15".

Le opzioni DataFrameReader consentono di creare un dataframe da una tabella Delta fissa a una versione o un timestamp specifico della tabella, ad esempio:

Python

df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)

Scala

val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")

display(df)

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'

Per informazioni dettagliate, vedere Usare la cronologia delle tabelle Delta Lake.

Ottimizzare una tabella

Dopo aver eseguito più modifiche a una tabella, potrebbero essere presenti molti file di piccole dimensioni. Per migliorare la velocità delle query di lettura, è possibile usare l'operazione di ottimizzazione per comprimere file di piccole dimensioni in file di dimensioni maggiori:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

SQL

OPTIMIZE main.default.people_10m

Ordine Z per colonne

Per migliorare ulteriormente le prestazioni di lettura, è possibile collocare le informazioni correlate nello stesso set di file ordinando z. Gli algoritmi di data-skipping di Delta Lake usano questa collocazione per ridurre drasticamente la quantità di dati che devono essere letti. Per ordinare i dati z, specificare le colonne da ordinare nell'operazione z order by. Ad esempio, per collocare da gender, eseguire:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

SQL

OPTIMIZE main.default.people_10m
ZORDER BY (gender)

Per il set completo di opzioni disponibili durante l'esecuzione dell'operazione di ottimizzazione, vedere Ottimizzare il layout del file di dati.

Pulire gli snapshot con VACUUM

Delta Lake offre l'isolamento dello snapshot per le letture, il che significa che è sicuro eseguire un'operazione di ottimizzazione anche se altri utenti o processi eseguono query sulla tabella. Alla fine, tuttavia, è necessario pulire gli snapshot precedenti. A tale scopo, eseguire l'operazione vacuum:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

SQL

VACUUM main.default.people_10m

Per informazioni dettagliate sull'uso efficace dell'operazione vacuum, vedere Rimuovere i file di dati inutilizzati con vuoto.