Потоковая передача и добавочное прием

Azure Databricks использует структурированную потоковую передачу Apache Spark для поддержки многочисленных продуктов, связанных с рабочими нагрузками приема, в том числе:

  • Автозагрузчик
  • COPY INTO
  • Конвейеры разностных динамических таблиц
  • Материализованные представления и таблицы потоковой передачи в Databricks SQL

В этой статье рассматриваются некоторые различия между семантикой потоковой и добавочной пакетной обработки и предоставляется общий обзор настройки рабочих нагрузок приема для требуемой семантики в Databricks.

В чем разница между потоковой передачей и добавочным приемом пакетов?

Возможные конфигурации рабочих процессов приема варьируются от обработки в режиме реального времени до редкой добавочной пакетной обработки. Оба шаблона используют структурированную потоковую передачу Apache Spark для добавочной обработки, но имеют другую семантику. Для простоты эта статья относится к приему в режиме реального времени как прием потоковой передачи и более редкой добавочной обработки как добавочная пакетная прием.

Прием потоковой передачи

Потоковая передача в контексте приема данных и обновлений таблиц относится к обработке данных практически в режиме реального времени, когда Azure Databricks получает записи из источника в приемник в микробатах с помощью инфраструктуры always-on. Рабочая нагрузка потоковой передачи постоянно получает обновления из настроенных источников данных, если не возникает сбой, который останавливает прием.

Добавочное прием пакетов

Добавочное прием пакетного приема относится к шаблону, в котором все новые записи обрабатываются из источника данных в кратковременном задании. Добавочное прием пакетного пакета часто происходит в соответствии с расписанием, но его также можно активировать вручную или на основе прибытия файла.

Добавочное прием пакетов отличается от приема пакетной службы в том, что он автоматически обнаруживает новые записи в источнике данных и игнорирует записи, которые уже были приняты.

Прием с заданиями

Задания Databricks позволяют управлять рабочими процессами и планировать задачи, включающие записные книжки, библиотеки, конвейеры Delta Live Tables и запросы SQL Databricks.

Примечание.

Вы можете использовать все вычислительные типы и типы задач Azure Databricks для настройки добавочного приема пакетов. Прием потоковой передачи поддерживается только в рабочей среде для классических заданий вычислений и разностных динамических таблиц.

Задания имеют два основных режима работы:

  • Непрерывные задания автоматически повторяются при возникновении сбоя. Этот режим предназначен для приема потоковой передачи.
  • Запускаемые задания выполняют задачи при активации. Триггеры включают:
    • Триггеры на основе времени, выполняющие задания по указанному расписанию.
    • Триггеры на основе файлов, выполняющие задания при посадке файлов в указанное расположение.
    • Другие триггеры, такие как вызовы REST API, выполнение команд интерфейса командной строки Azure Databricks или нажатие кнопки "Запустить сейчас " в пользовательском интерфейсе рабочей области.

Для добавочных пакетных рабочих нагрузок настройте задания с помощью AvailableNow режима триггера следующим образом:

Python

(df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("table_name")
)

Scala

import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("table_name")

Для рабочих нагрузок потоковой передачи используется processingTime ="500ms"интервал триггера по умолчанию. В следующем примере показано, как обрабатывать микропакет каждые 5 секунд:

Python

(df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(processingTime="5 seconds")
  .toTable("table_name")
)

Scala

import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.ProcessingTime, "5 seconds")
  .toTable("table_name")

Внимание

Бессерверные задания не поддерживают интервалы триггеров scala, непрерывного режима или интервалы триггеров на основе времени для структурированной потоковой передачи. Используйте классические задания, если вам нужна семантика приема в режиме реального времени.

Прием с помощью разностных динамических таблиц

Как и задания, конвейеры Delta Live Tables могут выполняться в активированном или непрерывном режиме. Для семантики потоковой передачи практически в режиме реального времени с таблицами потоковой передачи используйте непрерывный режим.

Используйте таблицы потоковой передачи для настройки потоковой передачи или добавочного пакетного приема из облачного хранилища объектов, Apache Kafka, Amazon Kinesis, Google Pub/Sub или Apache Pulsar.

LakeFlow Connect использует разностные динамические таблицы для настройки конвейеров приема из подключенных систем. См. статью LakeFlow Connect.

Материализованные представления гарантируют семантику операций, эквивалентную пакетным рабочим нагрузкам, но могут оптимизировать множество операций, чтобы вычислить результаты постепенно. Сведения об операциях обновления см. в разделе "Операции обновления" для материализованных представлений.

Прием с помощью Databricks SQL

Таблицы потоковой передачи можно использовать для настройки добавочного приема пакетной обработки из облачного хранилища объектов, Apache Kafka, Amazon Kinesis, Google Pub/Sub или Apache Pulsar.

Материализованные представления можно использовать для настройки добавочной пакетной обработки из источников, которые полностью воспроизводимы для указанного набора операций. Сведения об операциях обновления см. в разделе "Операции обновления" для материализованных представлений.

COPY INTO предоставляет знакомый синтаксис SQL для добавочной пакетной обработки файлов данных в облачном хранилище объектов. COPY INTO поведение аналогично шаблонам, поддерживаемым таблицами потоковой передачи для облачного хранилища объектов, но не все параметры по умолчанию эквивалентны для всех поддерживаемых форматов файлов.