Eventi
Ottieni gratuitamente la certificazione in Microsoft Fabric.
19 nov, 23 - 10 dic, 23
Per un periodo di tempo limitato, il team della community di Microsoft Fabric offre buoni per esami DP-600 gratuiti.
Prepara oraQuesto browser non è più supportato.
Esegui l'aggiornamento a Microsoft Edge per sfruttare i vantaggi di funzionalità più recenti, aggiornamenti della sicurezza e supporto tecnico.
È possibile eseguire l'upsert dei dati da una tabella di origine, una vista o un dataframe in una tabella Delta di destinazione usando l'operazione MERGE
SQL. Delta Lake supporta inserimenti, aggiornamenti ed eliminazioni in MERGE
e supporta la sintassi estesa oltre gli standard SQL per facilitare i casi d'uso avanzati.
Si supponga di avere una tabella di origine denominata people10mupdates
o un percorso di origine in /tmp/delta/people-10m-updates
che contiene nuovi dati per una tabella di destinazione denominata people10m
o un percorso di destinazione in /tmp/delta/people-10m
. Alcuni di questi nuovi record potrebbero essere già presenti nei dati di destinazione. Per unire i nuovi dati, è necessario aggiornare le righe in cui è già presente la persona id
e inserire le nuove righe in cui non è presente alcuna corrispondenza id
. È possibile eseguire la query seguente:
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
Importante
Solo una singola riga della tabella di origine può corrispondere a una determinata riga nella tabella di destinazione. In Databricks Runtime 16.0 e versioni successive valuta MERGE
le condizioni specificate nelle WHEN MATCHED
clausole e ON
per determinare le corrispondenze duplicate. In Databricks Runtime 15.4 LTS e versioni successive le MERGE
operazioni considerano solo le condizioni specificate nella ON
clausola .
Per informazioni dettagliate sulla sintassi scala e Python, vedere la documentazione dell'API Delta Lake. Per informazioni dettagliate sulla sintassi SQL, vedere MERGE INTO
In Databricks SQL e Databricks Runtime 12.2 LTS e versioni successive è possibile usare la WHEN NOT MATCHED BY SOURCE
clausola per UPDATE
o DELETE
i record nella tabella di destinazione che non contengono record corrispondenti nella tabella di origine. Databricks consiglia di aggiungere una clausola condizionale facoltativa per evitare di riscrivere completamente la tabella di destinazione.
Nell'esempio di codice seguente viene illustrata la sintassi di base dell'utilizzo di questo oggetto per le eliminazioni, sovrascrivendo la tabella di destinazione con il contenuto della tabella di origine ed eliminando record non corrispondenti nella tabella di destinazione. Per un modello più scalabile per le tabelle in cui gli aggiornamenti e le eliminazioni di origine sono associati al tempo, vedere Sincronizzare in modo incrementale la tabella Delta con l'origine.
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Nell'esempio seguente vengono aggiunte condizioni alla WHEN NOT MATCHED BY SOURCE
clausola e vengono specificati i valori da aggiornare in righe di destinazione non corrispondenti.
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
Di seguito è riportata una descrizione dettagliata della semantica dell'operazione merge
a livello di codice.
Può essere presente un numero qualsiasi di whenMatched
clausole e whenNotMatched
.
whenMatched
le clausole vengono eseguite quando una riga di origine corrisponde a una riga della tabella di destinazione in base alla condizione di corrispondenza. Queste clausole hanno la semantica seguente.
whenMatched
le clausole possono avere al massimo una update
e un'azione delete
. L'azione update
in merge
aggiorna solo le colonne specificate (analogamente all'operazione update
) della riga di destinazione corrispondente. L'azione delete
elimina la riga corrispondente.
Ogni whenMatched
clausola può avere una condizione facoltativa. Se questa condizione di clausola esiste, l'azione update
o delete
viene eseguita per qualsiasi coppia di righe di destinazione di origine corrispondente solo quando la condizione della clausola è true.
Se sono presenti più whenMatched
clausole, vengono valutate nell'ordine in cui vengono specificate. Tutte le whenMatched
clausole, ad eccezione dell'ultima, devono avere condizioni.
Se nessuna delle condizioni restituisce true per una coppia di righe di whenMatched
origine e di destinazione corrispondente alla condizione di unione, la riga di destinazione rimane invariata.
Per aggiornare tutte le colonne della tabella Delta di destinazione con le colonne corrispondenti del set di dati di origine, usare whenMatched(...).updateAll()
. Equivale a:
whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
per tutte le colonne della tabella Delta di destinazione. Pertanto, questa azione presuppone che la tabella di origine abbia le stesse colonne della tabella di destinazione. In caso contrario, la query genera un errore di analisi.
Nota
Questo comportamento cambia quando è abilitata l'evoluzione automatica dello schema. Per informazioni dettagliate, vedere l'evoluzione automatica dello schema.
whenNotMatched
le clausole vengono eseguite quando una riga di origine non corrisponde ad alcuna riga di destinazione in base alla condizione di corrispondenza. Queste clausole hanno la semantica seguente.
whenNotMatched
le clausole possono avere solo l'azione insert
. La nuova riga viene generata in base alla colonna specificata e alle espressioni corrispondenti. Non è necessario specificare tutte le colonne nella tabella di destinazione. Per le colonne di destinazione non specificate, NULL
viene inserito .
Ogni whenNotMatched
clausola può avere una condizione facoltativa. Se la condizione della clausola è presente, viene inserita una riga di origine solo se tale condizione è true per tale riga. In caso contrario, la colonna di origine viene ignorata.
Se sono presenti più whenNotMatched
clausole, vengono valutate nell'ordine in cui vengono specificate. Tutte le whenNotMatched
clausole, ad eccezione dell'ultima, devono avere condizioni.
Per inserire tutte le colonne della tabella Delta di destinazione con le colonne corrispondenti del set di dati di origine, usare whenNotMatched(...).insertAll()
. Equivale a:
whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
per tutte le colonne della tabella Delta di destinazione. Pertanto, questa azione presuppone che la tabella di origine abbia le stesse colonne della tabella di destinazione. In caso contrario, la query genera un errore di analisi.
Nota
Questo comportamento cambia quando è abilitata l'evoluzione automatica dello schema. Per informazioni dettagliate, vedere l'evoluzione automatica dello schema.
whenNotMatchedBySource
le clausole vengono eseguite quando una riga di destinazione non corrisponde ad alcuna riga di origine in base alla condizione di merge. Queste clausole hanno la semantica seguente.
whenNotMatchedBySource
le clausole possono specificare delete
e update
azioni.whenNotMatchedBySource
clausola può avere una condizione facoltativa. Se la condizione della clausola è presente, una riga di destinazione viene modificata solo se tale condizione è true per tale riga. In caso contrario, la riga di destinazione rimane invariata.whenNotMatchedBySource
clausole, vengono valutate nell'ordine in cui vengono specificate. Tutte le whenNotMatchedBySource
clausole, ad eccezione dell'ultima, devono avere condizioni.whenNotMatchedBySource
le clausole non hanno una riga di origine da cui eseguire il pull dei valori delle colonne e quindi non è possibile fare riferimento alle colonne di origine. Per ogni colonna da modificare, è possibile specificare un valore letterale o eseguire un'azione nella colonna di destinazione, ad esempio SET target.deleted_count = target.deleted_count + 1
.Importante
merge
può non riuscire se più righe del set di dati di origine corrispondono e il merge tenta di aggiornare le stesse righe della tabella Delta di destinazione. In base alla semantica SQL di merge, tale operazione di aggiornamento è ambigua perché non è chiaro quale riga di origine deve essere usata per aggiornare la riga di destinazione corrispondente. È possibile pre-elaborare la tabella di origine per eliminare la possibilità di più corrispondenze.MERGE
in una VISTA SQL solo se la vista è stata definita come CREATE VIEW viewName AS SELECT * FROM deltaTable
.Un caso d'uso ETL comune consiste nel raccogliere i log nella tabella Delta aggiungendoli a una tabella. Tuttavia, spesso le origini possono generare record di log duplicati e i passaggi di deduplicazione downstream sono necessari per gestirli. Con merge
è possibile evitare di inserire i record duplicati.
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
Nota
Il set di dati contenente i nuovi log deve essere deduplicato all'interno di se stesso. Dalla semantica SQL di merge, corrisponde e deduplica i nuovi dati con i dati esistenti nella tabella, ma se sono presenti dati duplicati all'interno del nuovo set di dati, vengono inseriti. Di conseguenza, deduplicare i nuovi dati prima di eseguire l'unione nella tabella.
Se si sa che è possibile ottenere record duplicati solo per alcuni giorni, è possibile ottimizzare ulteriormente la query partizionando la tabella in base alla data e quindi specificando l'intervallo di date della tabella di destinazione su cui trovare la corrispondenza.
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
Questa operazione è più efficiente del comando precedente perché cerca duplicati solo negli ultimi 7 giorni di log, non nell'intera tabella. Inoltre, è possibile usare questo merge di solo inserimento con Structured Streaming per eseguire la deduplicazione continua dei log.
foreachBatch
per scrivere continuamente tutti i dati di streaming in una tabella Delta con deduplicazione. Per altre informazioni su , vedere l'esempio di streaming seguente.foreachBatch
Le tabelle live delta supportano in modo nativo il rilevamento e l'applicazione di scD type 1 e type 2. Usare APPLY CHANGES INTO
con le tabelle live Delta per assicurarsi che i record non in ordine vengano gestiti correttamente durante l'elaborazione dei feed CDC. Vedere APPLICA MODIFICHE API: semplificare il Change Data Capture con Delta Live Tables.
In Databricks SQL e Databricks Runtime 12.2 LTS e versioni successive è possibile usare WHEN NOT MATCHED BY SOURCE
per creare condizioni arbitrarie per eliminare e sostituire in modo atomico una parte di una tabella. Ciò può essere particolarmente utile quando si dispone di una tabella di origine in cui i record possono cambiare o essere eliminati per diversi giorni dopo l'immissione iniziale dei dati, ma alla fine si risolvano in uno stato finale.
La query seguente illustra l'uso di questo modello per selezionare 5 giorni di record dall'origine, aggiornare i record corrispondenti nella destinazione, inserire nuovi record dall'origine alla destinazione ed eliminare tutti i record non corrispondenti degli ultimi 5 giorni nella destinazione.
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
Fornendo lo stesso filtro booleano sulle tabelle di origine e di destinazione, è possibile propagare dinamicamente le modifiche dall'origine alle tabelle di destinazione, incluse le eliminazioni.
Nota
Anche se questo modello può essere usato senza clausole condizionali, ciò comporta la riscrittura completa della tabella di destinazione che può risultare costosa.
Eventi
Ottieni gratuitamente la certificazione in Microsoft Fabric.
19 nov, 23 - 10 dic, 23
Per un periodo di tempo limitato, il team della community di Microsoft Fabric offre buoni per esami DP-600 gratuiti.
Prepara ora