Övervaka frågor om strukturerad direktuppspelning i Azure Databricks

Azure Databricks tillhandahåller inbyggd övervakning för program för strukturerad direktuppspelning via Spark-användargränssnittet under fliken Direktuppspelning .

Särskilja strukturerade strömningsfrågor i Spark-användargränssnittet

Ge dina strömmar ett unikt frågenamn genom att lägga .queryName(<query-name>) till i koden writeStream för att enkelt skilja vilka mått som tillhör vilken ström i Spark-användargränssnittet.

Skicka mått för strukturerad direktuppspelning till externa tjänster

Mått för direktuppspelning kan skickas till externa tjänster för användningsfall för aviseringar eller instrumentpaneler med hjälp av Apache Sparks gränssnitt för strömningsfrågaslyssnare. I Databricks Runtime 11.3 LTS och senare är lyssnaren för strömningsfråga tillgänglig i Python och Scala.

Viktigt!

Autentiseringsuppgifter och objekt som hanteras av Unity Catalog kan inte användas i StreamingQueryListener logik.

Kommentar

Bearbetning av svarstider med lyssnare kan avsevärt påverka frågebearbetningshastigheten. Vi rekommenderar att du begränsar bearbetningslogik i dessa lyssnare och väljer att skriva till snabbsvarssystem som Kafka för effektivitet.

Följande kod innehåller grundläggande exempel på syntaxen för att implementera en lyssnare:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Definiera observerbara mått i strukturerad direktuppspelning

Observerbara mått namnges godtyckliga aggregeringsfunktioner som kan definieras i en fråga (DataFrame). Så snart körningen av en DataFrame når en slutförandepunkt (dvs. slutför en batchfråga eller når en strömmande epok) genereras en namngiven händelse som innehåller måtten för de data som bearbetats sedan den senaste slutförandepunkten.

Du kan observera dessa mått genom att koppla en lyssnare till Spark-sessionen. Lyssnaren är beroende av körningsläget:

  • Batch-läge: Använd QueryExecutionListener.

    QueryExecutionListener anropas när frågan är klar. Få åtkomst till måtten med hjälp av kartan QueryExecution.observedMetrics .

  • Direktuppspelning eller mikrobatch: Använd StreamingQueryListener.

    StreamingQueryListener anropas när strömningsfrågan slutför en epok. Få åtkomst till måtten med hjälp av kartan StreamingQueryProgress.observedMetrics . Azure Databricks stöder inte kontinuerlig körningsströmning.

Till exempel:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

Mått för StreamingQueryListener-objekt

Mätvärde Beskrivning
id Ett unikt fråge-ID som finns kvar i omstarter.
runId Ett fråge-ID som är unikt för varje start/omstart. Se StreamingQuery.runId().
name Det användardefinierade namnet på frågan. Namnet är null om inget namn har angetts.
timestamp Tidsstämpeln för körningen av mikrobatchen.
batchId Ett unikt ID för den aktuella batchen med data som bearbetas. Vid återförsök efter ett fel kan ett visst batch-ID köras mer än en gång. På samma sätt ökar inte batch-ID:t när det inte finns några data som ska bearbetas.
numInputRows Det aggregerade antalet poster (över alla källor) som bearbetas i en utlösare.
inputRowsPerSecond Den aggregerade (över alla källor) frekvensen för ankommande data.
processedRowsPerSecond Den aggregerade (över alla källor) hastighet med vilken Spark bearbetar data.

durationMs-objekt

Information om den tid det tar att slutföra olika steg i mikrobatchkörningsprocessen.

Mätvärde Beskrivning
durationMs.addBatch Den tid det tar att köra mikrobatchen. Detta utesluter den tid spark tar att planera mikrobatchen.
durationMs.getBatch Den tid det tar att hämta metadata om förskjutningarna från källan.
durationMs.latestOffset Den senaste förskjutningen som förbrukas för mikrobatchen. Det här förloppsobjektet refererar till den tid det tar att hämta den senaste förskjutningen från källor.
durationMs.queryPlanning Den tid det tar att generera körningsplanen.
durationMs.triggerExecution Den tid det tar att planera och köra mikrobatchen.
durationMs.walCommit Den tid det tar att checka in de nya tillgängliga förskjutningarna.

