Azure Databricks で RocksDB 状態ストアを構成する

ストリーミング クエリを開始する前に、SparkSession で次の構成を設定することにより、RocksDB ベースの状態管理を有効にすることができます。

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Delta Live Tables パイプラインで RocksDB を有効にすることができます。 ステートフル処理については、 パイプライン構成の最適化に関するを参照してください。

変更ログのチェックポイント処理を有効にする

Databricks Runtime 13.3 LTS 以降では、構造化ストリーミング ワークロードのチェックポイント期間とエンドツーエンドの待機時間を短縮するために、変更ログのチェックポイント処理を有効にすることができます。 Databricks では、すべての構造化ストリーミング ステートフル クエリに対して変更ログのチェックポイント処理を有効にすることをお勧めします。

従来、RocksDB 状態ストアはチェックポイント処理中にスナップショットを作成し、データ ファイルをアップロードします。 このコストを回避するために、変更ログのチェックポイント処理では、最後のチェックポイント以降に変更されたレコードのみが永続ストレージに書き込まれます。

変更ログのチェックポイント処理は、既定で無効になっています。 SparkSession レベルで変更ログのチェックポイント処理を有効にするには、次の構文を使用します。

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

既存のストリームで変更ログのチェックポイント処理を有効にし、チェックポイントに格納されている状態情報を維持することができます。

重要

変更ログのチェックポイント処理を有効にしたクエリは、Databricks Runtime 13.3 LTS 以降でのみ実行できます。 変更ログのチェックポイント処理を無効にして従来のチェックポイント処理の動作に戻すことができますが、これらのクエリは引き続き Databricks Runtime 13.3 LTS 以降で実行する必要があります。 これらの変更を行うには、ジョブを再起動する必要があります。

RocksDB 状態ストア メトリクス

各状態操作は、状態ストアを観察するために RocksDB インスタンスで実行された状態管理操作に関連するメトリックを収集します。これは、デバッグ ジョブの低速化につながる可能性があります。 これらのメトリックは、状態操作が実行されているすべてのタスクについて、ジョブの状態操作ごとに集計 (sum) されます。 これらのメトリックは、StreamingQueryProgressstateOperators フィールド内の customMetrics マップの一部です。 JSON 形式の StreamingQueryProgress の例を次に示します (StreamingQueryProgress.json() を使用して取得)。

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}

メトリックの詳細な説明は次のとおりです。

メトリックの名前 説明
rocksdbCommitWriteBatchLatency インメモリ構造 (WriteBatch) でのステージングされた書き込みをネイティブ RocksDB に適用するためにかかった時間 (ミリ秒単位)。
rocksdbCommitFlushLatency ローカル ディスクへの RocksDB のメモリ内の変更をフラッシュするのにかかった時間 (ミリ秒単位)。
rocksdbCommitCompactLatency チェックポイントのコミット中に圧縮に要した時間 (省略可能)(ミリ秒単位)。
rocksdbCommitPauseLatency チェックポイントのコミットの一部としてバックグラウンド ワーカー スレッド (圧縮など) の停止に要した時間 (ミリ秒単位)。
rocksdbCommitCheckpointLatency ネイティブ RocksDB のスナップショットを取得し、それをローカル ディレクトリに書き込むのにかかった時間 (ミリ秒単位)。
rocksdbCommitFileSyncLatencyMs ネイティブの RocksDB スナップショット関連ファイルを外部ストレージ (チェックポイントの場所) に同期するためにかかった時間 (ミリ秒単位)。
rocksdbGetLatency 基になるネイティブ RocksDB::Get 呼び出しごとにかかった平均時間 (ナノ秒単位)。
rocksdbPutCount 基になるネイティブ RocksDB::Put 呼び出しごとにかかった平均時間 (ナノ秒単位)。
rocksdbGetCount ネイティブ RocksDB::Get 呼び出しの数 (ステージング書き込みに使用されるメモリ内バッチ WriteBatch からの Gets は含まれません)。
rocksdbPutCount ネイティブ RocksDB::Put 呼び出しの数 (ステージング書き込みに使用されるメモリ内バッチ WriteBatch への Puts は含まれません)。
rocksdbTotalBytesReadByGet ネイティブ RocksDB::Get 呼び出しによって読み取られた非圧縮バイト数。
rocksdbTotalBytesWrittenByPut ネイティブ RocksDB::Put 呼び出しによって書き込まれた非圧縮バイト数。
rocksdbReadBlockCacheHitCount ローカル ディスクからのデータの読み取りを避けるために、native RocksDB block キャッシュを使用する回数。
rocksdbReadBlockCacheMissCount Native RocksDB block キャッシュが失われ、ローカル ディスクからデータを読み取る必要がある回数。
rocksdbTotalBytesReadByCompaction Native RocksDB の圧縮プロセスによってローカル ディスクから読み取られたバイト数。
rocksdbTotalBytesWrittenByCompaction Native RocksDB 圧縮プロセスによってローカル ディスクに書き込まれたバイト数。
rocksdbTotalCompactionLatencyMs RocksDB 圧縮 (コミット中に開始されるバックグラウンドとオプションの圧縮の両方) にかかった時間 (ミリ秒単位)。
rocksdbWriterStallLatencyMs バックグラウンドでの圧縮、または memtables からディスクへのフラッシュによってライターが停止し時間 (ミリ秒単位)。
rocksdbTotalBytesReadThroughIterator ステートフルな操作 (flatMapGroupsWithState のタイムアウト処理やウィンドウ内の集計でのウォーターマークなど) の一部では、反復子を使用して DB 内のデータ全体を読み取る必要があります。 反復子を使用して読み取られた非圧縮データの合計サイズ。