Azure Databricks で構造化ストリーミング クエリを監視する

Azure Databricks では、[ストリーミング] タブの Spark UI で、構造化ストリーミング アプリケーションの監視が組み込みで提供されています。

Spark UI で構造化ストリーミング クエリを区別する

writeStream コードに .queryName(<query-name>) を追加し、ストリームに一意のクエリ名を指定すると、Spark UI でどのメトリックがどのストリームのものであるかを簡単に区別できます。

構造化ストリーミング メトリックを外部サービスにプッシュする

Apache Spark のストリーミング クエリ リスナー インターフェイスを使用して、アラートまたはダッシュボードのユース ケース用に外部サービスにストリーミング メトリックをプッシュできます。 Databricks Runtime 11.3 LTS 以上では、Python と Scala でストリーミング クエリ リスナーを使用できます。

重要

Unity Catalog によって管理される資格情報とオブジェクトは、StreamingQueryListener ロジックでは使用できません。

Note

リスナーの処理遅延は、クエリの処理速度に大きな影響を与える可能性があります。 効率を上げるため、これらのリスナーでの処理ロジックを制限し、Kafka などの高速応答システムへの書き込みを選択することをお勧めします。

リスナーを実装するための構文の基本的な例を次のコードに示します。

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

構造化ストリーミングでの監視可能なメトリックを定義する

監視可能なメトリックは、クエリ (DataFrame) で定義できる任意の名前付き集計関数です。 DataFrame の実行が完了ポイントに達すると (つまり、バッチ クエリが完了するか、ストリーミング エポックに達すると)、最後の完了ポイント以降に処理されたデータのメトリックを含む名前付きイベントが生成されます。

Spark セッションにリスナーをアタッチすると、これらのメトリックを監視できます。 リスナーは実行モードによって異なります。

  • バッチ モード: QueryExecutionListener を使用します。

    QueryExecutionListener は、クエリの完了時に呼び出されます。 QueryExecution.observedMetrics マップを使用してメトリックにアクセスします。

  • ストリーミング、またはマイクロバッチ: StreamingQueryListenerを使用します。

    StreamingQueryListener は、ストリーミング クエリがエポックを完了したときに呼び出されます。 StreamingQueryProgress.observedMetrics マップを使用してメトリックにアクセスします。 Azure Databricks では、継続的な実行ストリーミングはサポートされていません。

次に例を示します。

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

StreamingQueryListener オブジェクトのメトリック

メトリック 説明
id 再起動が行われても保持される一意のクエリ ID。 StreamingQuery.id() に関する説明をご覧ください。
runId 起動/再起動ごとに一意のクエリ ID。 StreamingQuery.runId() に関する説明をご覧ください。
name ユーザーが指定したクエリの名前。 名前が指定されていない場合、名前は null です。
timestamp マイクロバッチの実行のタイムスタンプ。
batchId 処理されているデータの現在のバッチの一意の ID。 失敗後の再試行の場合、特定のバッチ ID が複数回実行されることがあります。 同様に、処理するデータがない場合、バッチ ID はインクリメントされません。
numInputRows トリガーで処理されたレコード数の集計 (すべてのソースが対象)。
inputRowsPerSecond データの到着速度の集計 (すべてのソースが対象)。
processedRowsPerSecond Spark によるデータの処理速度の集計 (すべてのソースが対象)。

durationMs オブジェクト

マイクロバッチ実行プロセスのさまざまなステージが完了するまでにかかる時間に関する情報。

メトリック 説明
durationMs.addBatch マイクロバッチの実行にかかった時間。 これには、Spark がマイクロバッチの計画に要した時間は含まれません。
durationMs.getBatch オフセットに関するメタデータをソースから取得するのにかかる時間。
durationMs.latestOffset マイクロバッチに使われた最新のオフセット。 この進行状況オブジェクトは、ソースから最新のオフセットを取得するのにかかった時間を示します。
durationMs.queryPlanning 実行プランの生成にかかった時間。
durationMs.triggerExecution マイクロバッチの計画と実行にかかる時間。
durationMs.walCommit 使用可能な新しいオフセットをコミットするのにかかった時間。

eventTime オブジェクト

マイクロバッチで処理されているデータ内で示されたイベント時間の値に関する情報。 このデータは、構造化ストリーミング ジョブで定義されているステートフル集計を処理するための状態のトリミング方法を把握するため、ウォーターマークによって使用されます。

メトリック 説明
eventTime.avg トリガーで示された平均イベント時間。
eventTime.max トリガーで示された最大イベント時間。
eventTime.min トリガーで示された最小イベント時間。
eventTime.watermark トリガーで使われたウォーターマークの値。

stateOperators オブジェクト

構造化ストリーミング ジョブで定義されているステートフル操作と、そこから生成された集計に関する情報。

