Использование веб-канала изменений данных Delta Lake в Azure Databricks

Веб-канал данных разрешает Azure Databricks отслеживать изменения на уровне строк между версиями таблицы Delta. Когда эта функция включена в таблице Delta, среда выполнения регистрирует события изменений для всех данных, записываемых в таблицу. Сюда входят данные строк вместе с метаданными, указывающими, была ли соответствующая строка вставлена, удалена или обновлена.

Внимание

Веб-канал изменений работает в тандеме с журналом таблиц для предоставления сведений об изменениях. Так как клонирование таблицы Delta создает отдельную историю, веб-канал изменений в клонированных таблицах не соответствует исходной таблице.

Добавочная обработка данных об изменении

Databricks рекомендует использовать канал изменений в сочетании со структурированной потоковой передачей для добавочного процесса изменений из таблиц Delta. Для автоматического отслеживания версий веб-канала измененных данных таблицы необходимо использовать структурированную потоковую передачу для Azure Databricks.

Примечание.

Разностные динамические таблицы предоставляют функциональные возможности для простого распространения измененных данных и хранения результатов в виде SCD (медленно меняющегося измерения) типа 1 или типа 2 таблиц. См . API APPLY CHANGES: упрощение отслеживания изменений с помощью разностных динамических таблиц.

Чтобы прочитать веб-канал измененных данных из таблицы, необходимо включить веб-канал изменений в этой таблице. Дополнительные сведения см. в разделе Включение веб-канала изменений.

Задайте параметру readChangeFeed true при настройке потока для таблицы для чтения веб-канала изменений, как показано в следующем примере синтаксиса:

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

По умолчанию поток возвращает последний моментальный снимок таблицы при первом запуске потока в качестве INSERT и будущих изменений в виде измененных данных.

Изменение данных фиксируется в рамках транзакции Delta Lake и становится доступным одновременно с новыми данными, фиксируемых в таблице.

При необходимости можно указать начальную версию. См. статью " Следует ли указать начальную версию?".

Канал изменений данных также поддерживает пакетное выполнение, которое требует указания начальной версии. Ознакомьтесь с изменениями в пакетных запросах.

При считывании изменений также поддерживаются такие параметры, как ограничения скорости (maxFilesPerTrigger, maxBytesPerTrigger) и excludeRegex.

Ограничение скорости может быть атомарной операцией для версий, отличных от начальной версии моментального снимка. Таким образом, ограничение скорости применяется ко всей версии фиксации или возвращается вся фиксация.

Следует ли указать начальную версию?

При необходимости можно указать начальную версию, если вы хотите игнорировать изменения, которые произошли до определенной версии. Можно указать версию с помощью метки времени или номера идентификатора версии, записанного в журнале транзакций Delta.

Примечание.

Начальная версия требуется для пакетных операций чтения, и многие шаблоны пакетов могут воспользоваться настройкой необязательной конечной версии.

При настройке рабочих нагрузок структурированной потоковой передачи с участием канала изменений важно понимать, как указать начальную обработку версий.

Многие рабочие нагрузки потоковой передачи, особенно новые конвейеры обработки данных, пользуются поведением по умолчанию. При поведении по умолчанию первый пакет обрабатывается, когда поток сначала записывает все существующие записи в таблице в качестве INSERT операций в канале измененных данных.

Если целевая таблица уже содержит все записи с соответствующими изменениями до определенной точки, укажите начальную версию, чтобы избежать обработки состояния исходной таблицы в виде INSERT событий.

Следующий пример синтаксиса восстанавливается после сбоя потоковой передачи, в котором контрольная точка повреждена. В этом примере предполагается следующее:

  1. Канал изменений данных включен в исходной таблице при создании таблицы.
  2. Целевая нижестоящей таблица обработала все изменения вплоть до версии 75.
  3. Журнал версий исходной таблицы доступен для версий 70 и выше.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

В этом примере также необходимо указать новое расположение контрольной точки.

Внимание

Если указать начальную версию, поток не может начинаться с новой контрольной точки, если начальная версия больше не присутствует в журнале таблиц. Delta Lake автоматически очищает исторические версии, что означает, что все указанные начальные версии в конечном итоге удаляются.

См. раздел " Можно ли использовать веб-канал изменений" для воспроизведения всей истории таблицы?.

Чтение изменений в пакетных запросах

С помощью синтаксиса пакетного запроса можно считывать все изменения, начиная с конкретной версии или считывать изменения в заданном диапазоне версий.

Версия указывается в виде целого числа, а отметка времени — в виде строки в формате yyyy-MM-dd[ HH:mm:ss[.SSS]].

Начальные и конечные версии включены в запросы. Чтобы прочитать изменения из конкретной начальной версии до последней версии таблицы, укажите только начальную версию.

Если указанная версия или метка времени старше версии, которая содержит записанные события изменения (т. е. канал данных включен), возникает ошибка, указывающая, что канал изменений не был включен.

В следующих примерах синтаксиса показано использование параметров начальной и конечной версии с пакетными считываниями:

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Python

# version as ints or longs
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

// version as ints or longs
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

Примечание.

По умолчанию, если пользователь передает версию, превышающую версию последней фиксации в таблице, или метку времени, которая более новая, чем последняя фиксация, возникает ошибка timestampGreaterThanLatestCommit. В Databricks Runtime 11.3 LTS и более поздних версиях веб-канал изменений может обрабатывать случай вне диапазона, если пользователь задает следующую конфигурацию следующим образом true:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Если в таблице указана начальная версия, превышающая версию последней фиксации, или метка времени, которая более новая, чем последняя фиксация в таблице, то при включении предыдущей конфигурации возвращается пустой результат чтения.

