Trabalhar com o histórico de tabelas do Delta Lake
Cada operação que modifica uma tabela do Delta Lake cria uma nova versão da tabela. Você pode usar informações de histórico para auditar operações ou consultar uma tabela em um ponto específico no tempo.
Observação
O Databricks não recomenda usar o histórico de tabelas do Delta Lake como uma solução de backup de longo prazo para arquivamento de dados. O Databricks recomenda usar apenas os últimos sete dias para operações de viagem no tempo, a menos que você tenha definido as configurações de retenção de dados e logs como um valor maior.
Recuperar o histórico da tabela Delta
Você pode recuperar informações de operações, usuário, carimbo de data/hora e assim por diante relativas a cada gravação em uma tabela do Delta por meio do comando history
. As operações são retornadas em ordem cronológica inversa.
A retenção do histórico de tabelas é determinada pela configuração da tabela delta.logRetentionDuration
, que é de 30 dias por padrão.
Observação
A viagem no tempo e o histórico de tabelas são controlados por diferentes limites de retenção. Consulte O que é a viagem no tempo do Delta Lake?
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Para obter detalhes de sintaxe do Spark SQL, confira DESCRIBE HISTORY.
Confira a documentação da API do Delta Lake para obter detalhes da sintaxe de Scala/Java/Python.
O Explorador de Catálogos fornece uma exibição visual dessas informações detalhadas da tabela e do histórico de tabelas Delta. Além do esquema de tabela e dos dados de exemplo, você pode clicar na guia Histórico para ver o histórico da tabela que é exibido com DESCRIBE HISTORY
.
Esquema de histórico
A saída da operação history
tem as colunas a seguir.
Coluna | Type | Descrição |
---|---|---|
version | long | Versão da tabela gerada pela operação. |
timestamp | timestamp | Quando esta versão foi confirmada. |
userId | string | ID do usuário que executou a operação. |
userName | string | Nome do usuário que executou a operação. |
operação | string | Nome da operação. |
operationParameters | map | Parâmetros da operação (por exemplo, predicados). |
trabalho | struct | Detalhes do trabalho que executou a operação. |
notebook | struct | Detalhes do notebook do qual a operação foi executada. |
clusterId | string | ID do cluster em que a operação foi executada. |
readVersion | long | Versão da tabela lida para executar a operação de gravação. |
isolationLevel | string | Nível de isolamento usado nesta operação. |
isBlindAppend | booleano | Se essa operação acrescenta dados. |
operationMetrics | map | Métricas da operação (por exemplo, número de linhas e arquivos modificados). |
userMetadata | string | Metadados de commit definidos pelo usuário se foram especificados |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
Observação
- Quando você usa os métodos abaixo para gravar dados em uma tabela, algumas das outras colunas não ficam disponíveis:
- Colunas incluídas mais tarde sempre são adicionadas após a última coluna.
Chaves de métricas da operação
A operação history
retorna uma coleção de métricas de operações no mapa de colunas operationMetrics
.
As tabelas a seguir listam as definições essenciais do mapa por operação.
Operação | Nome da métrica | Descrição |
---|---|---|
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO | ||
numFiles | Número de arquivos gravados. | |
numOutputBytes | Tamanho em bytes do conteúdo gravado. | |
numOutputRows | Número de linhas gravadas. | |
ATUALIZAÇÃO DE STREAMING | ||
numAddedFiles | Número de arquivos adicionados. | |
numRemovedFiles | Número de arquivos removidos. | |
numOutputRows | Número de linhas gravadas. | |
numOutputBytes | Tamanho da gravação em bytes. | |
Delete (excluir) | ||
numAddedFiles | Número de arquivos adicionados. Não fornecido quando as partições da tabela são excluídas. | |
numRemovedFiles | Número de arquivos removidos. | |
numDeletedRows | Número de linhas removidas. Não fornecido quando as partições da tabela são excluídas. | |
numCopiedRows | Número de linhas copiadas no processo de exclusão de arquivos. | |
executionTimeMs | Tempo para executar toda a operação. | |
scanTimeMs | Tempo para procurar correspondes nos arquivos. | |
rewriteTimeMs | Tempo para reescrever os arquivos correspondentes. | |
TRUNCATE | ||
numRemovedFiles | Número de arquivos removidos. | |
executionTimeMs | Tempo para executar toda a operação. | |
MESCLAR | ||
numSourceRows | Número de linhas do DataFrame de origem. | |
numTargetRowsInserted | Número de linhas inseridas na tabela de destino. | |
numTargetRowsUpdated | Número de linhas atualizadas na tabela de destino. | |
numTargetRowsDeleted | Número de linhas excluídas na tabela de destino. | |
numTargetRowsCopied | Número de linhas de destino copiadas. | |
numOutputRows | Número total de linhas gravadas. | |
numTargetFilesAdded | Número de arquivos adicionados ao sink(target). | |
numTargetFilesRemoved | Número de arquivos removidos do sink(target). | |
executionTimeMs | Tempo para executar toda a operação. | |
scanTimeMs | Tempo para procurar correspondes nos arquivos. | |
rewriteTimeMs | Tempo para reescrever os arquivos correspondentes. | |
UPDATE | ||
numAddedFiles | Número de arquivos adicionados. | |
numRemovedFiles | Número de arquivos removidos. | |
numUpdatedRows | Número de linhas atualizadas. | |
numCopiedRows | Número de linhas copiadas no processo de atualização de arquivos. | |
executionTimeMs | Tempo para executar toda a operação. | |
scanTimeMs | Tempo para procurar correspondes nos arquivos. | |
rewriteTimeMs | Tempo para reescrever os arquivos correspondentes. | |
FSCK | numRemovedFiles | Número de arquivos removidos. |
CONVERT | numConvertedFiles | Número de arquivos Parquet convertidos. |
OPTIMIZE | ||
numAddedFiles | Número de arquivos adicionados. | |
numRemovedFiles | Número de arquivos otimizados. | |
numAddedBytes | Número de bytes adicionados depois que a tabela foi otimizada. | |
numRemovedBytes | Número de bytes removidos. | |
minFileSize | Tamanho do menor arquivo depois que a tabela foi otimizada. | |
p25FileSize | Tamanho do arquivo do 25º percentil depois que a tabela foi otimizada. | |
p50FileSize | Tamanho do arquivo mediano depois que a tabela foi otimizada. | |
p75FileSize | Tamanho do arquivo do 75º percentil depois que a tabela foi otimizada. | |
maxFileSize | Tamanho do maior arquivo depois que a tabela foi otimizada. | |
CLONAR | ||
sourceTableSize | Tamanho em bytes da tabela de origem na versão clonada. | |
sourceNumOfFiles | Número de arquivos da tabela de origem na versão clonada. | |
numRemovedFiles | Número de arquivos removidos da tabela de destino se uma tabela Delta anterior foi substituída. | |
removedFilesSize | Tamanho total em bytes dos arquivos removidos da tabela de destino se uma tabela do Delta anterior foi substituída. | |
numCopiedFiles | Número de arquivos que foram copiados no novo local. 0 para clones superficiais. | |
copiedFilesSize | Tamanho total em bytes dos arquivos que foram copiados no novo local. 0 para clones superficiais. | |
RESTORE | ||
tableSizeAfterRestore | Tamanho da tabela em bytes após a restauração. | |
numOfFilesAfterRestore | Número de arquivos da tabela após a restauração. | |
numRemovedFiles | Número de arquivos removidos pela operação de restauração. | |
numRestoredFiles | Número de arquivos adicionados como resultado da restauração. | |
removedFilesSize | Tamanho em bytes de arquivos removidos pela restauração. | |
restoredFilesSize | Tamanho em bytes de arquivos adicionados pela restauração. | |
VACUUM | ||
numDeletedFiles | Número de arquivos excluídos. | |
numVacuumedDirectories | Número de diretórios aspirados. | |
numFilesToDelete | Número de arquivos a serem excluídos. |
O que é a viagem no tempo do Delta Lake?
O Delta Lake dá suporte à consulta de versões de tabela anteriores com base no carimbo de data/hora ou na versão da tabela (conforme registrado no log de transações). Você pode usar viagens no tempo para aplicativos como o seguinte:
- Recriando análises, relatórios ou saídas (por exemplo, a saída de um modelo de aprendizado de máquina). Isso pode ser útil para depuração ou auditoria, especialmente em setores regulamentados.
- Escrevendo consultas temporais complexas.
- Corrigindo erros nos seus dados.
- Fornecimento de isolamento de instantâneos para um conjunto de consultas de tabelas que são alteradas rapidamente.
Importante
As versões de tabela acessíveis com viagem no tempo são determinadas por uma combinação do limite de retenção para arquivos de log de transações e a frequência e a retenção especificada para VACUUM
operações. Se você executar VACUUM
diariamente com os valores padrão, sete dias de dados estão disponíveis para viagem no tempo.
Sintaxe de viagem no tempo do Delta
Você consulta uma tabela do Delta com viagem no tempo adicionando uma cláusula após a especificação do nome da tabela.
timestamp_expression
pode ser qualquer uma das seguintes opções:'2018-10-18T22:15:12.013Z'
, ou seja, uma cadeia de caracteres que pode ser convertida em um carimbo de data/horacast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
, ou seja, uma cadeia de caracteres de datacurrent_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- Qualquer outra expressão que seja ou possa ser convertida em um carimbo de data/hora
version
é um valor longo que pode ser obtido da saída deDESCRIBE HISTORY table_spec
.
timestamp_expression
e version
não podem ser subconsultas.
Somente cadeias de caracteres de carimbo de data/hora ou data são aceitas. Por exemplo, "2019-01-01"
e "2019-01-01T00:00:00.000Z"
. Confira o código a seguir para obter uma sintaxe de exemplo:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
Você também pode usar a sintaxe @
para especificar o carimbo de data/hora ou a versão como parte do nome da tabela. O carimbo de data/hora precisa estar no formato yyyyMMddHHmmssSSS
. Você pode especificar uma versão após @
, incluindo v
no início da versão. Confira o código a seguir para obter uma sintaxe de exemplo:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
O que são pontos de verificação de log de transações?
O Delta Lake registra versões de tabela como arquivos JSON no diretório _delta_log
, que é armazenado juntamente com os dados da tabela. Para otimizar a consulta de ponto de verificação, o Delta Lake agrega versões de tabela aos arquivos de ponto de verificação Parquet, eliminando a necessidade de ler todas as versões JSON do histórico de tabelas. O Azure Databricks otimiza a frequência de ponto de verificação para o tamanho dos dados e a carga de trabalho. Os usuários não devem interagir diretamente com pontos de verificação. A frequência do ponto de verificação está sujeita a alterações sem aviso prévio.
Configurar retenção de dados para consultas de viagem no tempo
Para consultar uma versão anterior da tabela, você precisa manter tanto o log quanto os arquivos de dados para essa versão.
Os arquivos de dados são excluídos quando VACUUM
é executado em uma tabela. O Delta Lake gerencia a remoção de arquivo de log automaticamente após versões de tabela de ponto de verificação.
Como a maioria das tabelas Delta têm VACUUM
executadas nelas regularmente, as consultas pontuais devem respeitar o limite de retenção para VACUUM
, que é de sete dias por padrão.
Para aumentar o limite de retenção de dados para tabelas Delta, você deve configurar as seguintes propriedades da tabela:
delta.logRetentionDuration = "interval <interval>"
: controla por quanto tempo o histórico de uma tabela é mantido. O padrão éinterval 30 days
.delta.deletedFileRetentionDuration = "interval <interval>"
: determina o limiteVACUUM
usado para remover arquivos de dados que não são mais referenciados na versão atual da tabela. O padrão éinterval 7 days
.
Você pode especificar as propriedades Delta durante a criação da tabela ou defini-las com uma instrução ALTER TABLE
. Confira Referência de propriedades da tabela Delta.
Observação
Você deve definir ambas as propriedades para garantir que o histórico da tabela seja retido por mais tempo para tabelas com operações VACUUM
frequentes. Por exemplo, para acessar 30 dias de dados históricos, defina delta.deletedFileRetentionDuration = "interval 30 days"
(que corresponde à configuração padrão para delta.logRetentionDuration
).
Aumentar o limite de retenção de dados pode fazer com que os custos de armazenamento aumentem à medida que mais arquivos de dados forem mantidos.
Restaurar um estado anterior de uma tabela Delta
Você pode restaurar uma tabela Delta para seu estado anterior com o RESTORE
comando. As tabela do Delta mantêm internamente as versões históricas que permitem a restauração para um estado anterior.
Uma versão correspondente ao estado anterior ou um carimbo de data/hora de quando o estado anterior foi criado têm suporte como opções pelo comando RESTORE
.
Importante
- Você pode restaurar uma tabela já restaurada.
- Você pode restaurar uma tabela clonada.
- Você precisa ter a permissão
MODIFY
na tabela que está sendo restaurada. - Você não pode restaurar uma tabela para uma versão mais antiga em que os arquivos de dados foram excluídos manualmente ou pelo
vacuum
. A restauração parcial para essa versão ainda é possível sespark.sql.files.ignoreMissingFiles
estiver definido comotrue
. - O formato de carimbo de data/hora para restaurar para um estado anterior é
yyyy-MM-dd HH:mm:ss
. Também há suporte para fornecer apenas uma cadeia de caracteres de data (yyyy-MM-dd
).
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
Para obter detalhes da sintaxe, confira RESTORE.
Importante
A restauração é considerada uma operação de alteração de dados. As entradas de log do Delta Lake adicionadas pelo comando RESTORE
contêm dataChange definido como true. Se houver um aplicativo downstream, como um trabalho de streaming estruturado que processa as atualizações para uma tabela do Delta Lake, as entradas de log de alterações de dados adicionadas pela operação de restauração serão consideradas como novas atualizações de dados e processá-las poderá resultar em dados duplicados.
Por exemplo:
Versão da tabela | Operação | Atualizações de log delta | Registros em atualizações de log de alterações de dados |
---|---|---|---|
0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (nome = Viktor, idade = 29 anos, (nome = George, idade = 55 anos) |
1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (nome = George, idade = 39 anos) |
2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (Nenhum registro como Otimizar compactação não altera os dados na tabela) |
3 | RESTORE(version=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (nome = Viktor, idade = 29 anos), (nome = George, idade = 55 anos), (nome = George, idade = 39 anos) |
No exemplo anterior, o comando RESTORE
resulta em atualizações que já foram vistas durante a leitura da tabela Delta versão 0 e 1. Se uma consulta de streaming fez a leitura dessa tabela, esses arquivos serão considerados como dados recém-adicionados e serão processados novamente.
Restaurar métricas
RESTORE
relata as seguintes métricas como um DataFrame de linha única quando a operação é concluída:
table_size_after_restore
: o tamanho da tabela após a restauração.num_of_files_after_restore
: o número de arquivos da tabela após a restauração.num_removed_files
: o número de arquivos removidos (logicamente excluídos) da tabela.num_restored_files
: o número de arquivos restaurados devido à reversão.removed_files_size
: o tamanho total em bytes dos arquivos removidos da tabela.restored_files_size
: o tamanho total em bytes dos arquivos restaurados.
Exemplos de como usar a viagem no tempo do Delta Lake
Corrigir exclusões acidentais em uma tabela para o usuário
111
:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
Corrigir atualizações incorretas acidentais em uma tabela:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
Consultar o número de novos clientes adicionados na última semana.
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
Como encontrar a versão do último commit na sessão do Spark?
Para ter acesso ao número da versão do último commit gravado pelo atual SparkSession
em todos os threads e todas as tabelas, confira a configuração do SQL spark.databricks.delta.lastCommitVersionInSession
.
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Se o SparkSession
não tiver feito nenhum commit, a consulta à chave irá retornar um valor vazio.
Observação
Compartilhar o mesmo SparkSession
com vários threads é semelhante a compartilhar uma variável com vários threads. Você pode atingir condições de corrida, pois o valor de configuração é atualizado simultaneamente.