Używanie zestawienia danych zmian usługi Delta Lake w usłudze Azure Databricks

Zestawienie zmian danych umożliwia usłudze Azure Databricks śledzenie zmian na poziomie wiersza między wersjami tabeli delty. Po włączeniu w tabeli delty środowisko uruchomieniowe rejestruje zdarzenia zmiany dla wszystkich danych zapisanych w tabeli. Obejmuje to dane wierszy wraz z metadanymi wskazującymi, czy określony wiersz został wstawiony, usunięty lub zaktualizowany.

Ważne

Zestawienie zmian danych działa razem z historią tabel w celu udostępnienia informacji o zmianach. Ponieważ klonowanie tabeli delty tworzy oddzielną historię, zestawienie danych zmian w sklonowanych tabelach nie jest zgodne z oryginalną tabelą.

Przyrostowe przetwarzanie zmian danych

Usługa Databricks zaleca używanie zestawienia zmian danych w połączeniu ze strukturą przesyłania strumieniowego w celu przyrostowego przetwarzania zmian z tabel różnicowych. Musisz użyć przesyłania strumieniowego ze strukturą dla usługi Azure Databricks, aby automatycznie śledzić wersje zestawienia zmian w tabeli.

Uwaga

Tabele na żywo funkcji delta zapewniają łatwą propagację danych zmian i przechowywanie wyników jako scD (powoli zmieniającego się wymiar) typu 1 lub 2 tabele. Zobacz Interfejsy API ZASTOSUJ ZMIANY: upraszczanie przechwytywania danych zmian za pomocą tabel różnicowych na żywo.

Aby odczytać zestawienie danych zmian z tabeli, musisz włączyć zestawienie danych zmian w tej tabeli. Zobacz Włączanie zestawienia danych zmian.

Ustaw opcję readChangeFeed na true wartość podczas konfigurowania strumienia względem tabeli, aby odczytać zestawienie danych zmian, jak pokazano w poniższym przykładzie składni:

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Domyślnie strumień zwraca najnowszą migawkę tabeli po pierwszym uruchomieniu strumienia w postaci INSERT i przyszłych zmian w miarę zmieniania danych.

Zmiana zatwierdzeń danych w ramach transakcji usługi Delta Lake i staje się dostępna w tym samym czasie nowych zatwierdzeń danych w tabeli.

Opcjonalnie możesz określić wersję początkową. Zobacz Czy należy określić wersję początkową?.

Źródło danych zmian obsługuje również wykonywanie wsadowe, które wymaga określenia wersji początkowej. Zobacz Odczyt zmian w zapytaniach wsadowych.

Opcje, takie jak limity szybkości (maxFilesPerTrigger, maxBytesPerTrigger) i excludeRegex są również obsługiwane podczas odczytywania danych zmiany.

Ograniczanie szybkości może być niepodzielne dla wersji innych niż początkowa wersja migawki. Oznacza to, że cała wersja zatwierdzenia będzie ograniczona lub zostanie zwrócone całe zatwierdzenie.

Czy należy określić wersję początkową?

Opcjonalnie możesz określić wersję początkową, jeśli chcesz zignorować zmiany, które wystąpiły przed określoną wersją. Wersję można określić przy użyciu znacznika czasu lub numeru identyfikatora wersji zarejestrowanego w dzienniku transakcji delty.

Uwaga

Do odczytu wsadowego wymagana jest wersja początkowa, a wiele wzorców wsadowych może skorzystać z ustawienia opcjonalnej wersji końcowej.

Podczas konfigurowania obciążeń przesyłania strumieniowego ze strukturą dotyczących zestawienia zmian ważne jest, aby zrozumieć, jak określenie wersji początkowej ma wpływ na przetwarzanie.

Wiele obciążeń przesyłania strumieniowego, zwłaszcza nowych potoków przetwarzania danych, korzysta z domyślnego zachowania. W przypadku zachowania domyślnego pierwsza partia jest przetwarzana, gdy strumień najpierw rejestruje wszystkie istniejące rekordy w tabeli jako INSERT operacje w zestawieniach danych zmian.

Jeśli tabela docelowa zawiera już wszystkie rekordy z odpowiednimi zmianami do określonego punktu, określ wersję początkową, aby uniknąć przetwarzania stanu tabeli źródłowej jako INSERT zdarzeń.

Następująca przykładowa składnia odzyskiwania po niepowodzeniu przesyłania strumieniowego, w którym punkt kontrolny został uszkodzony. W tym przykładzie przyjęto założenie, że spełnione są następujące warunki:

  1. Źródło danych zmian zostało włączone w tabeli źródłowej podczas tworzenia tabeli.
  2. Docelowa tabela podrzędna przetworzyła wszystkie zmiany do wersji 75.
  3. Historia wersji tabeli źródłowej jest dostępna dla wersji 70 lub nowszych.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

