ステートフル クエリの非同期状態チェックポイント処理

Note

Databricks Runtime 10.4 LTS 以降で使用できます。

非同期状態チェックポイントは、ストリーミング クエリに対して 1 回だけ保証されますが、状態更新時にボトルネックになっている一部の構造化ストリーミング ステートフル ワークロードの全体的な待機時間を短縮できます。 これは、状態のチェックポイント処理が完了するのを待たずに、前のマイクロバッチの計算が完了するとすぐに、次のマイクロバッチの処理を開始することによって実現されます。 次の表は、同期チェックポイントと非同期チェックポイントのトレードオフを比較しています:

特徴 同期チェックポイント 非同期チェックポイント
Latency マイクロバッチごとの待機時間が長くなります。 マイクロバッチが重複する可能性があり、待機時間が短縮されます。
Restart 再実行する必要があるのは最後のバッチのみのため、復旧が高速です。 1 つ以上のマイクロバッチの再実行が必要になる場合があるため、再起動の遅延が長くなることがあります。

非同期状態チェックポイント処理の恩恵を受ける可能性があるストリーミング ジョブの特性を次に示します:

  • ジョブに 1 つまたは複数のステートフル操作 (例: 集計、flatMapGroupsWithStatemapGroupsWithState、ストリーム同士の結合) があります。
  • 状態チェックポイントの待機時間は、全体的なバッチ実行の待機時間に大きく貢献する要素の 1 つです。 この情報は、StreamingQueryProgress イベントで見つかります。 これらのイベントは、Spark ドライバーの log4j ログにもあります。 ストリーミング クエリの進行状況の例と、バッチ実行全体の待機時間に対する状態チェックポイントの影響を調べる方法を次に示します。
    • {
         "id" : "2e3495a2-de2c-4a6a-9a8e-f6d4c4796f19",
         "runId" : "e36e9d7e-d2b1-4a43-b0b3-e875e767e1fe",
         "...",
         "batchId" : 0,
         "durationMs" : {
           "...",
           "triggerExecution" : 547730,
           "..."
         },
         "stateOperators" : [ {
           "...",
           "commitTimeMs" : 3186626,
           "numShufflePartitions" : 64,
           "..."
         }]
      }
      
    • 上記のクエリの進行状況イベントの状態チェックポイントの待機時間の分析

      • バッチ実行時間 (durationMs.triggerDuration) は約 547 秒です。
      • 状態ストアのコミット待機時間 (stateOperations[0].commitTimeMs) は約 3,186 秒です。 コミット待機時間は、状態ストアを含むタスク全体で集計されます。 この場合、このようなタスク (stateOperators[0].numShufflePartitions) は 64 件です。
      • 状態演算子を含む各タスクは、チェックポイントに平均 50 秒 (3,186/64) かかっています。 これは、バッチ期間に影響する追加の待機時間です。 64 のタスクすべてが同時に実行されている場合、チェックポイント ステップはバッチ期間の約 9% (50 秒/547 秒) に貢献しました。 最大同時実行タスク数が 64 未満の場合、割合はさらに高くなります。

非同期状態チェックポイント処理の有効化

非同期状態のチェックポイント処理には、RocksDB ベースの状態ストアを使用する必要があります。 次の構成を設定します:


spark.conf.set(
  "spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled",
  "true"
)

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)

非同期チェックポイント処理の制限事項と要件

注意

コンピューティングの自動スケールには、構造化ストリーミング ワークロードのクラスター サイズのスケールダウンに制限があります。 Databricks では、ストリーミング ワークロードに、拡張自動スケーリングを備えた Delta Live Tables を使用することをお勧めします。 拡張自動スケーリングを使用して Delta Live Tables パイプラインのクラスター使用率を最適化するを参照してください。

  • 1 つ以上のストアで非同期チェックポイントでエラーが発生すると、クエリは失敗します。 同期チェックポイント モードでは、チェックポイントはタスクの一部として実行され、Spark はクエリを失敗する前にタスクを複数回再試行します。 このメカニズムは、非同期状態チェックポイント処理には存在しない。 Databricks では、ジョブの失敗時に自動再試行に連続ジョブを使用することをお勧めします。 「ジョブを継続的に実行する」を参照してください。
  • 非同期状態チェックポイント処理は、状態ストアの場所がマイクロバッチ実行間で変更されないときに最も高いパフォーマンスを発揮します。 クラスターのサイズ変更を、非同期状態チェックポイントと組み合わせて使用すると、クラスターのサイズ変更イベントの一環としてノードが追加または削除されるときに、状態ストア インスタンスが再分散される可能性があるため、正常に機能しない場合があります。
  • 非同期状態チェックポイント処理は、RocksDB 状態ストア プロバイダーの実装でのみサポートされます。 既定のメモリ内状態ストアの実装では、これはサポートされません。