Referência da linguagem SQL das Tabelas Dinâmicas Delta
Este artigo fornece detalhes da interface de programação SQL do Delta Live Tables.
- Para obter informações sobre a API do Python, confira a Referência da linguagem Python das Tabelas Dinâmicas Delta.
- Para obter mais informações sobre os comandos SQL, consulte Referência de linguagem SQL.
É possível usar as funções definidas pelo usuário (UDFs) do Python em suas consultas SQL, mas é necessário definir essas UDFs nos arquivos Python antes de chamá-las nos arquivos de origem SQL. Confira Funções escalares definidas pelo usuário – Python.
Limitações
Não há suporte para a cláusula PIVOT
. A operação pivot
no Spark requer o carregamento adiantado de dados de entrada para calcular o esquema da saída. Não há suporte para essa funcionalidade no Delta Live Tables.
Criar uma exibição materializada ou tabela de streaming do Delta Live Tables
Observação
- A sintaxe
CREATE OR REFRESH LIVE TABLE
para criar uma exibição materializada foi preterida. Em vez disso, useCREATE OR REFRESH MATERIALIZED VIEW
. - Para usar a cláusula
CLUSTER BY
ao habilitar o clustering líquido, o pipeline deve ser configurado para usar o canal de preview.
Use a mesma sintaxe SQL básica ao declarar uma tabela de streaming ou uma exibição materializada.
Declarar uma exibição materializada do Delta Live Tables com SQL
O seguinte descreve a sintaxe para declarar uma exibição materializada no Delta Live Tables com SQL:
CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement
Declarar uma tabela de streaming do Delta Live Tables com SQL
Você só pode declarar tabelas de streaming usando consultas que leem em uma fonte de streaming. O Databricks recomenda o uso do Carregador Automático para a ingestão de streaming de arquivos do armazenamento de objetos na nuvem. Confira Sintaxe SQL do Carregador Automático.
Ao especificar outras tabelas e exibições no pipeline como fontes de streaming, você deve incluir a função STREAM()
em torno do nome de um conjunto de dados.
A seguir, descreve-se a sintaxe para declarar uma tabela de streaming no Delta Live Tables com SQL:
CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement
Criar um modo de exibição do Delta Live Tables
A seguinte descrição aborda a sintaxe para declarar exibições com SQL:
CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
[(
[
col_name1 [ COMMENT col_comment1 ],
col_name2 [ COMMENT col_comment2 ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
)]
[COMMENT view_comment]
AS select_statement
Sintaxe SQL do Carregador Automático
Veja a seguinte descrição da sintaxe para trabalhar com o Carregador Automático no SQL:
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM read_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
É possível usar as opções de formatos compatíveis com o Carregador automático. Usando a função map()
, você pode passar opções para o método read_files()
. As opções são pares chave-valor, em que as chaves e os valores são cadeias de caracteres. Para obter detalhes sobre formatos e opções de suporte, confira Opções de formato de arquivo.
Exemplo: definir tabelas
Você pode criar um conjunto de dados fazendo a leitura de uma fonte de dados externa ou de conjuntos de dados definidos em um pipeline. Para fazer uma leitura de um conjunto de dados interno, inclua a palavra-chave LIVE
no início do nome do conjunto de dados. O seguinte exemplo define dois conjuntos de dados diferentes: uma exibição chamada taxi_raw
que usa um arquivo JSON como a fonte de entrada e uma tabela chamada filtered_data
que usa a tabela taxi_raw
como entrada:
CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`
CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
...
FROM LIVE.taxi_raw
Exemplo: ler de uma fonte de streaming
Para fazer a leitura de uma fonte de streaming, como, por exemplo, o Carregador Automático ou um conjunto de dados interno, defina uma tabela de STREAMING
:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)
Para obter mais informações sobre dados de streaming, confira Transformar dados com o Delta Live Tables.
Controlar como as tabelas são materializadas
As tabelas também oferecem controle adicional da materialização:
- Especifique como as tabelas são particionadas usando
PARTITIONED BY
. Você pode usar o particionamento para acelerar as consultas. - Você pode definir as propriedades da tabela usando
TBLPROPERTIES
. Confira Propriedades da tabela do Delta Live Tables. - Defina um local de armazenamento usando a configuração
LOCATION
. Por padrão, os dados da tabela são armazenados no local de armazenamento do pipeline, casoLOCATION
não esteja definido. - Você pode usar colunas geradas em sua definição de esquema. Confira Exemplo: especificar um esquema e colunas de partição.
Observação
Para tabelas com menos de 1 TB de tamanho, o Databricks recomenda permitir que o Delta Live Tables controle a organização de dados. A menos que espere que sua tabela cresça além de um terabyte, o Databricks recomenda não especificar colunas de particionamento.
Exemplo: especificar um esquema e colunas de partição
Opcionalmente, você pode especificar um esquema ao definir uma tabela. O exemplo a seguir especifica o esquema da tabela de destino, incluindo o uso de colunas geradas pelo Delta Lake e a definição de colunas de partição para a tabela:
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
Por padrão, Delta Live Tables inferem o esquema da definição table
, se você não especificar um esquema.
Exemplo: definir as restrições de tabela
Observação
O suporte a Tabelas Dinâmicas Delta para restrições de tabela está na Versão Prévia Pública. Para definir as restrições de tabela, seu pipeline deve ser um pipeline habilitado para Catálogo do Unity e configurado para usar o canal preview
.
Ao especificar um esquema, você pode definir as chaves primárias e estrangeiras. As restrições são informativas e não são impostas. Consulte a cláusula CONSTRAINT na referência da linguagem SQL.
O exemplo a seguir define uma tabela com uma restrição de chave primária e estrangeira:
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
Parametrizar valores usados ao declarar tabelas ou exibições com SQL
Use SET
para especificar um valor de configuração em uma consulta que declare uma tabela ou exibição, incluindo configurações do Spark. Qualquer tabela ou exibição que você definir em um notebook depois que a instrução SET
terá acesso ao valor definido. Todas as configurações do Spark especificadas por meio da instrução SET
são usadas ao executar a consulta do Spark para qualquer tabela ou exibição após a instrução SET. Para ler um valor de configuração em uma consulta, use a sintaxe de interpolação de cadeia de caracteres ${}
. O seguinte exemplo define um valor de configuração do Spark chamado startDate
e usa esse valor em uma consulta:
SET startDate='2020-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
Para especificar vários valores de configuração, use uma instrução SET
separada para cada valor.
Exemplo: definir um filtro de linha e uma máscara de coluna
Importante
Os filtros de linha e as máscaras de coluna estão em Visualização Pública.
Para criar uma exibição materializada ou uma tabela de streaming com um filtro de linha e uma máscara de coluna, use a cláusula ROW FILTER e a cláusula MASK. O seguinte exemplo demonstra como definir uma exibição materializada e uma tabela de streaming com um filtro de linha e uma máscara de coluna:
CREATE OR REFRESH STREAMING TABLE customers_silver (
id int COMMENT 'This is the customer ID',
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(LIVE.customers_bronze)
CREATE OR REFRESH MATERIALIZED VIEW sales (
customer_id STRING MASK catalog.schema.customer_id_mask_fn,
customer_name STRING,
number_of_line_items STRING COMMENT 'Number of items in the order',
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM LIVE.sales_bronze
Para obter mais informações sobre filtros de linha e máscaras de coluna, consulte Publicar tabelas com filtros de linha e máscaras de coluna.
Propriedades do SQL
Observação
Para usar a cláusula CLUSTER BY
ao habilitar o clustering líquido, o pipeline deve ser configurado para usar o canal de preview.
CREATE TABLE ou VIEW |
---|
TEMPORARY Criar uma tabela, mas não publicar os metadados da tabela. A cláusula TEMPORARY instrui o Delta Live Tables a criar uma tabela que esteja disponível para o pipeline, mas que não deva ser acessada fora do pipeline. Para reduzir o tempo de processamento, uma tabela temporária permanece por toda a duração do pipeline que a cria e não apenas para uma única atualização. |
STREAMING Cria uma tabela que lê um conjunto de dados de entrada como fluxo. O conjunto de dados de entrada precisa ser uma fonte de fluxo de dados como, por exemplo, um Carregador Automático ou uma tabela de STREAMING . |
CLUSTER BY Habilite o clustering líquido na tabela e defina as colunas a serem usadas como chaves de clustering. Confira Usar clustering líquido para tabelas Delta. |
PARTITIONED BY Uma lista opcional de uma ou mais colunas a serem usadas para particionar a tabela. |
LOCATION Um local de armazenamento opcional para os dados da tabela. Se ele não estiver definido, o sistema usará como padrão o local de armazenamento do pipeline. |
COMMENT Uma descrição opcional para a tabela. |
column_constraint Uma chave primária informativa opcional ou restrição da chave estrangeira na coluna. |
MASK clause (Versão prévia pública)Adiciona uma função de máscara de coluna para anonimizar dados confidenciais. Consultas futuras para essa coluna retornam o resultado da função avaliada em vez do valor original da coluna. Isso é útil para o controle de acesso refinado, pois a função pode verificar a identidade do usuário e as associações de grupo para decidir se deseja rasurar o valor. Confira Cláusula de máscara de coluna. |
table_constraint Uma chave primária informativa opcional ou restrição da chave estrangeira na tabela. |
TBLPROPERTIES Uma lista opcional de propriedades da tabela para a tabela. |
WITH ROW FILTER clause (Versão prévia pública)Adiciona uma função de filtro de linha à tabela. Consultas futuras para essa tabela recebem um subconjunto das linhas para as quais a função é avaliada como TRUE. Isso pode ser útil para fins de controle de acesso refinado, em que a função pode inspecionar a identidade e/ou as associações de grupo do usuário que a invocou para decidir se deseja filtrar algumas linhas. Confira Cláusula ROW FILTER. |
select_statement Uma consulta das Tabelas Dinâmicas Delta que define o conjunto de dados para a tabela. |
Cláusula CONSTRAINT |
---|
EXPECT expectation_name Define a restrição expectation_name de qualidade de dados. Se a restrição ON VIOLATION não estiver definida, adicione linhas que violam a restrição ao conjunto de dados de destino. |
ON VIOLATION Ação opcional a ser executada para as linhas com falha: - FAIL UPDATE : interrompe imediatamente a execução de pipeline.- DROP ROW : remove o registro e continua o processamento. |
Captura de dados de alterações com SQL no Delta Live Tables
Use a instrução APPLY CHANGES INTO
para usar a funcionalidade CDC do Delta Live Tables, conforme descrito no seguinte:
CREATE OR REFRESH STREAMING TABLE table_name;
APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Você define as restrições de qualidade dos dados para um APPLY CHANGES
de destino usando a mesma cláusula CONSTRAINT
que as consultas não APPLY CHANGES
. Confira Gerenciar a qualidade dos dados com o Delta Live Tables.
Observação
O comportamento padrão para eventos INSERT
eUPDATE
é upsert eventos de CDC da origem: atualizar quaisquer linhas na tabela de destino que correspondam às chaves especificadas ou inserir uma nova linha quando um registro correspondente não existir na tabela de destino. A manipulação de eventos DELETE
pode ser especificada com a condição APPLY AS DELETE WHEN
.
Importante
Você deve declarar uma tabela de streaming de destino para aplicar alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela APPLY CHANGES
de destino, você também deve incluir as colunas __START_AT
e __END_AT
com o mesmo tipo de dados que o campo sequence_by
.
Confira a API APPLY CHANGES: Simplifique a captura de dados de alteração nas tabelas Delta Live.
Cláusulas |
---|
KEYS A coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos de CDC se aplicam a registros específicos na tabela de destino. Para definir uma combinação de colunas, use uma lista de colunas separadas por vírgulas. A cláusula é necessária. |
IGNORE NULL UPDATES Permitir a ingestão de atualizações contendo um subconjunto das colunas de destino. Quando um evento CDC corresponde a uma linha existente e IGNORAR ATUALIZAÇÕES NULAS é especificado, as colunas com um null irão reter seus valores existentes no destino. Isso também se aplica a colunas aninhadas com um valor de null .Esta cláusula é opcional. O padrão é substituir as colunas existentes por valores null . |
APPLY AS DELETE WHEN Especifica quando um evento CDC deve ser tratado como um DELETE em vez de um Upsert. Para lidar com dados fora de ordem, a linha excluída é temporariamente retida como uma marca para exclusão na tabela Delta subjacente e uma exibição é criada no metastore que filtra essas marcas de exclusão. O intervalo de retenção pode ser configurado com opipelines.cdc.tombstoneGCThresholdInSeconds propriedade de tabela.Esta cláusula é opcional. |
APPLY AS TRUNCATE WHEN Especifica quando um evento CDC deve ser tratado como uma tabela TRUNCATE completa. Como essa cláusula dispara um truncado completo da tabela de destino, ela deve ser usada apenas para casos de uso específicos que exijam essa funcionalidade.A cláusula APPLY AS TRUNCATE WHEN tem suporte apenas para SCD tipo 1. O SCD do tipo 2 não oferece suporte a operações de truncamento.Esta cláusula é opcional. |
SEQUENCE BY O nome da coluna que especifica a ordem lógica dos eventos de CDC nos dados de origem. As tabelas Delta ao vivo usam esse sequenciamento para lidar com eventos de alteração que chegam fora de ordem. A coluna especificada deve ser um tipo de dados classificável. A cláusula é necessária. |
COLUMNS Especifica um subconjunto de colunas a ser incluído na tabela de destino. Você pode: - Especificar a lista completa de colunas a serem incluídas: COLUMNS (userId, name, city) .- Especificar uma lista de colunas a serem excluídas: COLUMNS * EXCEPT (operation, sequenceNum) Esta cláusula é opcional. O padrão é incluir todas as colunas na tabela de destino quando a cláusula COLUMNS não for especificada. |
STORED AS Se é necessário armazenar registros como SCD tipo 1 ou SCD tipo 2. Esta cláusula é opcional. O padrão é SCD tipo 1. |
TRACK HISTORY ON Especifica um subconjunto de colunas de saída para gerar registros de histórico sempre que houver alguma alteração nessas colunas especificadas. Você pode: - Especificar a lista completa de colunas a serem rastreadas: COLUMNS (userId, name, city) .- Especificar uma lista de colunas a serem excluídas do acompanhamento: COLUMNS * EXCEPT (operation, sequenceNum) Esta cláusula é opcional. O padrão é acompanhar o histórico de todas as colunas de saída quando houver alterações, o que é equivalente a TRACK HISTORY ON * . |