Если вы предоставите конечную версию, превышающую последнюю фиксацию в таблице или конечную метку времени, которая позже, чем последняя фиксация в таблице, то при включении предыдущей конфигурации в режиме пакетного чтения возвращаются все изменения между начальной версией и последней фиксацией.

Какова схема канала измененных данных?

При чтении из канала измененных данных для таблицы используется схема последней версии таблицы.

Примечание.

Большинство операций изменения схемы и эволюции полностью поддерживаются. Таблица с включенным сопоставлением столбцов не поддерживает все варианты использования и демонстрирует другое поведение. Сведения об ограничениях канала изменений для таблиц с включенным сопоставлением столбцов.

Помимо столбцов данных из схемы таблицы Delta, канал изменений содержит столбцы метаданных, определяющие тип события изменения:

Имя столбца Тип Значения
_change_type Строка insert, , update_postimage, delete update_preimage (1)
_commit_version Long Разностный журнал или версия таблицы, содержащая изменение.
_commit_timestamp Метка времени Метка времени, связанная с моментом создания фиксации.

(1) preimage — это значение перед обновлением, postimage это значение после обновления.

Примечание.

Невозможно включить веб-канал изменений в таблице, если схема содержит столбцы с теми же именами, что и эти добавленные столбцы. Переименуйте столбцы в таблице, чтобы устранить этот конфликт, прежде чем пытаться включить канал измененных данных.

Включение веб-канала изменений данных

Вы можете читать только веб-канал измененных данных для включенных таблиц. Функцию веб-канала изменений необходимо включить явным образом одним из перечисленных ниже способов.

  • Новая таблица: задайте свойство таблицы delta.enableChangeDataFeed = true в команде CREATE TABLE.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Существующая таблица: задайте свойство таблицы delta.enableChangeDataFeed = true в команде ALTER TABLE.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Все новые таблицы:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Внимание

Записываются только изменения, внесенные после включения веб-канала изменений. Прошлые изменения таблицы не записываются.

Хранение данных об изменениях

Включение канала данных изменений приводит к небольшому увеличению затрат на хранение таблицы. Записи измененных данных создаются при выполнении запроса и, как правило, гораздо меньше общего размера перезаписанных файлов.

Azure Databricks фиксирует данные об изменениях для операций UPDATE, DELETE и MERGE в папке _change_data в каталоге таблицы. Некоторые операции, такие как операции только вставки и удаление полного раздела, не создают данные в каталоге _change_data , так как Azure Databricks может эффективно вычислить веб-канал изменений непосредственно из журнала транзакций.

Все операции чтения для файлов данных в папке _change_data должны проходить через поддерживаемые API Delta Lake.

На файлы в папке _change_data распространяется действие политики хранения таблицы. Изменение данных канала данных удаляется при выполнении VACUUM команды.

Можно ли использовать веб-канал изменений для воспроизведения всей истории таблицы?

Веб-канал изменений не предназначен для постоянной записи всех изменений в таблице. Веб-канал изменений записывает только изменения, которые происходят после включения.

Канал изменений данных и Delta Lake позволяют всегда восстанавливать полный снимок исходной таблицы, что означает, что вы можете начать новое потоковое чтение для таблицы с включенным каналом изменений и записать текущую версию этой таблицы и все изменения, которые происходят после этого.

Записи в веб-канале измененных данных необходимо рассматривать как временные и доступные только для указанного периода хранения. Журнал транзакций Delta удаляет версии таблиц и соответствующие версии веб-канала изменений через регулярные интервалы. Если версия удаляется из журнала транзакций, вы больше не сможете прочитать веб-канал изменений для этой версии.

Если вашему варианту использования требуется сохранить постоянную историю всех изменений в таблице, следует использовать добавочную логику для записи записей из канала изменений в новую таблицу. В следующем примере кода показано использование trigger.AvailableNow, которое использует добавочную обработку структурированной потоковой передачи, но обрабатывает доступные данные как пакетную рабочую нагрузку. Эту рабочую нагрузку можно запланировать асинхронно с помощью основных конвейеров обработки, чтобы создать резервную копию веб-канала изменений для аудита или полной воспроизведения.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

Изменение ограничений канала данных для таблиц с включенным сопоставлением столбцов

С включенным сопоставлением столбцов в таблице Delta можно удалить или переименовать столбцы в таблице без перезаписи файлов данных для существующих данных. При включенном сопоставлении столбцов канал измененных данных имеет ограничения после выполнения изменений схемы, отличных от аддитивной схемы, таких как переименование или удаление столбца, изменение типа данных или изменение допустимости null.

Внимание

  • Вы не можете считывать канал данных изменений для транзакции или диапазона, в котором изменение схемы, отличное от аддитивной схемы, происходит с помощью семантики пакетной службы.
  • В Databricks Runtime 12.2 LTS и ниже таблицы с включенным сопоставлением столбцов, которые испытали неаддитивные изменения схемы, не поддерживают потоковую передачу операций чтения на веб-канале измененных данных. См. раздел Потоковая передача с сопоставлением столбцов и изменениями схемы.
  • В Databricks Runtime 11.3 LTS и ниже невозможно прочитать веб-канал изменений для таблиц с включенным сопоставлением столбцов, которые имели опыт переименования или удаления столбцов.

В Databricks Runtime 12.2 LTS и более поздних версиях можно выполнять пакетные операции чтения по веб-каналу измененных данных для таблиц с включенным сопоставлением столбцов, которые имели недитивные изменения схемы. Вместо использования схемы последней версии таблицы операции чтения используют схему конечной версии таблицы, указанной в запросе. Запросы по-прежнему завершаются ошибкой, если указанный диапазон версий охватывает изменение схемы, отличной от добавок.