Monitoramento de consultas de Streaming Estruturado no Azure Databricks

O Azure Databricks fornece monitoramento interno para aplicativos de Fluxo Estruturado por meio da interface do usuário do Spark na guia Fluxo.

Distinguir consultas de Streaming Estruturado na interface do usuário do Spark

Forneça aos fluxos um nome de consulta exclusivo adicionando .queryName(<query-name>) ao código writeStream para distinguir facilmente quais métricas pertencem a qual fluxo na interface do usuário do Spark.

Enviar métricas de Streaming Estruturado por push para serviços externos

As métricas de streaming podem ser enviadas por push para serviços externos a fim de gerar alertas ou criar painéis de casos de uso com a interface do Ouvinte de Consulta de Streaming do Apache Spark. No Databricks Runtime 11.3 LTS e superior, o Streaming Query Listener está disponível em Python e Scala.

Importante

Credenciais e objetos gerenciados pelo Catálogo do Unity não podem ser usados na lógica StreamingQueryListener.

Observação

A latência de processamento com ouvintes pode afetar significativamente as velocidades de processamento de consultas. Recomenda-se limitar a lógica de processamento nesses ouvintes e optar por gravações em sistemas de resposta rápida como o Kafka para obter maior eficiência.

O código a seguir fornece exemplos básicos da sintaxe para implementar um ouvinte:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Definindo métricas observáveis no Streaming Estruturado

As métricas observáveis são chamadas de funções de agregação arbitrárias e podem ser definidas em uma consulta (DataFrame). Assim que a execução de um DataFrame atinge um ponto de conclusão (ou seja, conclui uma consulta em lote ou atinge uma época de streaming), um evento nomeado é emitido contendo as métricas para os dados processados desde o último ponto de conclusão.

É possível observar essas métricas anexando um ouvinte à sessão do Spark. O ouvinte depende do modo de execução:

  • Modo de lote: use QueryExecutionListener.

    QueryExecutionListener é chamado quando a consulta é concluída. Acesse as métricas usando o mapa QueryExecution.observedMetrics.

  • Streaming ou microlote: use StreamingQueryListener.

    StreamingQueryListener é chamado quando a consulta de streaming conclui uma época. Acesse as métricas usando o mapa StreamingQueryProgress.observedMetrics. O Azure Databricks não dá suporte ao streaming de execução contínua.

Por exemplo:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

métricas do objeto StreamingQueryListener

Métrica Descrição
id Uma ID de consulta exclusiva que persiste após reinicializações.
runId Uma ID de consulta exclusiva para cada inicialização/reinicialização. Consulte StreamingQuery.runId().
name O nome da consulta especificado pelo usuário. O nome será nulo se nenhum nome for especificado.
timestamp O carimbo de data/hora da execução do microlote.
batchId Uma ID exclusiva para o lote de dados atual que está sendo processado. No caso de novas tentativas após uma falha, uma determinada ID de lote pode ser executada mais de uma vez. De modo semelhante, quando não existem dados a serem processados, a ID do lote não é incrementada.
numInputRows O número agregado (em todas as fontes) de registros processados ​​em um gatilho.
inputRowsPerSecond A taxa agregada (em todas as fontes) de dados recebidos.
processedRowsPerSecond A taxa agregada (em todas as fontes) na qual o Spark está processando dados.

objeto durationMs

Informações sobre o tempo necessário para concluir diversos estágios do processo de execução do microlote.

Métrica Descrição
durationMs.addBatch O tempo necessário para executar o microlote. Isso exclui o tempo que o Spark leva para planejar o microlote.
durationMs.getBatch O tempo necessário para recuperar os metadados sobre os deslocamentos da fonte.
durationMs.latestOffset O último deslocamento consumido para o microlote. Esse objeto de progresso se refere ao tempo necessário para recuperar o deslocamento mais recente das fontes.
durationMs.queryPlanning O tempo necessário para gerar o plano de execução.
durationMs.triggerExecution O tempo necessário para planejar e executar o microlote.
durationMs.walCommit O tempo necessário para fazer commit dos novos deslocamentos disponíveis.

objeto eventTime

Informações sobre o valor de tempo do evento observado nos dados que estão sendo processados ​​no microlote. Esses dados são utilizados pela marca d'água para descobrir como cortar o estado para processar agregações com estado definidas no trabalho de Fluxo Estruturado.

Métrica Descrição
eventTime.avg O tempo médio do evento observado no gatilho.
eventTime.max O tempo máximo do evento observado no gatilho.
eventTime.min O tempo mínimo do evento observado no gatilho.
eventTime.watermark O valor da marca d'água usada no gatilho.

objeto stateOperators

Informações sobre as operações com estado definidas no trabalho de Fluxo Estruturado e as agregações que são produzidas a partir delas.

