Surveillance des requêtes de Structured Streaming sur Azure Databricks

Azure Databricks fournit une surveillance intégrée pour les applications de Structured Streaming via l’interface utilisateur de Spark sous l’onglet Streaming.

Distinguer les requêtes de Structured Streaming dans l’interface utilisateur de Spark

Donnez à vos flux un nom de requête unique en ajoutant .queryName(<query-name>) à votre code writeStream pour distinguer facilement quelles métriques appartiennent à quel flux dans l’interface utilisateur de Spark.

Envoyer (push) des métriques de Structured Streaming à des services externes

Les métriques de streaming peuvent être envoyées (push) vers des services externes pour une utilisation dans les alertes ou les tableaux de bord, avec l’interface de l’écouteur de requête de streaming d’Apache Spark. Dans Databricks Runtime 11.3 LTS et versions ultérieures, l’écouteur de requête de streaming est disponible en Python et Scala.

Important

Les informations d’identification et les objets gérés par Unity Catalog ne peuvent pas être utilisés dans la logique StreamingQueryListener.

Remarque

La latence de traitement avec les écouteurs peut affecter considérablement les vitesses de traitement des requêtes. Il est conseillé de limiter la logique de traitement dans ces écouteurs et d’opter pour l’écriture dans des systèmes de réponse rapide comme Kafka pour une efficacité.

Le code suivant fournit des exemples simples de la syntaxe à utiliser pour implémenter un écouteur :

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Définition de métriques observables dans Structured Streaming

Les métriques observables sont des fonctions d’agrégation arbitraires nommées qui peuvent être définies sur une requête (DataFrame). Dès que l’exécution d’un DataFrame atteint un point d’achèvement (c’est-à-dire termine une requête par lot ou atteint une époque de streaming), un événement nommé est émis, qui contient les métriques pour les données traitées depuis le dernier point d’achèvement.

Vous pouvez observer ces métriques en attachant un écouteur à la session Spark. L’écouteur dépend du mode d’exécution :

  • Mode par lot : Utilisez QueryExecutionListener.

    QueryExecutionListener est appelé quand la requête est terminée. Accédez aux métriques en utilisant la carte QueryExecution.observedMetrics.

  • Diffusion en continu ou microbatch : Utiliser StreamingQueryListener.

    StreamingQueryListener est appelé quand la requête de streaming termine une époque. Accédez aux métriques en utilisant la carte StreamingQueryProgress.observedMetrics. Azure Databricks ne prend pas en charge le streaming d’exécution continue.

Par exemple :

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

Métriques d’objet StreamingQueryListener

Métrique Description
id ID de requête unique qui persiste entre les redémarrages.
runId ID de requête unique pour chaque démarrage/redémarrage. Voir StreamingQuery.runId().
name Nom spécifié par l’utilisateur de la requête. Le nom est nul si aucun nom n’est spécifié.
timestamp Horodatage pour l’exécution du microbatch.
batchId ID unique pour le lot actuel de données en cours de traitement. Dans le cas de nouvelles tentatives après un échec, un ID de lot donné peut être exécuté plusieurs fois. De même, lorsqu’il n’y a pas de données à traiter, l’ID de lot n’est pas incrémenté.
numInputRows Nombre d’enregistrements agrégés (sur toutes les sources) traités dans un déclencheur.
inputRowsPerSecond Taux d’agrégation (sur toutes les sources) des données arrivantes.
processedRowsPerSecond Taux d’agrégation (sur toutes les sources) auquel Spark traite les données.

Objet durationMs

Informations sur le temps nécessaire pour effectuer différentes étapes du processus d’exécution de microbatch.

Métrique Description
durationMs.addBatch Temps nécessaire pour exécuter le microbatch. Cela exclut le temps nécessaire à Spark pour planifier le micro-lot.
durationMs.getBatch Temps nécessaire pour récupérer les métadonnées sur les décalages de la source.
durationMs.latestOffset Décalage le plus récent consommé pour le microbatch. Cet objet de progression fait référence au temps nécessaire pour récupérer le dernier décalage à partir des sources.
durationMs.queryPlanning Temps nécessaire pour générer le plan d’exécution.
durationMs.triggerExecution Temps nécessaire pour planifier et exécuter le microbatch.
durationMs.walCommit Temps nécessaire pour valider les nouveaux décalages disponibles.