メトリック 説明
stateOperators.operatorName symmetricHashJoindedupestateStoreSave など、メトリックと関係のあるステートフル演算子の名前。
stateOperators.numRowsTotal ステートフル演算子または集計の結果としての状態にある行の合計数。
stateOperators.numRowsUpdated ステートフル演算子または集計の結果としての状態で更新された行の合計数。
stateOperators.allUpdatesTimeMs このメトリックは現在 Spark では測定できず、今後のアップデートで削除される予定です。
stateOperators.numRowsRemoved ステートフル演算子または集計の結果として状態から削除された行の合計数。
stateOperators.allRemovalsTimeMs このメトリックは現在 Spark では測定できず、今後のアップデートで削除される予定です。
stateOperators.commitTimeMs すべての更新 (書き込みと削除) をコミットし、新しいバージョンを返すのにかかった時間。
stateOperators.memoryUsedBytes 状態ストアによって使われたメモリ。
stateOperators.numRowsDroppedByWatermark ステートフル集計に含めるには遅すぎると見なされた行の数。 ストリーミング集計のみ: 集計後に削除された行の数 (生の入力行ではありません)。 この数は正確ではありませんが、削除されている遅延データがあることを示しています。
stateOperators.numShufflePartitions このステートフル演算子のシャッフル パーティションの数。
stateOperators.numStateStoreInstances 演算子が初期化および保守した実際の状態ストア インスタンス。 多くのステートフル演算子の場合、これはパーティションの数と同じです。 ただし、ストリーム同士の結合では、パーティションごとに 4 つの状態ストア インスタンスを初期化します。

stateOperators.customMetrics オブジェクト

RocksDB から収集された、構造化ストリーミング ジョブに対して保持するステートフル値に関するパフォーマンスと操作についてのメトリックをキャプチャする情報。 詳細については、「Azure Databricks で RocksDB 状態ストアを構成する」を参照してください。

メトリック 説明
customMetrics.rocksdbBytesCopied RocksDB ファイル マネージャーによって追跡された、コピーされたバイト数。
customMetrics.rocksdbCommitCheckpointLatency ネイティブ RocksDB のスナップショットを取得し、それをローカル ディレクトリに書き込む時間 (ミリ秒単位)。
customMetrics.rocksdbCompactLatency チェックポイント コミット中の圧縮時間 (ミリ秒単位、オプション)。
customMetrics.rocksdbCommitFileSyncLatencyMs ネイティブ RocksDB スナップショットを外部ストレージ (チェックポイントの場所) に同期する時間 (ミリ秒単位)。
customMetrics.rocksdbCommitFlushLatency RocksDB のメモリ内の変更をローカル ディスクにフラッシュする時間 (ミリ秒単位)。
customMetrics.rocksdbCommitPauseLatency 圧縮などのチェックポイントのコミットの一部としてバックグラウンド ワーカー スレッドを停止する時間(ミリ秒単位)。
customMetrics.rocksdbCommitWriteBatchLatency メモリ内構造 (WriteBatch) でのステージされた書き込みをネイティブ RocksDB に適用する時間 (ミリ秒単位)。
customMetrics.rocksdbFilesCopied RocksDB ファイル マネージャーによって追跡された、コピーされたファイルの数。
customMetrics.rocksdbFilesReused RocksDB ファイル マネージャーによって追跡された、再利用されたファイルの数。
customMetrics.rocksdbGetCount DB に対する get 呼び出しの数 (ステージング書き込みに使われるメモリ内バッチである WriteBatch からの gets は含まれません)。
customMetrics.rocksdbGetLatency 基になるネイティブ RocksDB::Get 呼び出しの平均時間 (ナノ秒単位)。
customMetrics.rocksdbReadBlockCacheHitCount ローカル ディスクの読み取りを回避するのに役立つ、RocksDB のブロック キャッシュからのキャッシュ ヒット数。
customMetrics.rocksdbReadBlockCacheMissCount RocksDB のブロック キャッシュ数は、ローカル ディスクの読み取りを回避するのに役立ちません。
customMetrics.rocksdbSstFileSize すべての Static Sorted Table (SST) ファイルのサイズ。RocksDB がデータの格納に使用する表形式の構造です。
customMetrics.rocksdbTotalBytesRead get 操作によって読み取られた非圧縮バイト数。
customMetrics.rocksdbTotalBytesReadByCompaction 圧縮プロセスがディスクから読み取ったバイト数。
customMetrics.rocksdbTotalBytesReadThroughIterator 反復子を使用して読み取られた非圧縮データの合計バイト数。 一部のステートフル操作 (FlatMapGroupsWithState でのタイムアウト処理や、ウォーターマーク処理など) では、反復子を使って DB 内のデータを読み取る必要があります。
customMetrics.rocksdbTotalBytesWritten put 操作によって書き込まれた非圧縮バイトの合計数。
customMetrics.rocksdbTotalBytesWrittenByCompaction 圧縮プロセスがディスクに書き込む合計バイト数。
customMetrics.rocksdbTotalCompactionLatencyMs バックグラウンド圧縮や、コミット中に開始されたオプションの圧縮など、RocksDB での圧縮時間 (ミリ秒単位)。
customMetrics.rocksdbTotalFlushLatencyMs バックグラウンド フラッシュを含む合計フラッシュ時間。 フラッシュ操作は、いっぱいになった MemTable がストレージにフラッシュされるプロセスです。 MemTables は、データが RocksDB に格納される最初のレベルです。
customMetrics.rocksdbZipFileBytesUncompressed ファイル マネージャーによって報告された、圧縮されていない zip ファイルのサイズ (バイト単位)。 ファイル マネージャーは、物理 SST ファイルのディスク領域の使用と削除を管理します。

