Panoramica di Linux Foundation Delta Lake

Questo articolo è stato adattato rispetto alla controparte originale disponibile qui per offrire maggiore chiarezza. Questo articolo consente di esplorare rapidamente le funzionalità principali di Delta Lake. L'articolo contiene frammenti di codice che illustrano come eseguire la lettura e la scrittura in tabelle Delta Lake da query interattive, in batch e di streaming. I frammenti di codice sono disponibili anche in un set di notebook PySpark, Scala e C#.

Verranno illustrati gli argomenti seguenti:

  • Crea una tabella
  • Leggere i dati
  • Aggiornare i dati della tabella
  • Sovrascrivere i dati della tabella
  • Eseguire l'aggiornamento condizionale senza sovrascrittura
  • Leggere versioni precedenti dei dati con lo spostamento cronologico
  • Scrivere un flusso di dati in una tabella
  • Leggere un flusso di modifiche da una tabella
  • Supporto di SQL

Impostazione

Assicurarsi di modificare quanto segue in base all'ambiente in uso.

import random

session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)

delta_table_path
var sessionId = (new Random()).Next(10000000);
var deltaTablePath = $"/delta/delta-table-{sessionId}";

deltaTablePath
val sessionId = scala.util.Random.nextInt(1000000)
val deltaTablePath = s"/delta/delta-table-$sessionId";

Il risultato è il seguente:

'/delta/delta-table-335323'

Crea una tabella

Per creare una tabella Delta Lake, scrivere un dataframe in formato delta. È possibile modificare il formato da Parquet, CSV, JSON e così via in delta.

Il codice seguente illustra come creare una nuova tabella Delta Lake usando lo schema dedotto dal dataframe.

data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)
var data = spark.Range(0,5);
data.Show();
data.Write().Format("delta").Save(deltaTablePath);
val data = spark.range(0, 5)
data.show
data.write.format("delta").save(deltaTablePath)

Il risultato è il seguente:

ID
0
1
2
3
4

Leggere i dati

Per leggere i dati nella tabella Delta Lake, specificare il percorso dei file e il formato delta.

df = spark.read.format("delta").load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Load(deltaTablePath);
df.Show()
val df = spark.read.format("delta").load(deltaTablePath)
df.show()

Il risultato è il seguente:

ID
1
3
4
0
2

L'ordine dei risultati è diverso rispetto ai precedenti perché non è stato specificato esplicitamente alcun ordine prima della restituzione dei risultati.

Aggiornare i dati della tabella

Delta Lake supporta diverse operazioni per modificare le tabelle usando le API DataFrame standard. Queste operazioni sono uno dei miglioramenti aggiunti dal formato delta. Questo esempio esegue un processo batch per sovrascrivere i dati nella tabella.

data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()
var data = spark.Range(5,10);
data.Write().Format("delta").Mode("overwrite").Save(deltaTablePath);
df.Show();
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save(deltaTablePath)
df.show()

Il risultato è il seguente:

ID
7
8
5
9
6

Come si può osservare, tutti e cinque i record sono stati aggiornati con nuovi valori.

Eseguire il salvataggio come tabelle di catalogo

Delta Lake consente la scrittura in tabelle di catalogo gestite o esterne.

data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("SHOW TABLES").show()
data.Write().Format("delta").SaveAsTable("ManagedDeltaTable");
spark.Sql($"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{deltaTablePath}'");
spark.Sql("SHOW TABLES").Show();
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql(s"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '$deltaTablePath'")
spark.sql("SHOW TABLES").show

Il risultato è il seguente:

database tableName isTemporary
impostazione predefinita externaldeltatable false
impostazione predefinita manageddeltatable false

Con questo codice si è creata una nuova tabella nel catalogo da un dataframe esistente. Tale tabella è detta tabella gestita. Si è quindi definita una nuova tabella esterna nel catalogo che usa una posizione esistente. Tale tabella è detta tabella esterna. Nell'output si può osservare che nel catalogo sono elencate entrambe le tabelle, indipendentemente dal modo in cui sono state create.

È ora possibile esaminare le proprietà estese di entrambe le tabelle.

spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ManagedDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=false)

Il risultato è il seguente:

col_name data_type commento
id bigint Null
Detailed Table Information
Database impostazione predefinita
Tabella manageddeltatable
Proprietario trusted-service-user
Data e ora creazione Sat Apr 25 00:35:34 UTC 2020
Last Access Thu Jan 01 00:00:00 UTC 1970
Creato da Spark 2.4.4.2.6.99.201-11401300
Type MANAGED
Provider delta
Table Properties [transient_lastDdlTime=1587774934]
Statistiche 2407 bytes
Ufficio abfss://data@<data lake>.dfs.core.windows.net/synapse/workspaces/<nome dell’area di lavoro>/warehouse/manageddeltatable
Serde Library org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Storage Properties [serialization.format=1]
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ExternalDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=false)

Il risultato è il seguente:

col_name data_type commento
id bigint Null
Detailed Table Information
Database impostazione predefinita
Tabella externaldeltatable
Proprietario trusted-service-user
Data e ora creazione Sat Apr 25 00:35:38 UTC 2020
Last Access Thu Jan 01 00:00:00 UTC 1970
Creato da Spark 2.4.4.2.6.99.201-11401300
Type EXTERNAL
Provider DELTA
Table Properties [transient_lastDdlTime=1587774938]
Ufficio abfss://data@<data lake>.dfs.core.windows.net/delta/delta-table-587152
Serde Library org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Storage Properties [serialization.format=1]

Eseguire l'aggiornamento condizionale senza sovrascrittura

Delta Lake offre API programmatiche per eseguire operazioni di aggiornamento, eliminazione e unione (questo comando è comunemente definito upsert) condizionale dei dati nelle tabelle.

from delta.tables import *
from pyspark.sql.functions import *

delta_table = DeltaTable.forPath(spark, delta_table_path)

delta_table.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })
delta_table.toDF().show()
using Microsoft.Spark.Extensions.Delta;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

var deltaTable = DeltaTable.ForPath(deltaTablePath);

deltaTable.Update(
  condition: Expr("id % 2 == 0"),
  set: new Dictionary<string, Column>(){{ "id", Expr("id + 100") }});
deltaTable.ToDF().Show();
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath(deltaTablePath)

// Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))
deltaTable.toDF.show

Il risultato è il seguente:

ID
106
108
5
7
9

In questo caso si è semplicemente aggiunto 100 a ogni ID pari.

delta_table.delete("id % 2 == 0")
delta_table.toDF().show()
deltaTable.Delete(condition: Expr("id % 2 == 0"));
deltaTable.ToDF().Show();
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF.show

Il risultato è il seguente:

ID
5
7
9

Si noti che sono state eliminate tutte le righe pari.

new_data = spark.range(0,20).alias("newData")

delta_table.alias("oldData")\
    .merge(new_data.alias("newData"), "oldData.id = newData.id")\
    .whenMatchedUpdate(set = { "id": lit("-1")})\
    .whenNotMatchedInsert(values = { "id": col("newData.id") })\
    .execute()

delta_table.toDF().show(100)
var newData = spark.Range(20).As("newData");

deltaTable
    .As("oldData")
    .Merge(newData, "oldData.id = newData.id")
    .WhenMatched()
        .Update(new Dictionary<string, Column>() {{"id", Lit("-1")}})
    .WhenNotMatched()
        .Insert(new Dictionary<string, Column>() {{"id", Col("newData.id")}})
    .Execute();

deltaTable.ToDF().Show(100);
val newData = spark.range(0, 20).toDF

deltaTable.as("oldData").
  merge(
    newData.as("newData"),
    "oldData.id = newData.id").
  whenMatched.
  update(Map("id" -> lit(-1))).
  whenNotMatched.
  insert(Map("id" -> col("newData.id"))).
  execute()

deltaTable.toDF.show()

Il risultato è il seguente:

ID
18
15
19
2
1
6
8
3
-1
10
13
0
16
4
-1
12
11
14
-1
17

In questo caso si è ottenuta una combinazione dei dati esistenti. Ai dati esistenti è stato assegnato il valore -1 nel percorso del codice di aggiornamento (WhenMatched) e sono stati aggiunti anche i nuovi dati precedentemente creati nella parte superiore del frammento e aggiunti tramite il percorso del codice di inserimento (WhenNotMatched).

Cronologia

Delta Lake offre la possibilità di esaminare la cronologia di una tabella, ossia le modifiche apportate alla tabella Delta sottostante. La cella seguente illustra quanto sia semplice controllare la cronologia.

delta_table.history().show(20, 1000, False)
deltaTable.History().Show(20, 1000, false);
deltaTable.history.show(false)

Il risultato è il seguente:

