Konfigurieren des RocksDB-Statusspeichers auf Azure Databricks
Sie können die RocksDB-basierte Zustandsverwaltung aktivieren, indem Sie die folgende Konfiguration in der SparkSession-Instanz festlegen, bevor Sie die Streamingabfrage starten.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
Sie können RocksDB für Delta Live Tables-Pipelines aktivieren. Weitere Informationen finden Sie unter Optimieren der Pipelinekonfiguration für die zustandsbehaftete Verarbeitung.
Aktivieren der Änderungsprotokollprüfpunkte
In Databricks Runtime 13.3 LTS und höher können Sie Prüfpunkte im Änderungsprotokoll aktivieren, um die Prüfpunktdauer und End-to-End-Latenz für strukturierte Streamingworkloads zu verringern. Databricks empfiehlt, Änderungsprotokollprüfpunkte für alle zustandsbehafteten Abfragen von strukturiertem Streaming zu aktivieren.
Normalerweise werden Momentaufnahmen des RocksDB-Zustandsspeichers während des Prüfpunkts erstellt und Datendateien hochgeladen. Um diese Kosten zu vermeiden, schreiben Änderungsprotokollprüfpunkte nur Datensätze, die sich seit dem letzten Prüfpunkt geändert haben, in dauerhaften Speicher."
Änderungsprotokollprüfpunkte sind standardmäßig deaktiviert. Sie können Änderungsprotokollprüfpunkte in der SparkSession-Ebene mit der folgenden Syntax aktivieren:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
Sie können die Änderungsprotokollprüfpunkte für einen vorhandenen Stream aktivieren und die im Prüfpunkt gespeicherten Zustandsinformationen verwalten.
Wichtig
Abfragen mit aktivierten Prüfpunkten im Änderungsprotokoll, können nur unter Databricks Runtime 13.3 LTS und höher ausgeführt werden. Sie können Prüfpunkte im Änderungsprotokoll deaktivieren, um das Legacyverhalten von Prüfpunkten wiederherzustellen. Sie müssen diese Abfragen jedoch trotzdem in Databricks Runtime 13.3 LTS oder höher ausführen. Sie müssen den Auftrag neu starten, damit diese Änderungen vorgenommen werden.
Metriken des RocksDB-Zustandsspeichers
Jeder Zustandsoperator erfasst Metriken im Zusammenhang mit den Zustandsverwaltungsvorgängen, die auf seiner RocksDB-Instanz ausgeführt werden, um den Zustandsspeicher zu beobachten und möglicherweise bei der Behebung einer langsamen Auftragsausführung zu helfen. Diese Metriken werden pro Zustandsoperator im Auftrag für alle Aufgaben aggregiert (Summe), bei denen der Zustandsoperator ausgeführt wird. Diese Metriken sind Teil der customMetrics
-Zuordnung in den stateOperators
-Feldern in StreamingQueryProgress
. Nachfolgend sehen Sie ein Beispiel für StreamingQueryProgress
im JSON-Format (mit StreamingQueryProgress.json()
abgerufen).
{
"id" : "6774075e-8869-454b-ad51-513be86cfd43",
"runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId" : 7,
"stateOperators" : [ {
"numRowsTotal" : 20000000,
"numRowsUpdated" : 20000000,
"memoryUsedBytes" : 31005397,
"numRowsDroppedByWatermark" : 0,
"customMetrics" : {
"rocksdbBytesCopied" : 141037747,
"rocksdbCommitCheckpointLatency" : 2,
"rocksdbCommitCompactLatency" : 22061,
"rocksdbCommitFileSyncLatencyMs" : 1710,
"rocksdbCommitFlushLatency" : 19032,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 56155,
"rocksdbFilesCopied" : 2,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 40000000,
"rocksdbGetLatency" : 21834,
"rocksdbPutCount" : 1,
"rocksdbPutLatency" : 56155599000,
"rocksdbReadBlockCacheHitCount" : 1988,
"rocksdbReadBlockCacheMissCount" : 40341617,
"rocksdbSstFileSize" : 141037747,
"rocksdbTotalBytesReadByCompaction" : 336853375,
"rocksdbTotalBytesReadByGet" : 680000000,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 141037747,
"rocksdbTotalBytesWrittenByPut" : 740000012,
"rocksdbTotalCompactionLatencyMs" : 21949695000,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 7038
}
} ],
"sources" : [ {
} ],
"sink" : {
}
}
Ausführliche Beschreibungen der Metriken:
Metrikname | BESCHREIBUNG |
---|---|
rocksdbCommitWriteBatchLatency | Zeit (in Millisekunden) zum Anwenden der Stagingschreibvorgänge in der In-Memory-Struktur (WriteBatch) auf die native RocksDB-Instanz. |
rocksdbCommitFlushLatency | Zeit (in Millisekunden) für das Leeren der In-Memory-Änderungen in RocksDB auf den lokalen Datenträger. |
rocksdbCommitCompactLatency | Zeit (in Millisekunden) für die Komprimierung (optional) während des Prüfpunktcommits. |
rocksdbCommitPauseLatency | Zeit (in Millisekunden) zum Beenden der Arbeitsthreads im Hintergrund (für Komprimierung usw.) als Teil des Prüfpunkt-Commits. |
rocksdbCommitCheckpointLatency | Zeit (in Millisekunden) für das Erstellen einer Momentaufnahme der nativen RocksDB-Instanz und das Schreiben in ein lokales Verzeichnis. |
rocksdbCommitFileSyncLatencyMs | Zeit (in Millisekunden) für die Synchronisierung der Dateien für die Momentaufnahme der nativen RocksDB-Instanz mit einem externen Speicher (Prüfpunktspeicherort). |
rocksdbGetLatency | Durchschnittliche Zeit (in Nanosekunden) pro zugrunde liegendem nativen RocksDB::Get -Aufruf. |
rocksdbPutCount | Durchschnittliche Zeit (in Nanosekunden) pro zugrunde liegendem nativen RocksDB::Put -Aufruf. |
rocksdbGetCount | Anzahl nativer RocksDB::Get -Aufrufe (ohne Gets aus WriteBatch – In-Memory-Batch für Stagingschreibvorgänge verwendet). |
rocksdbPutCount | Anzahl nativer RocksDB::Put -Aufrufe (ohne Puts in WriteBatch – In-Memory-Batch für Stagingschreibvorgänge verwendet). |
rocksdbTotalBytesReadByGet | Anzahl unkomprimierter Bytes, die durch native RocksDB::Get -Aufrufe gelesen wurden. |
rocksdbTotalBytesWrittenByPut | Anzahl unkomprimierter Bytes, die durch native RocksDB::Put -Aufrufe geschrieben wurden. |
rocksdbReadBlockCacheHitCount | Anzahl, wie oft der native RocksDB-Blockcache verwendet wird, um das Lesen von Daten vom lokalen Datenträger zu vermeiden. |
rocksdbReadBlockCacheMissCount | Anzahl, wie oft der native RocksDB-Blockcache nicht funktioniert hat und Daten vom lokalen Datenträger gelesen werden mussten. |
rocksdbTotalBytesReadByCompaction | Anzahl der Bytes, die durch den nativen RocksDB-Komprimierungsprozess vom lokalen Datenträger gelesen wurden. |
rocksdbTotalBytesWrittenByCompaction | Anzahl der Bytes, die durch den nativen RocksDB-Komprimierungsprozess auf den lokalen Datenträger geschrieben wurden. |
rocksdbTotalCompactionLatencyMs | Zeit (in Millisekunden) für RocksDB-Komprimierungen (sowohl im Hintergrund als auch optionale Komprimierung, die während des Commits initiiert wurde). |
rocksdbWriterStallLatencyMs | Zeit (in Millisekunden), über die der Writer aufgrund einer Komprimierung im Hintergrund oder zum Leeren der Memtables auf den Datenträger angehalten hat. |
rocksdbTotalBytesReadThroughIterator | Einige zustandsbehaftete Vorgänge (z. B. Timeoutverarbeitung in flatMapGroupsWithState oder Wasserzeichen für Aggregationen im Fenstermodus) erfordern das Lesen der gesamten Daten in der Datenbank über Iterator. Die Gesamtgröße der mit dem Iterator gelesenen unkomprimierten Daten. |