sources オブジェクト (Kafka)

メトリック 説明
sources.description 読み取り元の正確な Kafka トピックを指定する、Kafka ソースの詳細な説明。 (例: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”)。
sources.startOffset オブジェクト ストリーミング ジョブが開始した、Kafka トピック内の開始オフセット番号。
sources.endOffset オブジェクト マイクロバッチによって処理された最後のオフセット。 進行中のマイクロバッチ実行では、これは latestOffset と等しい場合があります。
sources.latestOffset オブジェクト マイクロバッチによって把握された最新のオフセット。 スロットリングがある場合、マイクロバッチ処理ですべてのオフセットが処理されない可能性があります。その場合、endOffsetlatestOffset に差が生じます。
sources.numInputRows このソースから処理された入力行数。
sources.inputRowsPerSecond このソースからの処理のためにデータが到着する速度。
sources.processedRowsPerSecond Spark がこのソースからのデータを処理する速度。

sources.metrics オブジェクト (Kafka)

メトリック 説明
sources.metrics.avgOffsetsBehindLatest サブスクライブされたすべてのトピックの中で、ストリーミング クエリが利用可能な最新のオフセットより後であるオフセットの平均数。
sources.metrics.estimatedTotalBytesBehindLatest クエリ プロセスがサブスクライブされたトピックから消費していない推定バイト数。
sources.metrics.maxOffsetsBehindLatest サブスクライブされたすべてのトピックの中で、ストリーミング クエリが利用可能な最新のオフセットより後であるオフセットの最大数。
sources.metrics.minOffsetsBehindLatest サブスクライブされたすべてのトピックの中で、ストリーミング クエリが利用可能な最新のオフセットより後であるオフセットの最小数。

sink オブジェクト (Kafka)

メトリック 説明
sink.description ストリーミング クエリが書き込む Kafka シンクの説明。使用されている特定の Kafka シンク実装の詳細を示します。 (例: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”)。
sink.numOutputRows マイクロバッチの一部として出力テーブルまたはシンクに書き込まれた行の数。 状況によっては、この値が "-1" になることがあり、一般に、"不明" と解釈できます。

sources オブジェクト (Delta Lake)

メトリック 説明
sources.description ストリーミング クエリの読み取り元となるソースの説明。 (例: “DeltaSource[table]”)。
sources.[startOffset/endOffset].sourceVersion このオフセットがエンコードされるシリアル化のバージョン。
sources.[startOffset/endOffset].reservoirId 読み取られるテーブルの ID。 これは、クエリを再開するときに誤った構成を検出するために使われます。
sources.[startOffset/endOffset].reservoirVersion 現在処理中のテーブルのバージョン。
sources.[startOffset/endOffset].index このバージョンの AddFiles のシーケンス内のインデックス。 これは、大きなコミットを複数のバッチに分割するために使われます。 このインデックスは、modificationTimestamppath で並べ替えることによって作成されます。
sources.[startOffset/endOffset].isStartingVersion 現在のオフセットが、初期データの処理後に発生した変更の処理ではなく、新しいストリーミング クエリの開始をマークするかどうかを示します。 新しいクエリを開始すると、開始時にテーブルに存在するすべてのデータがまず処理され、その後、到着する新しいデータが処理されます。
sources.latestOffset マイクロバッチ クエリによって処理された最新のオフセット。
sources.numInputRows このソースから処理された入力行数。
sources.inputRowsPerSecond このソースからの処理のためにデータが到着する速度。
sources.processedRowsPerSecond Spark がこのソースからのデータを処理する速度。
sources.metrics.numBytesOutstanding 未処理ファイル (RocksDB によって追跡されているファイル) の合計サイズ。 これは、ストリーミング ソースとしての Delta および自動ローダーのバックログ メトリックです。
sources.metrics.numFilesOutstanding 処理される予定の未処理ファイルの数。 これは、ストリーミング ソースとしての Delta および自動ローダーのバックログ メトリックです。

sink オブジェクト (Delta Lake)

メトリック 説明
sink.description Delta シンクの説明。使用されている特定の Delta シンク実装の詳細を示します。 (例: “DeltaSink[table]”)。
sink.numOutputRows Spark では DSv1 シンク (Delta Lake シンクの分類) の出力行を推定できないため、行数は常に “-1” です。

Kafka 同士の間の StreamingQueryListener イベントの例

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Delta Lake 同士の間の StreamingQueryListener イベントの例

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Kinesis から Delta Lake への StreamingQueryListener イベントの例

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Kafka+Delta Lake から Delta Lake への StreamingQueryListener イベントの例

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Delta Lake の StreamingQueryListener イベントに対するレート ソースの例

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}