Rozhraní API APPLY CHANGES: Zjednodušení zachytávání dat změn pomocí rozdílových živých tabulek

Delta Live Tables zjednodušuje zachytávání dat změn (CDC) pomocí APPLY CHANGES rozhraní API a APPLY CHANGES FROM SNAPSHOT rozhraní API. Rozhraní, které používáte, závisí na zdroji změn dat:

  • Slouží APPLY CHANGES ke zpracování změn z datového kanálu změn (CDF).
  • Ke zpracování změn snímků databáze použijte APPLY CHANGES FROM SNAPSHOT (Public Preview).

Dříve se MERGE INTO tento příkaz běžně používal ke zpracování záznamů CDC v Azure Databricks. Může však MERGE INTO vést k nesprávným výsledkům z důvodu záznamů mimo posloupnosti nebo vyžaduje složitou logiku pro opakované řazení záznamů.

Rozhraní APPLY CHANGES API je podporované v rozhraních SQL a Python pro rozdílové živé tabulky. Rozhraní APPLY CHANGES FROM SNAPSHOT API je podporováno v rozhraní Pythonu Delta Live Tables.

Aktualizace APPLY CHANGES tabulek pomocí SCD typu 1 a typu 2 podporují APPLY CHANGES FROM SNAPSHOT :

  • Pomocí scd typu 1 aktualizujte záznamy přímo. Historie se neuchovává pro aktualizované záznamy.
  • Pomocí scd typu 2 zachovejte historii záznamů, a to buď u všech aktualizací, nebo aktualizací v zadané sadě sloupců.

Syntaxi a další odkazy najdete tady:

Poznámka:

Tento článek popisuje, jak aktualizovat tabulky v kanálu Delta Live Tables na základě změn ve zdrojových datech. Informace o záznamu a dotazování informací o změnách na úrovni řádků pro tabulky Delta najdete v tématu Použití datového kanálu změn Delta Lake v Azure Databricks.

Požadavky

Pokud chcete používat rozhraní API CDC, musí být váš kanál nakonfigurovaný tak, aby používal bezserverové kanály DLT nebo delta živé tabulky Pro nebo Advanced edice.

Jak je CDC implementováno s rozhraním APPLY CHANGES API?

Díky automatickému zpracování zastaralých záznamů APPLY CHANGES zajišťuje rozhraní API v rozdílových živých tabulkách správné zpracování záznamů CDC a eliminuje potřebu vyvíjet složitou logiku pro zpracování záznamů mimo posloupnosti. Ve zdrojových datech je nutné zadat sloupec, na kterém se mají sekvencovat záznamy, které Delta Live Tables interpretuje jako monotonicky rostoucí reprezentaci správného pořadí zdrojových dat. Delta Live Tables automaticky zpracovává data, která přicházejí mimo pořadí. U změn typu SCD 2 rozšíří rozdílové živé tabulky příslušné hodnoty sekvencování do sloupců a __END_AT tabulek __START_AT cíle. V každé hodnotě sekvencování by měla existovat jedna samostatná aktualizace klíče a hodnoty sekvencování NULL nejsou podporovány.

K provedení zpracování APPLY CHANGESCDC nejprve vytvoříte streamovací tabulku a pak použijete APPLY CHANGES INTO příkaz v JAZYCE SQL nebo apply_changes() funkci v Pythonu k určení zdroje, klíčů a sekvencování kanálu změn. Pokud chcete vytvořit cílovou streamovací tabulku, použijte CREATE OR REFRESH STREAMING TABLE příkaz v SQL nebo funkci v Pythonu create_streaming_table() . Podívejte se na příklady zpracování typu SCD typu 1 a typu 2.

Podrobnosti o syntaxi najdete v referenčních informacích k JAZYKu SQL delta live tables nebo pythonu.

Jak je CDC implementováno s rozhraním APPLY CHANGES FROM SNAPSHOT API?

Důležité

Rozhraní APPLY CHANGES FROM SNAPSHOT API je ve verzi Public Preview.

APPLY CHANGES FROM SNAPSHOT je deklarativní rozhraní API, které efektivně určuje změny ve zdrojových datech porovnáním řady snímků v daném pořadí a pak spustí zpracování potřebné ke zpracování záznamů ve snímcích CDC. APPLY CHANGES FROM SNAPSHOT podporuje pouze rozhraní Delta Live Tables Python.