Objet eventTime

Informations sur la valeur d’heure de l’événement observée dans les données traitées dans le microbatch. Le filigrane utilise ces données pour déterminer comment découper l’état pour le traitement des agrégations avec état qui sont définies dans le travail de Structured Streaming.

Métrique Description
eventTime.avg Temps moyen d’événement observé dans ce déclencheur.
eventTime.max Durée maximale de l’événement observée dans ce déclencheur.
eventTime.min Durée minimale de l’événement observée dans ce déclencheur.
eventTime.watermark Valeur du filigrane utilisé dans ce déclencheur.

Objet stateOperators

Informations sur les opérations avec état définies dans le travail Structured Streaming et les agrégations produites à partir de celles-ci.

Métrique Description
stateOperators.operatorName Nom de l’opérateur avec état auquel les métriques sont liées, telles que symmetricHashJoin, dedupe, stateStoreSave.
stateOperators.numRowsTotal Nombre total de lignes dans l’état à la suite d’un opérateur ou d’une agrégation avec état.
stateOperators.numRowsUpdated Nombre total de lignes mises à jour dans l’état à la suite d’un opérateur ou d’une agrégation avec état.
stateOperators.allUpdatesTimeMs Cette métrique n’est actuellement pas mesurable par Spark et est prévue pour être supprimée dans les prochaines mises à jour.
stateOperators.numRowsRemoved Nombre total de lignes supprimées de l’état à la suite d’un opérateur ou d’une agrégation avec état.
stateOperators.allRemovalsTimeMs Cette métrique n’est actuellement pas mesurable par Spark et est prévue pour être supprimée dans les prochaines mises à jour.
stateOperators.commitTimeMs Temps nécessaire pour valider toutes les mises à jour (place et suppressions) et retourner une nouvelle version.
stateOperators.memoryUsedBytes Mémoire utilisée par le magasin d’état.
stateOperators.numRowsDroppedByWatermark Nombre de lignes considérées comme trop tardives à inclure dans une agrégation avec état. Agrégations de diffusion en continu uniquement : nombre de lignes supprimées après l’agrégation (pas de lignes d’entrée brutes). Ce nombre n’est pas précis, mais indique qu’il existe des données tardives supprimées.
stateOperators.numShufflePartitions Nombre de partitions aléatoires pour cet opérateur avec état.
stateOperators.numStateStoreInstances Instance réelle du magasin d’états que l’opérateur a initialisée et conservée. Pour de nombreux opérateurs avec état, il s’agit du même que le nombre de partitions. Toutefois, les jointures stream-stream initialisent quatre instances de magasin d’état par partition.

Objet stateOperators.customMetrics

Informations collectées à partir de la capture de métriques RocksDB sur ses performances et ses opérations en ce qui concerne les valeurs avec état qu’elle gère pour le travail Structured Streaming. Pour plus d’informations, consultez Configurer le magasin d’état RocksDB sur Azure Databricks.

