Использование foreachBatch для записи в произвольные приемники данных

В этой статье описывается использование foreachBatch структурированной потоковой передачи для записи выходных данных потокового запроса в источники данных, которые не имеют существующего приемника потоковой передачи.

Шаблон streamingDF.writeStream.foreachBatch(...) кода позволяет применять пакетные функции к выходным данным каждого микропакета потокового запроса. Функции, используемые с foreachBatch двумя параметрами:

  • Кадр данных с выходными данными микропакета.
  • Уникальный идентификатор микропакета.

Необходимо использовать foreachBatch для операций слияния Delta Lake в структурированной потоковой передаче. См . upsert из потоковых запросов с помощью foreachBatch.

Применение дополнительных операций с кадрами данных

Многие операции с кадрами данных и наборами данных не поддерживаются при потоковой передаче кадров данных, так как в этих случаях Spark не поддерживает создание добавочных планов. С помощью foreachBatch() можно применить некоторые из этих операций к каждому выходному микропакету. Например, можно использовать foreachBath() и операцию SQL MERGE INTO для записи выходных данных агрегатов потоковой передачи в разностную таблицу в режиме обновления. Дополнительные сведения см. в разделе MERGE INTO.

Внимание

  • foreachBatch() предоставляет гарантии записи не менее одного раза. Но можно использовать значение batchId, предоставленное для функции, как способ дедупликации выходных данных и получения гарантии строго однократной записи. В любом случае вам нужно будет самостоятельно принять решение о комплексной семантике.
  • foreachBatch() не работает с режимом непрерывной обработки, так как существенно зависит от выполнения микропакетов запроса потоковой передачи. Если данные записываются в непрерывном режиме, используйте foreach().

Пустой кадр данных можно вызвать с помощью foreachBatch() пользовательского кода, чтобы обеспечить правильную работу. Например:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Изменения foreachBatch поведения в Databricks Runtime 14.0

В Databricks Runtime 14.0 и более поздних версиях для вычислений, настроенных в режиме общего доступа, применяются следующие изменения поведения:

  • print() команды записывают выходные данные в журналы драйверов.
  • Невозможно получить доступ к подмодулу dbutils.widgets внутри функции.
  • Все файлы, модули или объекты, на которые ссылается функция, должны быть сериализуемыми и доступными в Spark.

Повторное использование существующих источников данных пакетной обработки

С помощью foreachBatch()можно использовать существующие средства записи пакетных данных для приемников данных, которые, возможно, не поддерживают структурированную потоковую передачу. Вот несколько таких случаев.

Многие другие источники данных пакетной службы можно использовать из foreachBatch(). См. статью "Подключение к источникам данных".

Запись в несколько расположений

Если необходимо записать выходные данные потокового запроса в несколько расположений, Databricks рекомендует использовать несколько записей структурированной потоковой передачи для оптимальной параллелизации и пропускной способности.

Использование foreachBatch для записи в несколько приемников сериализует выполнение потоковой записи, что может увеличить задержку для каждого микропакета.

Если вы используете foreachBatch для записи в несколько таблиц Delta, см . статью "Идемпотентная таблица" записывается в foreachBatch.