APPLY CHANGES FROM SNAPSHOT podporuje ingestování snímků z více zdrojových typů:

  • K příjmu snímků z existující tabulky nebo zobrazení použijte pravidelné příjem dat snímků. APPLY CHANGES FROM SNAPSHOT má jednoduché zjednodušené rozhraní, které podporuje pravidelné ingestování snímků z existujícího databázového objektu. Při každé aktualizaci kanálu se ingestuje nový snímek a jako verze snímku se použije čas příjmu dat. Při spuštění kanálu v průběžném režimu se ingestuje několik snímků s každou aktualizací kanálu v období určeném nastavením intervalu triggeru pro tok, který obsahuje zpracování APPLY CHANGES FROM SNAPSHOT.
  • K zpracování souborů obsahujících snímky databáze, jako jsou snímky vygenerované z databáze Oracle nebo MySQL nebo datového skladu, použijte historický příjem snímků.

Pokud chcete provádět zpracování CDC z libovolného zdrojového typu pomocí APPLY CHANGES FROM SNAPSHOT, nejprve vytvoříte streamovací tabulku a pak pomocí apply_changes_from_snapshot() funkce v Pythonu určíte snímek, klíče a další argumenty potřebné k implementaci zpracování. Podívejte se na příklady pravidelného příjmu snímků a historického příjmu snímků.

Snímky předané rozhraní API musí být ve vzestupném pořadí podle verze. Pokud rozdílové živé tabulky zjistí snímek mimo pořadí, vyvolá se chyba.

Podrobnosti o syntaxi najdete v referenčních informacích k Rozdílové živé tabulce v Pythonu.

Omezení

Sloupec použitý pro sekvencování musí být seřazený datový typ.

Příklad: Zpracování SCD typu 1 a SCD typu 2 se zdrojovými daty CDF

Následující části obsahují příklady dotazů SCD typu 1 a 2, které aktualizují cílové tabulky na základě zdrojových událostí z datového kanálu změn, které:

  1. Vytvoří nové záznamy uživatelů.
  2. Odstraní záznam uživatele.
  3. Aktualizuje záznamy uživatelů. V příkladu SCD typu 1 se poslední UPDATE operace zpozdí a vyřadí se z cílové tabulky, což demonstruje zpracování událostí mimo pořadí.

Následující příklady předpokládají znalost konfigurace a aktualizace kanálů Delta Live Tables. Viz kurz: Spuštění prvního kanálu dynamických tabulek Delta.

Abyste mohli tyto příklady spustit, musíte začít vytvořením ukázkové datové sady. Viz Generování testovacích dat.

Tady jsou vstupní záznamy pro tyto příklady:

userId name city operation sequenceNum
124 Raul Oaxaca INSERT 0
123 Isabel Monterrey INSERT 0
125 Mercedes Tijuana INSERT 2
126 Lilie Cancun INSERT 2
123 null null DELETE 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Pokud odkomentujete poslední řádek v ukázkových datech, vloží se následující záznam, který určuje, kam se mají záznamy zkrátit:

userId name city operation sequenceNum
null null null ZKRÁTIT 3

Poznámka:

Všechny následující příklady zahrnují možnosti pro určení obou DELETE TRUNCATE operací, ale každý z nich je volitelný.

Zpracování aktualizací typu SCD typu 1

Následující příklad ukazuje zpracování aktualizací typu SCD 1:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Po spuštění příkladu typu SCD 1 obsahuje cílová tabulka následující záznamy:

userId name city
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lilie Cancun

Po spuštění příkladu SCD typu 1 s dalším TRUNCATE záznamem se záznamy 124 a 126 jsou zkráceny z důvodu TRUNCATE operace at sequenceNum=3a cílová tabulka obsahuje následující záznam:

userId name city
125 Mercedes Guadalajara

Zpracování aktualizací typu SCD typu 2

Následující příklad ukazuje zpracování aktualizací typu 2 SCD:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Po spuštění příkladu typu SCD 2 obsahuje cílová tabulka následující záznamy:

userId name city __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 0 null
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 null
126 Lilie Cancun 2 null

Dotaz TYPU 2 SCD může také určit podmnožinu výstupních sloupců, které se mají sledovat pro historii v cílové tabulce. Změny v jiných sloupcích se aktualizují místo generování nových záznamů historie. Následující příklad ukazuje vyloučení city sloupce ze sledování:

Následující příklad ukazuje použití historie sledování s SCD typu 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Po spuštění tohoto příkladu bez dalšího TRUNCATE záznamu obsahuje cílová tabulka následující záznamy:

userId name city __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 0 null
125 Mercedes Guadalajara 2 null
126 Lilie Cancun 2 null

Generování testovacích dat