Métrique Description
customMetrics.rocksdbBytesCopied Le nombre d’octets copiés comme suivi par le gestionnaire de fichiers RocksDB.
customMetrics.rocksdbCommitCheckpointLatency Le temps en millisecondes prenant un instantané de RocksDB natif et l’écrivez dans un répertoire local.
customMetrics.rocksdbCompactLatency Le temps de compactage en millisecondes (facultatif) pendant la validation du point de contrôle.
customMetrics.rocksdbCommitFileSyncLatencyMs Le temps en millisecondes de synchronisation de l’instantané RocksDB natif vers le stockage externe (emplacement de point de contrôle).
customMetrics.rocksdbCommitFlushLatency Le temps en millisecondes de vidage des modifications en mémoire de la base de données RocksDB sur le disque local.
customMetrics.rocksdbCommitPauseLatency La durée en millisecondes d’arrêt des threads de travail en arrière-plan dans le cadre de la validation de point de contrôle, par exemple pour le compactage.
customMetrics.rocksdbCommitWriteBatchLatency Le temps en millisecondes appliquant les écritures intermédiaires dans la structure en mémoire (WriteBatch) dans la base de données rocksDB native.
customMetrics.rocksdbFilesCopied Le nombre de fichiers copiés comme suivi par le gestionnaire de fichiers RocksDB.
customMetrics.rocksdbFilesReused Le nombre de fichiers réutilisés d’après le suivi du gestionnaire de fichiers RocksDB.
customMetrics.rocksdbGetCount Le nombre d’appels get à la base de données (n’inclut pas gets de WriteBatch - lot en mémoire utilisé pour les écritures intermédiaires).
customMetrics.rocksdbGetLatency Le délai moyen en nanosecondes pour l’appel natif RocksDB::Get sous-jacent.
customMetrics.rocksdbReadBlockCacheHitCount Le nombre d’accès au cache à partir du cache de blocs dans RocksDB qui sont utiles pour éviter les lectures de disque local.
customMetrics.rocksdbReadBlockCacheMissCount Le nombre de caches de blocs dans RocksDB n’est pas utile pour éviter les lectures de disque local.
customMetrics.rocksdbSstFileSize La taille de tous les fichiers SST (Static Trid Table) : la structure tabulaire RocksDB utilise pour stocker des données.
customMetrics.rocksdbTotalBytesRead Le nombre d’octets non compressés lus par les opérations get.
customMetrics.rocksdbTotalBytesReadByCompaction Nombre d’octets lus par le processus de compactage à partir du disque.
customMetrics.rocksdbTotalBytesReadThroughIterator Le nombre total d’octets de données non compressées lues à l’aide d’un itérateur. Certaines opérations avec état (par exemple, le traitement du délai d’attente dans FlatMapGroupsWithState et le filigrane) nécessitent la lecture de données dans la base de données via un itérateur.
customMetrics.rocksdbTotalBytesWritten Le nombre total d’octets non compressés écrits par les opérations put.
customMetrics.rocksdbTotalBytesWrittenByCompaction Le nombre total d’octets que le processus de compactage écrit sur le disque.
customMetrics.rocksdbTotalCompactionLatencyMs Le temps nécessaire pour les compactages par RocksDB, y compris les compactages d’arrière-plan et le compactage facultatif lancé pendant la validation. (en millisecondes)
customMetrics.rocksdbTotalFlushLatencyMs Le temps de vidage, y compris le vidage en arrière-plan. Les opérations de vidage sont des processus par lesquels le MemTable est vidé vers le stockage une fois qu’il’est plein. MemTables sont le premier niveau où les données sont stockées dans RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed La taille en octets des fichiers zip non compressés, comme indiqué par le gestionnaire de fichiers. Le gestionnaire de fichiers gère l’utilisation et la suppression de l’espace disque du fichier SST physiques.

Objet sources (Kafka)

Métrique Description
sources.description Une description détaillée de la source Kafka, en spécifiant la rubrique Kafka exacte à partir de laquelle il est lu. Par exemple : “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
l'objet sources.startOffset Le numéro de décalage de départ dans la rubrique Kafka à laquelle la tâche de diffusion en continu a démarré.
l'objet sources.endOffset Le dernier décalage traité par le microbatch. Cela peut être identique à latestOffset pour une exécution de micro-lot en cours.
l'objet sources.latestOffset Le dernier décalage calculé par le microbatch. Le processus de microbatching peut ne pas traiter tous les décalages en cas de limitation, ce qui entraîne une différenciation entre endOffset et latestOffset.
sources.numInputRows Le nombre de lignes d’entrée traitées à partir de cette source.
sources.inputRowsPerSecond Le taux auquel les données arrivent pour le traitement à partir de cette source.
sources.processedRowsPerSecond Le taux auquel Spark traite les données de cette source.

