Aggiornare lo schema della tabella Delta Lake
Delta Lake consente di aggiornare lo schema di una tabella. Sono supportati i tipi seguenti di modifiche:
- Aggiunta di nuove colonne (in posizioni arbitrarie)
- Riordinare le colonne esistenti
- Ridenominazione delle colonne esistenti
È possibile apportare queste modifiche in modo esplicito usando DDL o in modo implicito usando DML.
Importante
Un aggiornamento di uno schema di tabella Delta è un'operazione in conflitto con tutte le operazioni di scrittura Delta simultanee.
Quando si aggiorna uno schema della tabella Delta, i flussi letti da tale tabella terminano. Se si vuole che lo streaming continui, è necessario riavviarlo. Per i metodi consigliati, vedere Considerazioni sulla produzione per Structured Streaming.
Aggiornare in modo esplicito lo schema per aggiungere colonne
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Per impostazione predefinita, il supporto dei valori Null è true
.
Per aggiungere una colonna a un campo annidato, usare:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Ad esempio, se lo schema prima dell'esecuzione ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)
è:
- root
| - colA
| - colB
| +-field1
| +-field2
lo schema dopo è:
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
Nota
L'aggiunta di colonne annidate è supportata solo per gli struct. Le matrici e le mappe non sono supportate.
Aggiornare in modo esplicito lo schema per modificare il commento o l'ordinamento delle colonne
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Per modificare una colonna in un campo annidato, usare:
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Ad esempio, se lo schema prima dell'esecuzione ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST
è:
- root
| - colA
| - colB
| +-field1
| +-field2
lo schema dopo è:
- root
| - colA
| - colB
| +-field2
| +-field1
Aggiornare in modo esplicito lo schema per sostituire le colonne
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
Ad esempio, quando si esegue il DDL seguente:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
se lo schema prima è:
- root
| - colA
| - colB
| +-field1
| +-field2
lo schema dopo è:
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
Aggiornare in modo esplicito lo schema per rinominare le colonne
Nota
Questa funzionalità è disponibile in Databricks Runtime 10.4 LTS e versioni successive.
Per rinominare le colonne senza riscrivere i dati esistenti delle colonne, è necessario abilitare il mapping delle colonne per la tabella. Vedere Rinominare ed eliminare colonne con mapping di colonne Delta Lake.
Per rinominare una colonna:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
Per rinominare un campo annidato:
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
Ad esempio, quando si esegue il comando seguente:
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
Se lo schema prima è:
- root
| - colA
| - colB
| +-field1
| +-field2
Lo schema dopo è quindi:
- root
| - colA
| - colB
| +-field001
| +-field2
Vedere Rinominare ed eliminare colonne con mapping di colonne Delta Lake.
Aggiornare in modo esplicito lo schema per eliminare le colonne
Nota
Questa funzionalità è disponibile in Databricks Runtime 11.3 LTS e versioni successive.
Per eliminare le colonne come operazione di sola metadati senza riscrivere alcun file di dati, è necessario abilitare il mapping delle colonne per la tabella. Vedere Rinominare ed eliminare colonne con mapping di colonne Delta Lake.
Importante
L'eliminazione di una colonna dai metadati non comporta l'eliminazione dei dati sottostanti per la colonna nei file. Per eliminare i dati delle colonne eliminate, è possibile usare REORG TABLE per riscrivere i file. È quindi possibile usare VACUUM per eliminare fisicamente i file che contengono i dati delle colonne eliminate.
Per eliminare una colonna:
ALTER TABLE table_name DROP COLUMN col_name
Per eliminare più colonne:
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
Aggiornare in modo esplicito lo schema per modificare il tipo di colonna o il nome
È possibile modificare il tipo o il nome di una colonna o eliminare una colonna riscrivendo la tabella. A tale scopo, usare l'opzione overwriteSchema
.
L'esempio seguente illustra la modifica di un tipo di colonna:
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
L'esempio seguente mostra la modifica di un nome di colonna:
(spark.read.table(...)
.withColumnRenamed("dateOfBirth", "birthDate")
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
Abilitare l'evoluzione dello schema
È possibile abilitare l'evoluzione dello schema eseguendo una delle operazioni seguenti:
- Impostare su
.option("mergeSchema", "true")
un dataframewrite
owriteStream
un'operazione Spark. Vedere Abilitare l'evoluzione dello schema per le scritture per aggiungere nuove colonne. - Usare la
MERGE WITH SCHEMA EVOLUTION
sintassi. Vedere Sintassi dell'evoluzione dello schema per l'unione. - Impostare spark conf
spark.databricks.delta.schema.autoMerge.enabled
sutrue
per l'oggetto SparkSession corrente.
Databricks consiglia di abilitare l'evoluzione dello schema per ogni operazione di scrittura anziché impostare una conf Spark.
Quando si usano opzioni o sintassi per abilitare l'evoluzione dello schema in un'operazione di scrittura, questa ha la precedenza sulla conf spark.
Nota
Non esiste alcuna clausola di evoluzione dello schema per INSERT INTO
le istruzioni.
Abilitare l'evoluzione dello schema per le scritture per aggiungere nuove colonne
Le colonne presenti nella query di origine ma mancanti nella tabella di destinazione vengono aggiunte automaticamente come parte di una transazione di scrittura quando l'evoluzione dello schema è abilitata. Vedere Abilitare l'evoluzione dello schema.
La distinzione tra maiuscole e minuscole viene mantenuta quando si aggiunge una nuova colonna. Le nuove colonne vengono aggiunte alla fine dello schema della tabella. Se le colonne aggiuntive si trovano in uno struct, vengono aggiunte alla fine dello struct nella tabella di destinazione.
L'esempio seguente illustra l'uso dell'opzione mergeSchema
con il caricatore automatico. Vedere Che cos'è il caricatore automatico?.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
L'esempio seguente illustra l'uso dell'opzione mergeSchema
con un'operazione di scrittura batch:
(spark.read
.table(source_table)
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("table_name")
)
Evoluzione automatica dello schema per l'unione delta Lake
L'evoluzione dello schema consente agli utenti di risolvere le mancate corrispondenze dello schema tra la tabella di destinazione e quella di origine in merge. Gestisce i due casi seguenti:
- Una colonna nella tabella di origine non è presente nella tabella di destinazione. La nuova colonna viene aggiunta allo schema di destinazione e i relativi valori vengono inseriti o aggiornati usando i valori di origine.
- Una colonna nella tabella di destinazione non è presente nella tabella di origine. Lo schema di destinazione rimane invariato; I valori nella colonna di destinazione aggiuntiva vengono lasciati invariati (per
UPDATE
) o impostati suNULL
(perINSERT
).
È necessario abilitare manualmente l'evoluzione automatica dello schema. Vedere Abilitare l'evoluzione dello schema.
Nota
In Databricks Runtime 12.2 LTS e versioni successive i campi colonne e struct presenti nella tabella di origine possono essere specificati in base al nome nelle azioni di inserimento o aggiornamento. In Databricks Runtime 11.3 LTS e versioni successive è possibile usare solo INSERT *
azioni o UPDATE SET *
per l'evoluzione dello schema con merge.
In Databricks Runtime 13.3 LTS e versioni successive è possibile usare l'evoluzione dello schema con struct annidati all'interno delle mappe, ad esempio map<int, struct<a: int, b: int>>
.
Sintassi dell'evoluzione dello schema per l'unione
In Databricks Runtime 15.2 e versioni successive è possibile specificare l'evoluzione dello schema in un'istruzione merge usando le API di tabella SQL o Delta:
SQL
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Python
from delta.tables import *
(targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
import io.delta.tables._
targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
Operazioni di esempio di unione con evoluzione dello schema
Ecco alcuni esempi degli effetti dell'operazione merge
con e senza evoluzione dello schema.
Colonne | Query (in SQL) | Comportamento senza evoluzione dello schema (impostazione predefinita) | Comportamento con l'evoluzione dello schema |
---|---|---|---|
Colonne di destinazione: key, value Colonne di origine: key, value, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
Lo schema della tabella rimane invariato; solo le colonne key , value vengono aggiornate/inserite. |
Lo schema della tabella viene modificato in (key, value, new_value) . I record esistenti con corrispondenze vengono aggiornati con value e new_value nell'origine. Le nuove righe vengono inserite con lo schema (key, value, new_value) . |
Colonne di destinazione: key, old_value Colonne di origine: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
UPDATE le azioni e INSERT generano un errore perché la colonna old_value di destinazione non si trova nell'origine. |
Lo schema della tabella viene modificato in (key, old_value, new_value) . I record esistenti con corrispondenze vengono aggiornati con nell'origine new_value lasciando old_value invariati. I nuovi record vengono inseriti con l'oggetto , new_value e NULL specificato key per .old_value |
Colonne di destinazione: key, old_value Colonne di origine: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET new_value = s.new_value |
UPDATE genera un errore perché la colonna new_value non esiste nella tabella di destinazione. |
Lo schema della tabella viene modificato in (key, old_value, new_value) . I record esistenti con corrispondenze vengono aggiornati con nell'oggetto new_value nell'origine lasciando old_value invariati e i record non corrispondenti sono NULL stati immessi per new_value . Vedere la nota (1). |
Colonne di destinazione: key, old_value Colonne di origine: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN NOT MATCHED THEN INSERT (key, new_value) VALUES (s.key, s.new_value) |
INSERT genera un errore perché la colonna new_value non esiste nella tabella di destinazione. |
Lo schema della tabella viene modificato in (key, old_value, new_value) . I nuovi record vengono inseriti con l'oggetto , new_value e NULL specificato key per .old_value I record esistenti sono NULL stati immessi per new_value lasciare old_value invariati. Vedere la nota (1). |
(1) Questo comportamento è disponibile in Databricks Runtime 12.2 LTS e versioni successive; Databricks Runtime 11.3 LTS e sotto l'errore in questa condizione.
Escludere colonne con merge Delta Lake
In Databricks Runtime 12.2 LTS e versioni successive è possibile usare EXCEPT
le clausole nelle condizioni di merge per escludere in modo esplicito le colonne. Il comportamento della EXCEPT
parola chiave varia a seconda che l'evoluzione dello schema sia abilitata o meno.
Con l'evoluzione dello schema disabilitata, la EXCEPT
parola chiave si applica all'elenco di colonne nella tabella di destinazione e consente di escludere colonne da UPDATE
o INSERT
azioni. Le colonne escluse sono impostate su null
.
Con l'evoluzione dello schema abilitata, la EXCEPT
parola chiave si applica all'elenco di colonne nella tabella di origine e consente di escludere colonne dall'evoluzione dello schema. Una nuova colonna nell'origine non presente nella destinazione non viene aggiunta allo schema di destinazione se è elencata nella EXCEPT
clausola . Le colonne escluse già presenti nella destinazione sono impostate su null
.
Gli esempi seguenti illustrano questa sintassi:
Colonne | Query (in SQL) | Comportamento senza evoluzione dello schema (impostazione predefinita) | Comportamento con l'evoluzione dello schema |
---|---|---|---|
Colonne di destinazione: id, title, last_updated Colonne di origine: id, title, review, last_updated |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated) |
Le righe corrispondenti vengono aggiornate impostando il last_updated campo sulla data corrente. Le nuove righe vengono inserite usando i valori per id e title . Il campo last_updated escluso è impostato su null . Il campo review viene ignorato perché non si trova nella destinazione. |
Le righe corrispondenti vengono aggiornate impostando il last_updated campo sulla data corrente. Lo schema si è evoluto per aggiungere il campo review . Le nuove righe vengono inserite usando tutti i campi di origine, ad eccezione last_updated del quale è impostato su null . |
Colonne di destinazione: id, title, last_updated Colonne di origine: id, title, review, internal_count |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated, internal_count) |
INSERT genera un errore perché la colonna internal_count non esiste nella tabella di destinazione. |
Le righe corrispondenti vengono aggiornate impostando il last_updated campo sulla data corrente. Il review campo viene aggiunto alla tabella di destinazione, ma il internal_count campo viene ignorato. Le nuove righe inserite sono last_updated impostate su null . |
NullType
Gestione delle colonne negli aggiornamenti dello schema
Poiché Parquet non supporta NullType
, NullType
le colonne vengono eliminate dal dataframe durante la scrittura in tabelle Delta, ma vengono comunque archiviate nello schema. Quando viene ricevuto un tipo di dati diverso per tale colonna, Delta Lake unisce lo schema al nuovo tipo di dati. Se Delta Lake riceve un oggetto NullType
per una colonna esistente, lo schema precedente viene mantenuto e la nuova colonna viene eliminata durante la scrittura.
NullType
in streaming non è supportato. Poiché è necessario impostare gli schemi quando si usa lo streaming, questo dovrebbe essere molto raro. NullType
non è inoltre accettato per tipi complessi, ad ArrayType
esempio e MapType
.
Sostituire lo schema delle tabelle
Per impostazione predefinita, la sovrascrittura dei dati in una tabella non sovrascrive lo schema. Quando si sovrascrive una tabella usando mode("overwrite")
senza replaceWhere
, è comunque possibile sovrascrivere lo schema dei dati scritti. Sostituire lo schema e il partizionamento della tabella impostando l'opzione overwriteSchema
su true
:
df.write.option("overwriteSchema", "true")
Importante
Non è possibile specificare overwriteSchema
come true
quando si usa la sovrascrittura della partizione dinamica.