Níže uvedený kód slouží k vygenerování ukázkové datové sady pro použití v ukázkových dotazech, které jsou přítomné v tomto kurzu. Za předpokladu, že máte správné přihlašovací údaje k vytvoření nového schématu a vytvoření nové tabulky, můžete tyto příkazy spustit pomocí poznámkového bloku nebo Databricks SQL. Následující kód není určen ke spuštění jako součást kanálu Delta Live Tables:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Příklad: Pravidelné zpracování snímků

Následující příklad ukazuje SCD typ 2 zpracování, které ingestuje snímky tabulky uložené v mycatalog.myschema.mytable. Výsledky zpracování se zapisují do tabulky s názvem target.

mycatalog.myschema.mytable záznamy v časovém razítku 2024-01-01 00:00:00

Key Hodnota
0 a1
2 a2

mycatalog.myschema.mytable záznamy v časovém razítku 2024-01-01 12:00:00

Key Hodnota
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

Po zpracování snímků obsahuje cílová tabulka následující záznamy:

Key Hodnota __START_AT __END_AT
0 a1 2024-01-01 00:00:00 2024-01-01 12:00:00
2 a2 2024-01-01 00:00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 null
3 a3 2024-01-01 12:00:00 null

Příklad: Historické zpracování snímků

Následující příklad ukazuje zpracování typu SCD 2, které aktualizuje cílovou tabulku na základě zdrojových událostí ze dvou snímků uložených v systému cloudového úložiště:

Snímek v timestamp, uložený v /<PATH>/filename1.csv

Klíč TrackingColumn NonTrackingColumn
0 a1 b1
2 a2 b2
4 a4 b4

Snímek v timestamp + 5, uložený v /<PATH>/filename2.csv

Klíč TrackingColumn NonTrackingColumn
2 a2_new b2
3 a3 b3
4 a4 b4_new

Následující příklad kódu ukazuje zpracování aktualizací SCD typu 2 pomocí těchto snímků:

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

Po zpracování snímků obsahuje cílová tabulka následující záznamy:

Klíč TrackingColumn NonTrackingColumn __START_AT __END_AT
0 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 null
3 a3 b3 2 null
4 a4 b4_new 0 null

Přidání, změna nebo odstranění dat v cílové streamovací tabulce

Pokud kanál publikuje tabulky do katalogu Unity, můžete použít příkazy jazyka DML (Data Manipulat Language ), včetně příkazů vložení, aktualizace, odstranění a sloučení, a upravit cílové tabulky streamování vytvořené příkazy APPLY CHANGES INTO .

Poznámka:

  • Příkazy DML, které upravují schéma tabulky streamované tabulky, nejsou podporovány. Ujistěte se, že se příkazy DML nepokoušnou vyvíjet schéma tabulky.
  • Příkazy DML, které aktualizují streamovací tabulku, je možné spustit pouze ve sdíleném clusteru Katalogu Unity nebo SQL Warehouse pomocí databricks Runtime 13.3 LTS a vyšší.
  • Vzhledem k tomu, že streamování vyžaduje zdroje dat jen pro připojení, pokud vaše zpracování vyžaduje streamování ze zdrojové tabulky streamování se změnami (například příkazy DML), nastavte příznak skipChangeCommits při čtení zdrojové tabulky streamování. Při skipChangeCommits nastavení se transakce, které odstraňují nebo upravují záznamy ve zdrojové tabulce, ignorují. Pokud zpracování nevyžaduje tabulku streamování, můžete jako cílovou tabulku použít materializované zobrazení (které nemá omezení pouze pro připojení).

Vzhledem k tomu, že tabulky Delta Live Tables používají zadaný SEQUENCE BY sloupec a šíří odpovídající hodnoty sekvencování do __START_AT a __END_AT sloupců cílové tabulky (pro SCD typu 2), je nutné zajistit, aby příkazy DML používaly platné hodnoty pro tyto sloupce, aby se zachovalo správné pořadí záznamů. Podívejte se , jak je CDC implementováno pomocí rozhraní API APPLY CHANGES?.

Další informace o použití příkazů DML se streamovanými tabulkami najdete v tématu Přidání, změna nebo odstranění dat v streamované tabulce.

Následující příklad vloží aktivní záznam s počáteční sekvencí 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

Čtení datového APPLY CHANGES kanálu změn z cílové tabulky

Ve službě Databricks Runtime 15.2 a novějších můžete číst datový kanál změn z tabulky streamování, která je cílem APPLY CHANGES nebo APPLY CHANGES FROM SNAPSHOT dotazy stejným způsobem, jakým čtete datový kanál změn z jiných tabulek Delta. Pro čtení datového kanálu změn z cílové tabulky streamování jsou potřeba následující:

  • Cílová streamovací tabulka musí být publikovaná do katalogu Unity. Viz Použití katalogu Unity s kanály Delta Live Tables.
  • Pokud chcete číst datový kanál změn z cílové streamovací tabulky, musíte použít Databricks Runtime 15.2 nebo vyšší. Pokud chcete číst datový kanál změn v jiném kanálu Delta Live Tables, musí být kanál nakonfigurovaný tak, aby používal Databricks Runtime 15.2 nebo vyšší.