eventTime-objekt

Information om händelsetidsvärdet som visas i de data som bearbetas i mikrobatchen. Dessa data används av vattenstämpeln för att ta reda på hur tillståndet kan trimmas för bearbetning av tillståndskänsliga aggregeringar som definierats i structured streaming-jobbet.

Mätvärde Beskrivning
eventTime.avg Den genomsnittliga händelsetiden som visas i utlösaren.
eventTime.max Den maximala händelsetiden som visas i utlösaren.
eventTime.min Den minsta händelsetid som visas i utlösaren.
eventTime.watermark Värdet för vattenstämpeln som används i utlösaren.

stateOperators-objekt

Information om tillståndskänsliga åtgärder som definieras i structured streaming-jobbet och de sammansättningar som skapas från dem.

Mätvärde Beskrivning
stateOperators.operatorName Namnet på den tillståndskänsliga operator som måtten relaterar till, till exempel symmetricHashJoin, dedupe, stateStoreSave.
stateOperators.numRowsTotal Det totala antalet rader i tillstånd som ett resultat av en tillståndskänslig operator eller aggregering.
stateOperators.numRowsUpdated Det totala antalet rader som har uppdaterats i tillstånd till följd av en tillståndskänslig operator eller aggregering.
stateOperators.allUpdatesTimeMs Det här måttet kan för närvarande inte mätas av Spark och planeras att tas bort i framtida uppdateringar.
stateOperators.numRowsRemoved Det totala antalet rader som tagits bort från tillståndet till följd av en tillståndskänslig operator eller aggregering.
stateOperators.allRemovalsTimeMs Det här måttet kan för närvarande inte mätas av Spark och planeras att tas bort i framtida uppdateringar.
stateOperators.commitTimeMs Den tid det tar att checka in alla uppdateringar (placerar och tar bort) och returnerar en ny version.
stateOperators.memoryUsedBytes Minne som används av tillståndsarkivet.
stateOperators.numRowsDroppedByWatermark Antalet rader som anses vara för sena för att ingå i en tillståndskänslig aggregering. Endast strömmande aggregeringar: Antalet rader som släppts efter aggregering (inte råa indatarader). Det här talet är inte exakt, men ger en indikation på att sena data tas bort.
stateOperators.numShufflePartitions Antalet shuffle-partitioner för den här tillståndskänsliga operatorn.
stateOperators.numStateStoreInstances Den faktiska tillståndslagringsinstansen som operatorn har initierat och underhållit. För många tillståndskänsliga operatorer är detta samma som antalet partitioner. Stream-stream-kopplingar initierar dock fyra tillståndslagerinstanser per partition.

stateOperators.customMetrics-objekt

Information som samlas in från RocksDB som samlar in mått om dess prestanda och åtgärder med avseende på de tillståndskänsliga värden som den upprätthåller för structured streaming-jobbet. Mer information finns i Konfigurera RocksDB-tillståndslager på Azure Databricks.