Métrica Descrição
stateOperators.operatorName O nome do operador com estado ao qual as métricas se relacionam, como symmetricHashJoin, dedupe, stateStoreSave.
stateOperators.numRowsTotal O número total de linhas no estado como resultado de um operador ou uma agregação com estado.
stateOperators.numRowsUpdated O número total de linhas atualizadas no estado como resultado de um operador ou uma agregação com estado.
stateOperators.allUpdatesTimeMs No momento, essa métrica não pode ser medida pelo Spark e sua remoção está prevista em atualizações futuras.
stateOperators.numRowsRemoved O número total de linhas removidas do estado como resultado de um operador ou uma agregação com estado.
stateOperators.allRemovalsTimeMs No momento, essa métrica não pode ser medida pelo Spark e sua remoção está prevista em atualizações futuras.
stateOperators.commitTimeMs O tempo necessário para fazer commit de todas as atualizações (colocações e remoções) e retornar uma nova versão.
stateOperators.memoryUsedBytes Memória utilizada pelo repositório de estado.
stateOperators.numRowsDroppedByWatermark O número de linhas que são consideradas muito atrasadas para serem incluídas em uma agregação com estado. Somente agregações de streaming: o número de linhas descartadas após a agregação (exceto linhas de entrada brutas). Esse número não é preciso, mas fornece uma indicação de que há dados atrasados ​​sendo descartados.
stateOperators.numShufflePartitions O número de partições aleatórias para este operador com estado.
stateOperators.numStateStoreInstances A instância de armazenamento de estado real que o operador inicializou e manteve. Para muitos operadores com estado, é igual ao número de partições. No entanto, junções stream-stream inicializam quatro instâncias de armazenamento de estado por partição.

Objeto stateOperators.customMetrics

Informações coletadas do RocksDB quando ele captura métricas sobre o desempenho e as operações relacionadas aos valores com estado que ele mantêm para o trabalho de streaming estruturado. Para mais informações, confira Configurar o repositório de estado do RocksDB no Azure Databricks.

