ステートフルな構造化ストリーミング クエリを最適化する

ステートフルな構造化ストリーミング クエリの中間状態の情報を管理することにより、予期せぬ待機時間や運用環境の問題を防ぐことができます。

Databricks では次を推奨しています。

  • コンピューティングに最適化されたインスタンスをワーカーとして使用します。
  • シャッフル パーティションの数を、クラスター内のコア数の 1 倍から 2 倍に設定します。
  • SparkSession の spark.sql.streaming.noDataMicroBatches.enabled 構成を false に設定します。 これにより、ストリーミング マイクロバッチ エンジンは、データを含まないマイクロバッチを処理できなくなります。 また、この構成を false に設定すると、即時ではなく新しいデータが到着するまで、ウォーターマークや処理時間のタイムアウトを活用するステートフル操作によってデータ出力が得られない可能性があることに注意してください。

Databricks では、ステートフル ストリームの状態を管理するために、RocksDB と変更ログのチェックポイント処理を使用することをお勧めします。 「Azure Databricks で RocksDB 状態ストアを構成する」をご覧ください。

Note

状態管理スキームは、クエリの再起動間で変更できません。 つまり、既定の管理を使用してクエリを開始した場合、新しいチェックポイントの場所でクエリを最初から開始しない限り、クエリを変更することはできません。

構造化ストリーミングで複数のステートフル演算子を操作する

Databricks Runtime 13.3 LTS 以降では、Azure Databricks により、構造化ストリーミング ワークロードのステートフル演算子に高度なサポートが提供されます。 複数のステートフル演算子を連結できるようになりました。つまり、ウィンドウ集計などの操作の出力を結合などの別のステートフル操作にフィードできます。

次の例では、使用できるいくつかのパターンを示します。

重要

複数のステートフル演算子を操作する場合、次の制限があります。

  • FlatMapGroupWithState はサポートされません。
  • 追加出力モードのみがサポートされています。

チェーンされた時間枠の集計

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

2 つの異なるストリームの時間枠の集計の後にストリーム同士の枠の結合が続く

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

ストリーム同士の時間間隔結合の後に時間枠の集計が続く

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

構造化ストリーミングの状態の再調整

Delta Live Tables のすべてのストリーミング ワークロードに対して、状態の再調整が既定で有効になっています。 Databricks Runtime 11.3 LTS 以降では、Spark クラスター構成で次の構成オプションを設定して、状態の再調整を有効にすることができます。

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

状態を再調整すると、クラスターのサイズ変更イベントが発生するステートフルな構造化ストリーミング パイプラインにメリットがあります。 クラスター サイズの変化に関わらず、ステートレス ストリーミング操作には、メリットがありません。

Note

コンピューティングの自動スケールには、構造化ストリーミング ワークロードのクラスター サイズのスケールダウンに制限があります。 Databricks では、ストリーミング ワークロードに、拡張自動スケーリングを備えた Delta Live Tables を使用することをお勧めします。 「拡張自動スケーリングを使用して、Delta Live Tables パイプラインのクラスター使用率を最適化する」を参照してください。

クラスターのサイズ変更イベントにより、状態の再調整がトリガーされます。 再調整イベント中に、状態がクラウド ストレージから新しい Executor に読み込まれるため、マイクロバッチの待機時間が長くなる場合があります。

mapGroupsWithState の初期状態を指定する

flatMapGroupsWithState または mapGroupsWithState を使用して構造化ストリーミング ステートフル処理のユーザー定義初期状態を指定できます。 これにより、有効なチェックポイントなしでステートフル ストリームを開始するときにデータの再処理を回避できます。

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

flatMapGroupsWithState 演算子の初期状態を指定する使用例は、次のとおりです。

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

mapGroupsWithState 演算子の初期状態を指定する使用例は、次のとおりです。

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

mapGroupsWithState の更新関数をテストする

TestGroupState API を使用すると、Dataset.groupByKey(...).mapGroupsWithState(...)Dataset.groupByKey(...).flatMapGroupsWithState(...) に使用される状態更新関数をテストできます。

状態更新関数は、GroupState 型のオブジェクトを使用して、前の状態を入力として受け取ります。 Apache Spark の GroupState リファレンス ドキュメントを参照してください。 例:

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}