Mätvärde Beskrivning
customMetrics.rocksdbBytesCopied Antalet byte som kopierats enligt spårning av RocksDB-filhanteraren.
customMetrics.rocksdbCommitCheckpointLatency Tiden i millisekunder som tar en ögonblicksbild av inbyggda RocksDB och skriver den till en lokal katalog.
customMetrics.rocksdbCompactLatency Tiden i millisekunders komprimering (valfritt) under kontrollpunktsincheckningen.
customMetrics.rocksdbCommitFileSyncLatencyMs Tiden i millisekunder som synkroniserar den interna RocksDB-ögonblicksbilden till extern lagring (kontrollpunktsplatsen).
customMetrics.rocksdbCommitFlushLatency Tiden i millisekunder som rensar RocksDB-minnesinterna ändringar till den lokala disken.
customMetrics.rocksdbCommitPauseLatency Tiden i millisekunder som stoppar bakgrundsarbetstrådarna som en del av kontrollpunktsincheckningen, till exempel för komprimering.
customMetrics.rocksdbCommitWriteBatchLatency Tiden i millisekunder som tillämpar mellanlagrade skrivningar i minnesintern struktur (WriteBatch) på interna RocksDB.
customMetrics.rocksdbFilesCopied Antalet filer som kopierats som spårade av RocksDB-filhanteraren.
customMetrics.rocksdbFilesReused Antalet filer som återanvänds enligt spåras av RocksDB-filhanteraren.
customMetrics.rocksdbGetCount Antalet get anrop till databasen (inkluderar gets inte från WriteBatch - minnesintern batch som används för mellanlagring av skrivningar).
customMetrics.rocksdbGetLatency Den genomsnittliga tiden i nanosekunder för det underliggande interna RocksDB::Get anropet.
customMetrics.rocksdbReadBlockCacheHitCount Antalet cacheträffar från blockcacheminnet i RocksDB som är användbara för att undvika lokala diskläsningar.
customMetrics.rocksdbReadBlockCacheMissCount Antalet blockcacheminnen i RocksDB är inte användbart för att undvika lokala diskläsningar.
customMetrics.rocksdbSstFileSize Storleken på alla SST-filer (Static Sorted Table) – tabellstrukturen RocksDB använder för att lagra data.
customMetrics.rocksdbTotalBytesRead Antalet okomprimerade byte som lästs av get åtgärder.
customMetrics.rocksdbTotalBytesReadByCompaction Antalet byte som komprimeringsprocessen läser från disken.
customMetrics.rocksdbTotalBytesReadThroughIterator Det totala antalet byte av okomprimerade data som lästs med hjälp av en iterator. Vissa tillståndskänsliga åtgärder (till exempel timeoutbearbetning i FlatMapGroupsWithState och vattenstämpling) kräver läsning av data i DB via en iterator.
customMetrics.rocksdbTotalBytesWritten Det totala antalet okomprimerade byte som skrivits av put åtgärder.
customMetrics.rocksdbTotalBytesWrittenByCompaction Det totala antalet byte som komprimeringsprocessen skriver till disken.
customMetrics.rocksdbTotalCompactionLatencyMs Tiden i millisekunder för RocksDB-komprimering, inklusive bakgrundskomprimeringar och den valfria komprimering som initierades under incheckningen.
customMetrics.rocksdbTotalFlushLatencyMs Den totala tömningstiden, inklusive bakgrundsspolning. Tömningsåtgärder är processer som MemTable töms till lagring när den är full. MemTables är den första nivån där data lagras i RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed Storleken i byte för de okomprimerade zip-filerna enligt filhanterarens rapporter. Filhanteraren hanterar den fysiska SST-filens diskutrymmesanvändning och borttagning.

källobjekt (Kafka)

Mätvärde Beskrivning
sources.description En detaljerad beskrivning av Kafka-källan som anger det exakta Kafka-ämnet som läse från. Exempel: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
sources.startOffset objekt Startförskjutningsnumret i kafka-ämnet där strömningsjobbet startade.
sources.endOffset objekt Den sista förskjutningen som bearbetas av mikrobatchen. Detta kan vara lika med latestOffset för en pågående mikrobatchkörning.
sources.latestOffset objekt Den senaste förskjutningen som räknas av mikrobatchen. Mikrobatching-processen kanske inte bearbetar alla förskjutningar när det finns begränsningar, vilket resulterar i endOffset och latestOffset differentiering.
sources.numInputRows Antalet indatarader som bearbetas från den här källan.
sources.inputRowsPerSecond Den hastighet med vilken data anländer för bearbetning från den här källan.
sources.processedRowsPerSecond Den hastighet med vilken Spark bearbetar data från den här källan.

sources.metrics-objekt (Kafka)