Métrica Descrição
customMetrics.rocksdbBytesCopied O número de bytes copiados, conforme rastreado pelo gerenciador de arquivos do RocksDB.
customMetrics.rocksdbCommitCheckpointLatency O tempo em milissegundos para criar um instantâneo do RocksDB nativo e gravá-lo em um diretório local.
customMetrics.rocksdbCompactLatency O tempo em milissegundos para a compactação (opcional) durante o commit do ponto de verificação.
customMetrics.rocksdbCommitFileSyncLatencyMs O tempo em milissegundos para a sincronização do instantâneo do RocksDB nativo com o armazenamento externo (o local do ponto de verificação).
customMetrics.rocksdbCommitFlushLatency O tempo em milissegundos que leva para liberar as alterações in-memory do RocksDB no disco local.
customMetrics.rocksdbCommitPauseLatency O tempo em milissegundos para interromper os threads de trabalho em segundo plano como parte do commit do ponto de verificação, por exemplo, para compactação.
customMetrics.rocksdbCommitWriteBatchLatency O tempo em milissegundos para a aplicação das gravações em estágios na estrutura in-memory (WriteBatch) ao RocksDB nativo.
customMetrics.rocksdbFilesCopied O número de arquivos copiados, conforme rastreado pelo gerenciador de arquivos do RocksDB.
customMetrics.rocksdbFilesReused O número de arquivos reutilizados, conforme rastreado pelo gerenciador de arquivos do RocksDB.
customMetrics.rocksdbGetCount O número de chamadas get para o banco de dados (não inclui gets de WriteBatch: lote in-memory usado para gravações de preparo).
customMetrics.rocksdbGetLatency O tempo médio em nanossegundos para a chamada RocksDB::Get nativa subjacente.
customMetrics.rocksdbReadBlockCacheHitCount A contagem de ocorrências no cache de blocos do RocksDB que são úteis para evitar leituras de disco local.
customMetrics.rocksdbReadBlockCacheMissCount A contagem do cache de blocos no RocksDB não é útil para evitar leituras de disco local.
customMetrics.rocksdbSstFileSize O tamanho de todo o arquivo de SST (tabela classificada estática): a estrutura tabular que o RocksDB usa para armazenar dados.
customMetrics.rocksdbTotalBytesRead O número de bytes não compactados lidos pelas operações get.
customMetrics.rocksdbTotalBytesReadByCompaction O número de bytes que o processo de compactação lê no disco.
customMetrics.rocksdbTotalBytesReadThroughIterator O número total de bytes de dados não compactados lidos com um iterador. Algumas operações com estado (por exemplo, processamento de tempo limite em FlatMapGroupsWithState e marca d'água) exigem a leitura de dados no banco de dados por meio de um iterador.
customMetrics.rocksdbTotalBytesWritten O número total de bytes não compactados gravados por operações put.
customMetrics.rocksdbTotalBytesWrittenByCompaction O número total de bytes que o processo de compactação grava no disco.
customMetrics.rocksdbTotalCompactionLatencyMs O tempo em milissegundos para compactações do RocksDB, incluindo compactações em segundo plano e a compactação opcional iniciada durante o commit.
customMetrics.rocksdbTotalFlushLatencyMs O tempo total de liberação, incluindo a liberação em segundo plano. As operações de liberação são processos que liberam o MemTable no armazenamento quando ele está cheio. MemTables são o primeiro nível em que os dados são armazenados no RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed O tamanho em bytes dos arquivos ZIP descompactados, conforme relatado pelo gerenciador de arquivos. O gerenciador de arquivos gerencia a utilização e a exclusão do espaço em disco do arquivo SST físico.

objeto de origem (Kafka)

Métrica Descrição
sources.description Uma descrição detalhada da fonte do Kafka, especificando o tópico exato do Kafka que está sendo lido. Por exemplo: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
Objeto sources.startOffset O número de deslocamento inicial no tópico do Kafka em que o trabalho de streaming começou.
Objeto sources.endOffset O último deslocamento processado pelo microlote. Isso pode ser igual a latestOffset para uma execução do microlote em andamento.
Objeto sources.latestOffset O deslocamento mais recente calculado pelo microlote. O processo de microlote pode não processar todos os deslocamentos quando há limitação, o que resulta em diferenças entre endOffset e latestOffset.
sources.numInputRows O número de linhas de entrada processadas nesta fonte.
sources.inputRowsPerSecond A taxa na qual os dados chegam para processamento nesta fonte.
sources.processedRowsPerSecond A taxa na qual o Spark está processando dados desta fonte.

Objeto sources.metrics (Kafka)

Métrica Descrição
sources.metrics.avgOffsetsBehindLatest O número médio de deslocamentos em atraso na consulta de streaming com relação ao último deslocamento disponível entre todos os tópicos assinados.
sources.metrics.estimatedTotalBytesBehindLatest O número estimado de bytes que o processo de consulta não consumiu dos tópicos assinados.
sources.metrics.maxOffsetsBehindLatest O número máximo de deslocamentos em atraso na consulta de streaming com relação ao último deslocamento disponível entre todos os tópicos assinados.
sources.metrics.minOffsetsBehindLatest O número mínimo de deslocamentos em atraso na consulta de streaming com relação ao último deslocamento disponível entre todos os tópicos assinados.

objeto coletor (Kafka)

Métrica Descrição
sink.description A descrição do coletor do Kafka em que a consulta de streaming está realizando gravações, detalhando a implementação específica do coletor do Kafka que está sendo usada. Por exemplo: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows O número de linhas que foram gravadas na tabela de saída ou no coletor como parte do microlote. Em algumas situações, esse valor pode ser "-1" e, em geral, pode ser interpretado como "desconhecido".

objeto de origem (Delta Lake)

Métrica Descrição
sources.description A descrição da fonte da qual a consulta de streaming está sendo lida. Por exemplo: “DeltaSource[table]”.
sources.[startOffset/endOffset].sourceVersion A versão de serialização com a qual esse deslocamento é codificado.
sources.[startOffset/endOffset].reservoirId A ID da tabela que está sendo lida. Isso é utilizado para detectar a configuração incorreta ao reiniciar uma consulta.
sources.[startOffset/endOffset].reservoirVersion A versão da tabela que está sendo processada no momento.
sources.[startOffset/endOffset].index O índice na sequência de AddFiles nesta versão. É utilizado para interromper grandes confirmações em vários lotes. Esse índice é criado pela classificação em modificationTimestamp e path.
sources.[startOffset/endOffset].isStartingVersion Identifica se o deslocamento atual marca o início de uma nova consulta de streaming, em vez do processamento de alterações que ocorreram depois que os dados iniciais foram processados. Ao iniciar uma nova consulta, todos os dados presentes inicialmente na tabela são processados ​​primeiro e, depois, quaisquer novos dados que chegarem.
sources.latestOffset O deslocamento mais recente processado pela consulta do microlote.
sources.numInputRows O número de linhas de entrada processadas nesta fonte.
sources.inputRowsPerSecond A taxa na qual os dados chegam para processamento nesta fonte.
sources.processedRowsPerSecond A taxa na qual o Spark está processando dados desta fonte.
sources.metrics.numBytesOutstanding O tamanho combinado dos arquivos pendentes (arquivos rastreados pelo RocksDB). Essa é a métrica de lista de pendências para a Delta e o Carregador Automático como fonte de streaming.
sources.metrics.numFilesOutstanding O número de arquivos pendentes a serem processados. Essa é a métrica de lista de pendências para a Delta e o Carregador Automático como fonte de streaming.

objeto de coletor (Delta Lake)

Métrica Descrição
sink.description A descrição do coletor Delta, detalhando a implementação específica do coletor Delta que está sendo usada. Por exemplo: “DeltaSink[table]”.
sink.numOutputRows O número de linhas é sempre “-1” porque o Spark não pode inferir linhas de saída para coletores DSv1, que é a classificação para o coletor Delta Lake.

Exemplos

Exemplo de evento StreamingQueryListener do Kafka para Kafka

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Exemplo de evento StreamingQueryListener do Delta Lake-to-Delta Lake

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Exemplo de evento StreamingQueryListener do Kinesis para o Delta Lake

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Exemplo de evento StreamingQueryListener do Kafka e do Delta Lake para Delta Lake

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Fonte de taxa de exemplo para o evento StreamingQueryListener do Delta Lake

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}