versione timestamp userId userName operation (operazione) operationParameters processo notebook clusterId readVersion isolationLevel isBlindAppend
4 2020-04-25 00:36:27 Null Null MERGE [predicate -> (oldData.ID = newData.ID)] Null Null Null 3 Null false
3 2020-04-25 00:36:08 Null Null DELETE [predicate -> ["((ID % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] Null Null Null 2 Null false
2 2020-04-25 00:35:51 Null Null UPDATE [predicate -> ((UID#744L % cast(2 as bigint)) = cast(0 as bigint))] Null Null Null 1 Null false
1 2020-04-25 00:35:05 Null Null WRITE [mode -> Overwrite, partitionBy -> []] Null Null Null 0 Null false
0 2020-04-25 00:34:34 Null Null WRITE [mode -> ErrorIfExists, partitionBy -> []] Null Null Null Null Null true

È così possibile visualizzare tutte le modifiche apportate nei frammenti di codice precedenti.

Leggere versioni precedenti dei dati con lo spostamento cronologico

È possibile eseguire query su snapshot precedenti della tabella Delta Lake usando una funzionalità denominata spostamento cronologico. Se si vuole accedere ai dati sovrascritti, si può eseguire una query su uno snapshot della tabella precedente alla sovrascrittura del primo set di dati con l'opzione versionAsOf.

Dopo l'esecuzione della cella seguente verrà visualizzato il primo set di dati nella versione precedente alla sovrascrittura. Lo spostamento cronologico è una funzionalità avanzata che sfrutta la potenza del log delle transazioni Delta Lake per accedere ai dati che non sono più presenti nella tabella. Rimuovendo l'opzione della versione 0 (o specificando la versione 1) verranno visualizzati nuovamente i dati più recenti. Per altre informazioni, vedere Eseguire query su uno snapshot precedente di una tabella.

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Option("versionAsOf", 0).Load(deltaTablePath);
df.Show();
val df = spark.read.format("delta").option("versionAsOf", 0).load(deltaTablePath)
df.show()

Il risultato è il seguente:

ID
0
1
4
3
2

Come si può osservare, si è tornati alla prima versione dei dati.

Scrivere un flusso di dati in una tabella

È anche possibile scrivere in una tabella Delta Lake usando lo streaming strutturato di Spark. Il log delle transazioni Delta Lake garantisce che l'elaborazione venga effettuata esattamente una volta, anche in presenza di altri flussi o query in batch eseguite simultaneamente sulla tabella. Per impostazione predefinita, i flussi vengono eseguiti in modalità Append, aggiungendo nuovi record alla tabella.

Per altre informazioni sull'integrazione di Delta Lake con lo streaming strutturato, vedere Letture e scritture di tabelle in streaming.

Nelle celle seguenti vengono effettuate queste operazioni:

  • La cella 30 mostra i dati appena aggiunti in modalità Append.
  • La cella 31 controlla la cronologia.
  • La cella 32 arresta il processo di streaming strutturato.
  • La cella 33 controlla la cronologia <-- Si noterà che le aggiunte sono state interrotte

Per prima cosa, si configurerà un semplice processo di streaming Spark per generare una sequenza ed eseguire la scrittura nella tabella Delta.

streaming_df = spark.readStream.format("rate").load()
stream = streaming_df\
    .selectExpr("value as id")\
    .writeStream\
    .format("delta")\
    .option("checkpointLocation", "/tmp/checkpoint-{0}".format(session_id))\
    .start(delta_table_path)
var streamingDf = spark.ReadStream().Format("rate").Load();
var stream = streamingDf.SelectExpr("value as id").WriteStream().Format("delta").Option("checkpointLocation", $"/tmp/checkpoint-{sessionId}").Start(deltaTablePath);
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", s"/tmp/checkpoint-$sessionId").start(deltaTablePath)

Leggere un flusso di modifiche da una tabella

Mentre il flusso esegue la scrittura nella tabella Delta Lake, è anche possibile leggere da tale tabella come origine del flusso. Ad esempio, si può avviare un'altra query di streaming che stampa tutte le modifiche apportate alla tabella Delta Lake.

delta_table.toDF().sort(col("id").desc()).show(100)
deltaTable.ToDF().Sort(Col("id").Desc()).Show(100);
deltaTable.toDF.sort($"id".desc).show

Il risultato è il seguente:

ID
19
18
17
16
15
14
13
12
11
10
8
6
4
3
2
1
0
-1
-1
-1
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(20, 1000, False)
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(20, 1000, false);
deltaTable.history.show

Il risultato è il seguente:

versione timestamp operation (operazione) operationParameters readVersion
5 2020-04-25 00:37:09 STREAMING UPDATE [outputMode -> Append, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 2020-04-25 00:36:27 MERGE [predicate -> (oldData.id = newData.id)] 3
3 2020-04-25 00:36:08 DELETE [predicate -> ["((id CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 00:35:51 UPDATE [predicate -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 2020-04-25 00:35:05 WRITE [mode -> Overwrite, partitionBy -> []] 0
0 2020-04-25 00:34:34 WRITE [mode -> ErrorIfExists, partitionBy -> []] Null

In questo caso si rimuovono alcune delle colonne meno interessanti per semplificare l'esperienza di visualizzazione della cronologia.

stream.stop()
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(100, 1000, False)
stream.Stop();
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(100, 1000, false);
stream.stop
deltaTable.history.show

Il risultato è il seguente:

versione timestamp operation (operazione) operationParameters readVersion
5 2020-04-25 00:37:09 STREAMING UPDATE [outputMode -> Append, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 2020-04-25 00:36:27 MERGE [predicate -> (oldData.id = newData.id)] 3
3 2020-04-25 00:36:08 DELETE [predicate -> ["((id CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 2020-04-25 00:35:51 UPDATE [predicate -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 2020-04-25 00:35:05 WRITE [mode -> Overwrite, partitionBy -> []] 0
0 2020-04-25 00:34:34 WRITE [mode -> ErrorIfExists, partitionBy -> []] Null

Eseguire la conversione da Parquet a Delta

È possibile eseguire una conversione sul posto dal formato Parquet a Delta.

Il codice seguente consente di verificare se la tabella esistente è in formato delta o meno.

parquet_path = "/parquet/parquet-table-{0}".format(session_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetPath = $"/parquet/parquet-table-{sessionId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath)
val parquetPath = s"/parquet/parquet-table-$sessionId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)

Il risultato è il seguente:

Falso

Si convertiranno quindi i dati nel formato delta e si verificherà se l'operazione è riuscita.

DeltaTable.convertToDelta(spark, "parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
DeltaTable.ConvertToDelta(spark, $"parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath)
DeltaTable.convertToDelta(spark, s"parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

Il risultato è il seguente:

Vero

Supporto di SQL

Delta supporta comandi di utilità per tabelle tramite SQL. È possibile usare SQL per:

  • Ottenere la cronologia di una tabella Delta
  • Eseguire un'operazione vacuum su una tabella Delta
  • Convertire un file Parquet in Delta
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"DESCRIBE HISTORY delta.`{deltaTablePath}`").Show();
spark.sql(s"DESCRIBE HISTORY delta.`$deltaTablePath`").show()

Il risultato è il seguente:

versione timestamp userId userName operation (operazione) operationParameters processo notebook clusterId readVersion isolationLevel isBlindAppend
5 2020-04-25 00:37:09 Null Null STREAMING UPDATE [outputMode -> Ap... Null Null Null 4 Null true
4 2020-04-25 00:36:27 Null Null MERGE [predicate -> (ol... Null Null Null 3 Null false
3 2020-04-25 00:36:08 Null Null DELETE [predicate -> ["(... Null Null Null 2 Null false
2 2020-04-25 00:35:51 Null Null UPDATE [predicate -> ((i... Null Null Null 1 Null false
1 2020-04-25 00:35:05 Null Null WRITE [mode -> Overwrit... Null Null Null 0 Null false
0 2020-04-25 00:34:34 Null Null WRITE [mode -> ErrorIfE... Null Null Null Null Null true
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"VACUUM delta.`{deltaTablePath}`").Show();
spark.sql(s"VACUUM delta.`$deltaTablePath`").show()

Il risultato è il seguente:

path
abfss://data@arca...

Ora si verificherà che una tabella non sia nel formato delta. Si convertirà quindi la tabella in formato delta usando Spark SQL e si verifica che sia stata convertita correttamente.

parquet_id = random.randint(0,1000)
parquet_path = "/parquet/parquet-table-{0}-{1}".format(session_id, parquet_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
spark.sql("CONVERT TO DELTA parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetId =  (new Random()).Next(10000000);
var parquetPath = $"/parquet/parquet-table-{sessionId}-{parquetId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath);
spark.Sql($"CONVERT TO DELTA parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath);
val parquetId = scala.util.Random.nextInt(1000)
val parquetPath = s"/parquet/parquet-table-$sessionId-$parquetId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
spark.sql(s"CONVERT TO DELTA parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

Il risultato è il seguente:

Vero

Per la documentazione completa, vedere la pagina della documentazione di Delta Lake.

Per altre informazioni, vedere il progetto Delta Lake.

Passaggi successivi