Datový kanál změn si přečtete z cílové streamovací tabulky vytvořené v kanálu Delta Live Tables stejným způsobem jako čtení datového kanálu změn z jiných tabulek Delta. Další informace o používání funkcí kanálu rozdílových změn dat, včetně příkladů v Pythonu a SQL, najdete v tématu Použití datového kanálu změn Delta Lake v Azure Databricks.

Poznámka:

Záznam datového kanálu změn obsahuje metadata identifikující typ události změny. Když se záznam aktualizuje v tabulce, metadata přidružených záznamů změn obvykle obsahují _change_type hodnoty nastavené na update_preimage a update_postimage události.

Tyto hodnoty se ale liší, pokud jsou aktualizace cílové tabulky streamování, _change_type které zahrnují změnu hodnot primárního klíče. Pokud změny zahrnují aktualizace primárních klíčů, _change_type jsou pole metadat nastavena na insert a delete události. Změny primárních klíčů můžou nastat, když se ruční aktualizace pro jedno z klíčových polí UPDATE s použitím příkazu nebo MERGE příkazu nebo v případě tabulek typu SCD 2 změní pole tak, __start_at aby odráželo dřívější počáteční sekvenci.

Dotaz APPLY CHANGES určuje hodnoty primárního klíče, které se liší pro zpracování typu SCD 1 a SCD typu 2:

  • Pro zpracování typu SCD typu 1 a rozhraní Pythonu Delta Live Tables je primárním klíčem hodnota keys parametru apply_changes() ve funkci. Pro rozhraní SQL Delta Live Tables je primárním klíčem sloupce definované klauzulí KEYS v APPLY CHANGES INTO příkazu.
  • Pro SCD typu 2 je keys primárním klíčem parametr nebo KEYS klauzule plus návratová hodnota z coalesce(__START_AT, __END_AT) operace, kde __START_AT a __END_AT jsou odpovídající sloupce z cílové streamovací tabulky.

Získání dat o záznamech zpracovaných dotazem CDC delta live tables

Poznámka:

Následující metriky jsou zachyceny pouze APPLY CHANGES dotazy, nikoli APPLY CHANGES FROM SNAPSHOT dotazy.

Dotazy zaznamenávají APPLY CHANGES následující metriky:

  • num_upserted_rows: Počet výstupních řádků přenesených do datové sady během aktualizace.
  • num_deleted_rows: Počet existujících výstupních řádků odstraněných z datové sady během aktualizace.

Metrika num_output_rows , výstup pro toky bez CDC, se pro dotazy nezachytává apply changes .

Jaké datové objekty se používají ke zpracování CDC delta živých tabulek?

Poznámka: Následující datové struktury se vztahují pouze na APPLY CHANGES zpracování, nikoli APPLY CHANGES FROM SNAPSHOT zpracování.

Když deklarujete cílovou tabulku v metastoru Hive, vytvoří se dvě datové struktury:

  • Zobrazení s názvem přiřazeným k cílové tabulce.
  • Interní zálohovací tabulka používaná delta živými tabulkami ke správě zpracování CDC. Tato tabulka je pojmenovaná tak, že se předsadí __apply_changes_storage_ na název cílové tabulky.

Pokud například deklarujete cílovou tabulku s názvem dlt_cdc_target, zobrazí se v metastoru zobrazení s názvem dlt_cdc_target a tabulka s názvem __apply_changes_storage_dlt_cdc_target . Vytvoření zobrazení umožňuje rozdílovým živým tabulkám vyfiltrovat dodatečné informace (například náhrobky a verze) potřebné ke zpracování dat mimo pořadí. Pokud chcete zobrazit zpracovávaná data, zadejte dotaz na cílové zobrazení. Vzhledem k tomu, že schéma __apply_changes_storage_ tabulky se může změnit tak, aby podporovalo budoucí funkce nebo vylepšení, neměli byste se dotazovat na tabulku pro použití v produkčním prostředí. Pokud do tabulky přidáte data ručně, předpokládá se, že záznamy přicházejí před dalšími změnami, protože sloupce verze chybí.

Pokud kanál publikuje do katalogu Unity, jsou interní záložní tabulky pro uživatele nepřístupné.