Objet sources.metrics (Kafka)

Métrique Description
sources.metrics.avgOffsetsBehindLatest Le nombre moyen de décalages entre la requête de streaming et le dernier décalage disponible, parmi toutes les rubriques souscrites.
sources.metrics.estimatedTotalBytesBehindLatest Le nombre estimé d’octets que le processus de requête n’a pas consommés à partir des rubriques souscrites.
sources.metrics.maxOffsetsBehindLatest Le nombre maximal de décalages entre la requête de streaming et le dernier décalage disponible, parmi toutes les rubriques souscrites.
sources.metrics.minOffsetsBehindLatest Le nombre minimal de décalages entre la requête de streaming et le dernier décalage disponible parmi toutes les rubriques souscrites.

Objet sink (Kafka)

Métrique Description
sink.description La description du récepteur Kafka dans lequel la requête de diffusion en continu écrit, détaillant l’implémentation spécifique du récepteur Kafka utilisée. Par exemple : “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows Le nombre de lignes écrites dans la table de sortie ou le récepteur, dans le cadre du micro-lot. Dans certaines situations, cette valeur peut être « -1 » et peut généralement être interprétée comme « inconnue ».

Objet sources (Delta Lake)

Métrique Description
sources.description La description de la source à partir de laquelle la requête de streaming lit. Par exemple : “DeltaSource[table]”.
sources.[startOffset/endOffset].sourceVersion La version de sérialisation avec laquelle ce décalage est encodé.
sources.[startOffset/endOffset].reservoirId L’ID de la table en cours de lecture. Cela permet de détecter une configuration incorrecte lors du redémarrage d’une requête.
sources.[startOffset/endOffset].reservoirVersion La version de la table en cours de traitement.
sources.[startOffset/endOffset].index L’index dans la séquence de AddFiles dans cette version. Cela permet de décomposer les validations volumineuses en plusieurs lots. Cet index est créé en triant à l’aide de modificationTimestamp et path.
sources.[startOffset/endOffset].isStartingVersion Identifie si le décalage actuel marque le début d’une nouvelle requête de streaming plutôt que le traitement des modifications qui se sont produites après le traitement des données initiales. Lors du démarrage d’une nouvelle requête, toutes les données présentes dans la table au début sont traitées en premier, puis toutes les nouvelles données qui arrivent.
sources.latestOffset Le décalage le plus récent traité par la requête de micro-lot.
sources.numInputRows Le nombre de lignes d’entrée traitées à partir de cette source.
sources.inputRowsPerSecond Le taux auquel les données arrivent pour le traitement à partir de cette source.
sources.processedRowsPerSecond Le taux auquel Spark traite les données de cette source.
sources.metrics.numBytesOutstanding La taille combinée des fichiers en attente (fichiers suivis par RocksDB). Il s’agit de la métrique du backlog lorsque Delta et Auto Loader sont la source de streaming.
sources.metrics.numFilesOutstanding Le nombre de fichiers en attente à traiter. Il s’agit de la métrique du backlog lorsque Delta et Auto Loader sont la source de streaming.

Objet sink (Delta Lake)

Métrique Description
sink.description La description du récepteur Delta, détaillant l’implémentation spécifique du récepteur Delta utilisée. Par exemple : “DeltaSink[table]”.
sink.numOutputRows Le nombre de lignes est toujours « -1 », car Spark ne peut pas déduire les lignes de sortie pour les récepteurs DSv1, qui est la classification du récepteur Delta Lake.

Exemples

Exemple d’événement Kafka-to-Kafka avec StreamingQueryListener

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Exemple d’événement Delta Lake-to-Delta Lake avec StreamingQueryListener

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Exemple d’événement Delta Lake StreamingQueryListener

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Exemple d’événement Kafka+Delta Lake-to-Delta Lake StreamingQueryListener

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Exemple de mesures pour un événement source vers Delta Lake avec StreamingQueryListener

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}