Surveillance des requêtes de Structured Streaming sur Azure Databricks
Azure Databricks fournit une surveillance intégrée pour les applications de Structured Streaming via l’interface utilisateur de Spark sous l’onglet Streaming.
Distinguer les requêtes de Structured Streaming dans l’interface utilisateur de Spark
Donnez à vos flux un nom de requête unique en ajoutant .queryName(<query-name>)
à votre code writeStream
pour distinguer facilement quelles métriques appartiennent à quel flux dans l’interface utilisateur de Spark.
Envoyer (push) des métriques de Structured Streaming à des services externes
Les métriques de streaming peuvent être envoyées (push) vers des services externes pour une utilisation dans les alertes ou les tableaux de bord, avec l’interface de l’écouteur de requête de streaming d’Apache Spark. Dans Databricks Runtime 11.3 LTS et versions ultérieures, l’écouteur de requête de streaming est disponible en Python et Scala.
Important
Les informations d’identification et les objets gérés par Unity Catalog ne peuvent pas être utilisés dans la logique StreamingQueryListener
.
Remarque
La latence de traitement avec les écouteurs peut affecter considérablement les vitesses de traitement des requêtes. Il est conseillé de limiter la logique de traitement dans ces écouteurs et d’opter pour l’écriture dans des systèmes de réponse rapide comme Kafka pour une efficacité.
Le code suivant fournit des exemples simples de la syntaxe à utiliser pour implémenter un écouteur :
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
Définition de métriques observables dans Structured Streaming
Les métriques observables sont des fonctions d’agrégation arbitraires nommées qui peuvent être définies sur une requête (DataFrame). Dès que l’exécution d’un DataFrame atteint un point d’achèvement (c’est-à-dire termine une requête par lot ou atteint une époque de streaming), un événement nommé est émis, qui contient les métriques pour les données traitées depuis le dernier point d’achèvement.
Vous pouvez observer ces métriques en attachant un écouteur à la session Spark. L’écouteur dépend du mode d’exécution :
Mode par lot : Utilisez
QueryExecutionListener
.QueryExecutionListener
est appelé quand la requête est terminée. Accédez aux métriques en utilisant la carteQueryExecution.observedMetrics
.Diffusion en continu ou microbatch : Utiliser
StreamingQueryListener
.StreamingQueryListener
est appelé quand la requête de streaming termine une époque. Accédez aux métriques en utilisant la carteStreamingQueryProgress.observedMetrics
. Azure Databricks ne prend pas en charge le streaming d’exécution continue.
Par exemple :
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
Métriques d’objet StreamingQueryListener
Métrique | Description |
---|---|
id |
ID de requête unique qui persiste entre les redémarrages. |
runId |
ID de requête unique pour chaque démarrage/redémarrage. Voir StreamingQuery.runId(). |
name |
Nom spécifié par l’utilisateur de la requête. Le nom est nul si aucun nom n’est spécifié. |
timestamp |
Horodatage pour l’exécution du microbatch. |
batchId |
ID unique pour le lot actuel de données en cours de traitement. Dans le cas de nouvelles tentatives après un échec, un ID de lot donné peut être exécuté plusieurs fois. De même, lorsqu’il n’y a pas de données à traiter, l’ID de lot n’est pas incrémenté. |
numInputRows |
Nombre d’enregistrements agrégés (sur toutes les sources) traités dans un déclencheur. |
inputRowsPerSecond |
Taux d’agrégation (sur toutes les sources) des données arrivantes. |
processedRowsPerSecond |
Taux d’agrégation (sur toutes les sources) auquel Spark traite les données. |
Objet durationMs
Informations sur le temps nécessaire pour effectuer différentes étapes du processus d’exécution de microbatch.
Métrique | Description |
---|---|
durationMs.addBatch |
Temps nécessaire pour exécuter le microbatch. Cela exclut le temps nécessaire à Spark pour planifier le micro-lot. |
durationMs.getBatch |
Temps nécessaire pour récupérer les métadonnées sur les décalages de la source. |
durationMs.latestOffset |
Décalage le plus récent consommé pour le microbatch. Cet objet de progression fait référence au temps nécessaire pour récupérer le dernier décalage à partir des sources. |
durationMs.queryPlanning |
Temps nécessaire pour générer le plan d’exécution. |
durationMs.triggerExecution |
Temps nécessaire pour planifier et exécuter le microbatch. |
durationMs.walCommit |
Temps nécessaire pour valider les nouveaux décalages disponibles. |
Objet eventTime
Informations sur la valeur d’heure de l’événement observée dans les données traitées dans le microbatch. Le filigrane utilise ces données pour déterminer comment découper l’état pour le traitement des agrégations avec état qui sont définies dans le travail de Structured Streaming.
Métrique | Description |
---|---|
eventTime.avg |
Temps moyen d’événement observé dans ce déclencheur. |
eventTime.max |
Durée maximale de l’événement observée dans ce déclencheur. |
eventTime.min |
Durée minimale de l’événement observée dans ce déclencheur. |
eventTime.watermark |
Valeur du filigrane utilisé dans ce déclencheur. |
Objet stateOperators
Informations sur les opérations avec état définies dans le travail Structured Streaming et les agrégations produites à partir de celles-ci.
Métrique | Description |
---|---|
stateOperators.operatorName |
Nom de l’opérateur avec état auquel les métriques sont liées, telles que symmetricHashJoin , dedupe , stateStoreSave . |
stateOperators.numRowsTotal |
Nombre total de lignes dans l’état à la suite d’un opérateur ou d’une agrégation avec état. |
stateOperators.numRowsUpdated |
Nombre total de lignes mises à jour dans l’état à la suite d’un opérateur ou d’une agrégation avec état. |
stateOperators.allUpdatesTimeMs |
Cette métrique n’est actuellement pas mesurable par Spark et est prévue pour être supprimée dans les prochaines mises à jour. |
stateOperators.numRowsRemoved |
Nombre total de lignes supprimées de l’état à la suite d’un opérateur ou d’une agrégation avec état. |
stateOperators.allRemovalsTimeMs |
Cette métrique n’est actuellement pas mesurable par Spark et est prévue pour être supprimée dans les prochaines mises à jour. |
stateOperators.commitTimeMs |
Temps nécessaire pour valider toutes les mises à jour (place et suppressions) et retourner une nouvelle version. |
stateOperators.memoryUsedBytes |
Mémoire utilisée par le magasin d’état. |
stateOperators.numRowsDroppedByWatermark |
Nombre de lignes considérées comme trop tardives à inclure dans une agrégation avec état. Agrégations de diffusion en continu uniquement : nombre de lignes supprimées après l’agrégation (pas de lignes d’entrée brutes). Ce nombre n’est pas précis, mais indique qu’il existe des données tardives supprimées. |
stateOperators.numShufflePartitions |
Nombre de partitions aléatoires pour cet opérateur avec état. |
stateOperators.numStateStoreInstances |
Instance réelle du magasin d’états que l’opérateur a initialisée et conservée. Pour de nombreux opérateurs avec état, il s’agit du même que le nombre de partitions. Toutefois, les jointures stream-stream initialisent quatre instances de magasin d’état par partition. |
Objet stateOperators.customMetrics
Informations collectées à partir de la capture de métriques RocksDB sur ses performances et ses opérations en ce qui concerne les valeurs avec état qu’elle gère pour le travail Structured Streaming. Pour plus d’informations, consultez Configurer le magasin d’état RocksDB sur Azure Databricks.
Métrique | Description |
---|---|
customMetrics.rocksdbBytesCopied |
Le nombre d’octets copiés comme suivi par le gestionnaire de fichiers RocksDB. |
customMetrics.rocksdbCommitCheckpointLatency |
Le temps en millisecondes prenant un instantané de RocksDB natif et l’écrivez dans un répertoire local. |
customMetrics.rocksdbCompactLatency |
Le temps de compactage en millisecondes (facultatif) pendant la validation du point de contrôle. |
customMetrics.rocksdbCommitFileSyncLatencyMs |
Le temps en millisecondes de synchronisation de l’instantané RocksDB natif vers le stockage externe (emplacement de point de contrôle). |
customMetrics.rocksdbCommitFlushLatency |
Le temps en millisecondes de vidage des modifications en mémoire de la base de données RocksDB sur le disque local. |
customMetrics.rocksdbCommitPauseLatency |
La durée en millisecondes d’arrêt des threads de travail en arrière-plan dans le cadre de la validation de point de contrôle, par exemple pour le compactage. |
customMetrics.rocksdbCommitWriteBatchLatency |
Le temps en millisecondes appliquant les écritures intermédiaires dans la structure en mémoire (WriteBatch ) dans la base de données rocksDB native. |
customMetrics.rocksdbFilesCopied |
Le nombre de fichiers copiés comme suivi par le gestionnaire de fichiers RocksDB. |
customMetrics.rocksdbFilesReused |
Le nombre de fichiers réutilisés d’après le suivi du gestionnaire de fichiers RocksDB. |
customMetrics.rocksdbGetCount |
Le nombre d’appels get à la base de données (n’inclut pas gets de WriteBatch - lot en mémoire utilisé pour les écritures intermédiaires). |
customMetrics.rocksdbGetLatency |
Le délai moyen en nanosecondes pour l’appel natif RocksDB::Get sous-jacent. |
customMetrics.rocksdbReadBlockCacheHitCount |
Le nombre d’accès au cache à partir du cache de blocs dans RocksDB qui sont utiles pour éviter les lectures de disque local. |
customMetrics.rocksdbReadBlockCacheMissCount |
Le nombre de caches de blocs dans RocksDB n’est pas utile pour éviter les lectures de disque local. |
customMetrics.rocksdbSstFileSize |
La taille de tous les fichiers SST (Static Trid Table) : la structure tabulaire RocksDB utilise pour stocker des données. |
customMetrics.rocksdbTotalBytesRead |
Le nombre d’octets non compressés lus par les opérations get . |
customMetrics.rocksdbTotalBytesReadByCompaction |
Nombre d’octets lus par le processus de compactage à partir du disque. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
Le nombre total d’octets de données non compressées lues à l’aide d’un itérateur. Certaines opérations avec état (par exemple, le traitement du délai d’attente dans FlatMapGroupsWithState et le filigrane) nécessitent la lecture de données dans la base de données via un itérateur. |
customMetrics.rocksdbTotalBytesWritten |
Le nombre total d’octets non compressés écrits par les opérations put . |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
Le nombre total d’octets que le processus de compactage écrit sur le disque. |
customMetrics.rocksdbTotalCompactionLatencyMs |
Le temps nécessaire pour les compactages par RocksDB, y compris les compactages d’arrière-plan et le compactage facultatif lancé pendant la validation. (en millisecondes) |
customMetrics.rocksdbTotalFlushLatencyMs |
Le temps de vidage, y compris le vidage en arrière-plan. Les opérations de vidage sont des processus par lesquels le MemTable est vidé vers le stockage une fois qu’il’est plein. MemTables sont le premier niveau où les données sont stockées dans RocksDB. |
customMetrics.rocksdbZipFileBytesUncompressed |
La taille en octets des fichiers zip non compressés, comme indiqué par le gestionnaire de fichiers. Le gestionnaire de fichiers gère l’utilisation et la suppression de l’espace disque du fichier SST physiques. |
Objet sources (Kafka)
Métrique | Description |
---|---|
sources.description |
Une description détaillée de la source Kafka, en spécifiant la rubrique Kafka exacte à partir de laquelle il est lu. Par exemple : “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” . |
l'objet sources.startOffset |
Le numéro de décalage de départ dans la rubrique Kafka à laquelle la tâche de diffusion en continu a démarré. |
l'objet sources.endOffset |
Le dernier décalage traité par le microbatch. Cela peut être identique à latestOffset pour une exécution de micro-lot en cours. |
l'objet sources.latestOffset |
Le dernier décalage calculé par le microbatch. Le processus de microbatching peut ne pas traiter tous les décalages en cas de limitation, ce qui entraîne une différenciation entre endOffset et latestOffset . |
sources.numInputRows |
Le nombre de lignes d’entrée traitées à partir de cette source. |
sources.inputRowsPerSecond |
Le taux auquel les données arrivent pour le traitement à partir de cette source. |
sources.processedRowsPerSecond |
Le taux auquel Spark traite les données de cette source. |
Objet sources.metrics (Kafka)
Métrique | Description |
---|---|
sources.metrics.avgOffsetsBehindLatest |
Le nombre moyen de décalages entre la requête de streaming et le dernier décalage disponible, parmi toutes les rubriques souscrites. |
sources.metrics.estimatedTotalBytesBehindLatest |
Le nombre estimé d’octets que le processus de requête n’a pas consommés à partir des rubriques souscrites. |
sources.metrics.maxOffsetsBehindLatest |
Le nombre maximal de décalages entre la requête de streaming et le dernier décalage disponible, parmi toutes les rubriques souscrites. |
sources.metrics.minOffsetsBehindLatest |
Le nombre minimal de décalages entre la requête de streaming et le dernier décalage disponible parmi toutes les rubriques souscrites. |
Objet sink (Kafka)
Métrique | Description |
---|---|
sink.description |
La description du récepteur Kafka dans lequel la requête de diffusion en continu écrit, détaillant l’implémentation spécifique du récepteur Kafka utilisée. Par exemple : “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” . |
sink.numOutputRows |
Le nombre de lignes écrites dans la table de sortie ou le récepteur, dans le cadre du micro-lot. Dans certaines situations, cette valeur peut être « -1 » et peut généralement être interprétée comme « inconnue ». |
Objet sources (Delta Lake)
Métrique | Description |
---|---|
sources.description |
La description de la source à partir de laquelle la requête de streaming lit. Par exemple : “DeltaSource[table]” . |
sources.[startOffset/endOffset].sourceVersion |
La version de sérialisation avec laquelle ce décalage est encodé. |
sources.[startOffset/endOffset].reservoirId |
L’ID de la table en cours de lecture. Cela permet de détecter une configuration incorrecte lors du redémarrage d’une requête. |
sources.[startOffset/endOffset].reservoirVersion |
La version de la table en cours de traitement. |
sources.[startOffset/endOffset].index |
L’index dans la séquence de AddFiles dans cette version. Cela permet de décomposer les validations volumineuses en plusieurs lots. Cet index est créé en triant à l’aide de modificationTimestamp et path . |
sources.[startOffset/endOffset].isStartingVersion |
Identifie si le décalage actuel marque le début d’une nouvelle requête de streaming plutôt que le traitement des modifications qui se sont produites après le traitement des données initiales. Lors du démarrage d’une nouvelle requête, toutes les données présentes dans la table au début sont traitées en premier, puis toutes les nouvelles données qui arrivent. |
sources.latestOffset |
Le décalage le plus récent traité par la requête de micro-lot. |
sources.numInputRows |
Le nombre de lignes d’entrée traitées à partir de cette source. |
sources.inputRowsPerSecond |
Le taux auquel les données arrivent pour le traitement à partir de cette source. |
sources.processedRowsPerSecond |
Le taux auquel Spark traite les données de cette source. |
sources.metrics.numBytesOutstanding |
La taille combinée des fichiers en attente (fichiers suivis par RocksDB). Il s’agit de la métrique du backlog lorsque Delta et Auto Loader sont la source de streaming. |
sources.metrics.numFilesOutstanding |
Le nombre de fichiers en attente à traiter. Il s’agit de la métrique du backlog lorsque Delta et Auto Loader sont la source de streaming. |
Objet sink (Delta Lake)
Métrique | Description |
---|---|
sink.description |
La description du récepteur Delta, détaillant l’implémentation spécifique du récepteur Delta utilisée. Par exemple : “DeltaSink[table]” . |
sink.numOutputRows |
Le nombre de lignes est toujours « -1 », car Spark ne peut pas déduire les lignes de sortie pour les récepteurs DSv1, qui est la classification du récepteur Delta Lake. |
Exemples
Exemple d’événement Kafka-to-Kafka avec StreamingQueryListener
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Exemple d’événement Delta Lake-to-Delta Lake avec StreamingQueryListener
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Exemple d’événement Delta Lake StreamingQueryListener
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}
Exemple d’événement Kafka+Delta Lake-to-Delta Lake StreamingQueryListener
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}
Exemple de mesures pour un événement source vers Delta Lake avec StreamingQueryListener
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}