Mätvärde Beskrivning
sources.metrics.avgOffsetsBehindLatest Det genomsnittliga antalet förskjutningar som strömningsfrågan ligger bakom den senaste tillgängliga förskjutningen bland alla prenumerationsämnen.
sources.metrics.estimatedTotalBytesBehindLatest Det uppskattade antalet byte som frågeprocessen inte har förbrukat från de prenumererade ämnena.
sources.metrics.maxOffsetsBehindLatest Det maximala antalet förskjutningar som strömningsfrågan ligger bakom den senaste tillgängliga förskjutningen bland alla prenumerationsämnen.
sources.metrics.minOffsetsBehindLatest Det minsta antalet förskjutningar som strömningsfrågan ligger bakom den senaste tillgängliga förskjutningen bland alla prenumerationsämnen.

mottagarobjekt (Kafka)

Mätvärde Beskrivning
sink.description Beskrivningen av Kafka-mottagaren som strömningsfrågan skriver till, som beskriver den specifika Kafka-mottagarimplementeringen som används. Exempel: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows Antalet rader som skrivits till utdatatabellen eller mottagaren som en del av mikrobatchen. I vissa situationer kan det här värdet vara "-1" och kan i allmänhet tolkas som "okänt".

källobjekt (Delta Lake)

Mätvärde Beskrivning
sources.description Beskrivningen av källan som strömningsfrågan läser från. Exempel: “DeltaSource[table]”.
sources.[startOffset/endOffset].sourceVersion Den version av serialiseringen som förskjutningen kodas med.
sources.[startOffset/endOffset].reservoirId ID:t för tabellen som läss. Detta används för att identifiera felkonfiguration vid omstart av en fråga.
sources.[startOffset/endOffset].reservoirVersion Den version av tabellen som bearbetas för närvarande.
sources.[startOffset/endOffset].index Indexet i sekvensen AddFiles i den här versionen. Detta används för att dela upp stora incheckningar i flera batchar. Det här indexet skapas genom sortering på modificationTimestamp och path.
sources.[startOffset/endOffset].isStartingVersion Identifierar om aktuell förskjutning markerar början på en ny strömmande fråga i stället för bearbetningen av ändringar som inträffade efter att de första data bearbetades. När du startar en ny fråga bearbetas alla data som finns i tabellen i början först och sedan alla nya data som tas emot.
sources.latestOffset Den senaste förskjutningen som bearbetas av mikrobatchfrågan.
sources.numInputRows Antalet indatarader som bearbetas från den här källan.
sources.inputRowsPerSecond Den hastighet med vilken data anländer för bearbetning från den här källan.
sources.processedRowsPerSecond Den hastighet med vilken Spark bearbetar data från den här källan.
sources.metrics.numBytesOutstanding Den kombinerade storleken på de utestående filerna (filer som spåras av RocksDB). Det här är måttet för kvarvarande uppgifter för Delta och Auto Loader som strömningskälla.
sources.metrics.numFilesOutstanding Antalet utestående filer som ska bearbetas. Det här är måttet för kvarvarande uppgifter för Delta och Auto Loader som strömningskälla.

mottagarobjekt (Delta Lake)

Mätvärde Beskrivning
sink.description Beskrivningen av deltamottagaren, som beskriver den specifika deltamottagareimplementering som används. Exempel: “DeltaSink[table]”.
sink.numOutputRows Antalet rader är alltid "-1" eftersom Spark inte kan härleda utdatarader för DSv1-mottagare, vilket är klassificeringen för Delta Lake-mottagare.

Exempel

Exempel på Kafka-to-Kafka StreamingQueryListener-händelse

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Exempel på Delta Lake-to-Delta Lake StreamingQueryListener-händelse

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Exempel på Kinesis-to-Delta Lake StreamingQueryListener-händelse

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Exempel på Kafka+Delta Lake-to-Delta Lake StreamingQueryListener-händelse

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Exempel på frekvenskälla till Delta Lake StreamingQueryListener-händelse

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}