W tym przykładzie należy również określić nową lokalizację punktu kontrolnego.

Ważne

Jeśli określisz wersję początkową, strumień nie może rozpocząć się od nowego punktu kontrolnego, jeśli wersja początkowa nie jest już obecna w historii tabeli. Usługa Delta Lake automatycznie czyści historyczne wersje, co oznacza, że wszystkie określone wersje początkowe zostaną ostatecznie usunięte.

Zobacz Czy mogę użyć zestawienia zmian danych, aby odtworzyć całą historię tabeli?.

Odczytywanie zmian w zapytaniach wsadowych

Składnia zapytań wsadowych umożliwia odczytywanie wszystkich zmian rozpoczynających się od określonej wersji lub odczytywanie zmian w określonym zakresie wersji.

Należy określić wersję jako liczbę całkowitą i znaczniki czasu jako ciąg w formacie yyyy-MM-dd[ HH:mm:ss[.SSS]].

Wersje początkowe i końcowe są uwzględniane w zapytaniach. Aby odczytać zmiany z określonej wersji początkowej do najnowszej wersji tabeli, określ tylko wersję początkową.

Jeśli podasz wersję niższą lub sygnaturę czasową starszą niż ta, która zarejestrowała zdarzenia zmiany — oznacza to, że po włączeniu zestawienia danych zmian zostanie zgłoszony błąd wskazujący, że źródło danych zmian nie zostało włączone.

W poniższych przykładach składni pokazano, jak używać opcji uruchamiania i kończenia wersji z odczytami wsadowymi:

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Python

# version as ints or longs
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

// version as ints or longs
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

Uwaga

Domyślnie jeśli użytkownik przekazuje wersję lub znacznik czasu przekraczający ostatnie zatwierdzenie w tabeli, zostanie zgłoszony błąd timestampGreaterThanLatestCommit . W środowisku Databricks Runtime 11.3 LTS lub nowszym źródło danych zmiany może obsłużyć przypadek wersji poza zakresem, jeśli użytkownik ustawia następującą konfigurację na true:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Jeśli podasz wersję początkową większą niż ostatnie zatwierdzenie w tabeli lub znacznik czasu rozpoczęcia nowszy niż ostatnie zatwierdzenie w tabeli, po włączeniu poprzedniej konfiguracji zwracany jest pusty wynik odczytu.

Jeśli podasz wersję końcową większą niż ostatnie zatwierdzenie w tabeli lub znacznik czasu zakończenia nowszy niż ostatnie zatwierdzenie w tabeli, po włączeniu poprzedniej konfiguracji w trybie odczytu wsadowego zostaną zwrócone wszystkie zmiany między wersją początkową a ostatnim zatwierdzeniem.

Jaki jest schemat zestawienia zmian danych?

Podczas odczytywania ze źródła danych zmian dla tabeli jest używany schemat najnowszej wersji tabeli.

Uwaga

Większość operacji zmiany schematu i ewolucji jest w pełni obsługiwana. Tabela z włączonym mapowaniem kolumn nie obsługuje wszystkich przypadków użycia i demonstruje różne zachowanie. Zobacz Zmienianie ograniczeń źródła danych dla tabel z włączonym mapowaniem kolumn.

Oprócz kolumn danych ze schematu tabeli delta zestawienie danych zawiera kolumny metadanych identyfikujące typ zdarzenia zmiany:

Nazwa kolumny Typ Wartości
_change_type String insert, , update_preimage , delete update_postimage(1)
_commit_version Długi Wersja dziennika delty lub tabeli zawierająca zmianę.
_commit_timestamp Sygnatura czasowa Sygnatura czasowa skojarzona podczas tworzenia zatwierdzenia.

(1) preimage jest wartością przed aktualizacją, postimage jest wartością po aktualizacji.

Uwaga

Nie można włączyć zestawienia danych zmian w tabeli, jeśli schemat zawiera kolumny o takich samych nazwach jak te dodane kolumny. Zmień nazwy kolumn w tabeli, aby rozwiązać ten konflikt przed próbą włączenia zestawienia danych zmian.

Włączanie zestawienia danych zmian

Źródło danych zmian można odczytywać tylko dla tabel z obsługą. Należy jawnie włączyć opcję zestawienia danych zmian przy użyciu jednej z następujących metod:

  • Nowa tabela: ustaw właściwość delta.enableChangeDataFeed = true tabeli w poleceniu CREATE TABLE .

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Istniejąca tabela: ustaw właściwość delta.enableChangeDataFeed = true tabeli w poleceniu ALTER TABLE .

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Wszystkie nowe tabele:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Ważne

Rejestrowane są tylko zmiany wprowadzone po włączeniu zestawienia danych zmian. Wcześniejsze zmiany w tabeli nie są przechwytywane.

Zmienianie magazynu danych

Włączenie zestawienia danych zmian powoduje niewielki wzrost kosztów magazynowania dla tabeli. Rekordy danych zmiany są generowane podczas uruchamiania zapytania i są zazwyczaj znacznie mniejsze niż całkowity rozmiar przepisanych plików.

Usługa Azure Databricks rejestruje dane zmiany danych dla UPDATEoperacji , DELETEi MERGE w _change_data folderze w katalogu tabeli. Niektóre operacje, takie jak operacje wstawiania i usuwanie pełnej partycji, nie generują danych w _change_data katalogu, ponieważ usługa Azure Databricks może efektywnie obliczyć zestawienie danych zmian bezpośrednio z dziennika transakcji.

Wszystkie operacje odczytu względem plików danych w folderze _change_data powinny przechodzić przez obsługiwane interfejsy API usługi Delta Lake.

Pliki w folderze _change_data są zgodne z zasadami przechowywania tabeli. Zmiana danych źródła danych jest usuwana po uruchomieniu VACUUM polecenia.

Czy mogę użyć zestawienia zmian danych, aby odtworzyć całą historię tabeli?

Zestawienie danych zmian nie ma służyć jako trwały rekord wszystkich zmian w tabeli. Zmiana źródła danych rejestruje tylko zmiany, które występują po jej włączeniu.

Zestawienie danych zmian i usługa Delta Lake umożliwiają zawsze odtworzenie pełnej migawki tabeli źródłowej, co oznacza, że możesz rozpocząć nowe przesyłanie strumieniowe odczytane względem tabeli z włączonym zestawieniem zmian i przechwycić bieżącą wersję tej tabeli oraz wszystkie zmiany, które występują po.

Rekordy w kanale danych zmian należy traktować jako przejściowe i dostępne tylko dla określonego okna przechowywania. Dziennik transakcji delty usuwa wersje tabel i odpowiadające im wersje zestawienia danych zmian w regularnych odstępach czasu. Po usunięciu wersji z dziennika transakcji nie można już odczytać zestawienia danych zmian dla tej wersji.

Jeśli przypadek użycia wymaga zachowania trwałej historii wszystkich zmian w tabeli, należy użyć logiki przyrostowej do zapisywania rekordów ze źródła danych zmian do nowej tabeli. W poniższym przykładzie kodu pokazano użycie metody trigger.AvailableNow, która wykorzystuje przyrostowe przetwarzanie przesyłania strumieniowego ze strukturą, ale przetwarza dostępne dane jako obciążenie wsadowe. To obciążenie można zaplanować asynchronicznie przy użyciu głównych potoków przetwarzania, aby utworzyć kopię zapasową zestawienia zmian danych na potrzeby inspekcji lub pełną powtarzalność.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

Zmienianie ograniczeń źródła danych dla tabel z włączonym mapowaniem kolumn

Po włączeniu mapowania kolumn w tabeli delty można usuwać lub zmieniać nazwy kolumn w tabeli bez ponownego zapisywania plików danych dla istniejących danych. Po włączeniu mapowania kolumn zmiana źródła danych ma ograniczenia po wprowadzeniu zmian schematu nie addytywnego, takich jak zmiana nazwy lub usunięcie kolumny, zmiana typu danych lub zmiany wartości null.

Ważne

  • Nie można odczytać zestawienia danych zmian dla transakcji lub zakresu, w którym następuje zmiana schematu nie addytywnego przy użyciu semantyki wsadowej.
  • W środowisku Databricks Runtime 12.2 LTS i poniżej tabele z włączonym mapowaniem kolumn, które doświadczyły zmian schematu nie addytywnego, nie obsługują odczytów przesyłanych strumieniowo na zestawieniach zmian danych. Zobacz Przesyłanie strumieniowe za pomocą mapowania kolumn i zmian schematu.
  • W środowisku Databricks Runtime 11.3 LTS i poniżej nie można odczytać zestawienia zmian danych dla tabel z włączonym mapowaniem kolumn, które doświadczyły zmiany nazwy lub upuszczania kolumn.

W środowisku Databricks Runtime 12.2 LTS lub nowszym można wykonywać operacje odczytu wsadowego dla zestawienia danych zmian dla tabel z włączonym mapowaniem kolumn, które doświadczyły zmian schematu nie addytywnego. Zamiast używać schematu najnowszej wersji tabeli, operacje odczytu używają schematu końcowej wersji tabeli określonej w zapytaniu. Zapytania nadal kończą się niepowodzeniem, jeśli określony zakres wersji obejmuje zmianę schematu nie addytywnego.