DFS (script de fluxo de dados)
APLICA-SE A: Azure Data Factory Azure Synapse Analytics
Dica
Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange desde movimentação de dados até ciência de dados, análise em tempo real, business intelligence e relatórios. Saiba como iniciar uma avaliação gratuita!
Os fluxos de dados estão disponíveis nos pipelines do Azure Data Factory e do Azure Synapse. Este artigo se aplica ao fluxo de dados de mapeamento. Se você for iniciante nas transformações, veja o artigo introdutório Transformar dados usando um fluxo de dados de mapeamento.
O DFS (script de fluxo de dados) consiste nos metadados subjacentes, semelhante a uma linguagem de codificação, que são usados para executar as transformações incluídas em um fluxo de dados de mapeamento. Cada transformação é representada por uma série de propriedades que fornecem as informações necessárias para executar o trabalho corretamente. O script é visível e editável do ADF clicando no botão "script" na faixa de opções superior da interface do usuário do navegador.
Por exemplo, allowSchemaDrift: true,
em uma transformação de origem instruirá o serviço a incluir todas as colunas do conjunto de dados de origem no fluxo de dados, mesmo se elas não estiverem incluídas na projeção do esquema.
Casos de uso
O DFS é produzido automaticamente pela interface do usuário. Você pode clicar no botão Script para ver e personalizar o script. Você também pode gerar scripts fora da interface do usuário do ADF e passá-los para o cmdlet do PowerShell. Ao depurar fluxos de dados complexos, você pode achar mais fácil verificar o code-behind do script em vez de verificar o exame do grafo da interface do usuário de seus fluxos.
Estes são alguns casos de uso de exemplo:
- Produção programática de muitos fluxos de dados que são bastante semelhantes, ou seja, fluxos de dados de "carimbo".
- Expressões complexas que são difíceis de gerenciar na interface do usuário ou que resultam em problemas de validação.
- Depuração e melhor compreensão de vários erros retornados durante a execução.
Quando você cria um script de fluxo de dados para usar com o PowerShell ou uma API, você precisa recolher o texto formatado em uma linha. Você pode manter guias e novas linhas como caracteres de escape. Mas o texto precisa ser formatado para se ajustar dentro de uma propriedade JSON. Há um botão na interface do usuário do editor de script, na parte inferior, que formatará o script como uma única linha para você.
Como adicionar transformações
A adição de transformações requer três etapas básicas: adicionar os dados de transformação de núcleo, redirecionar o fluxo de entrada e rotear novamente o fluxo de saída. Isso pode ser visto mais facilmente em um exemplo. Digamos que vamos começar com uma fonte simples para coletar o fluxo de dados como o seguinte:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
source1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Se decidirmos adicionar uma transformação Derive, primeiro precisaremos criar o texto da transformação principal, que tem uma expressão simples para adicionar uma nova coluna em maiúsculas chamada upperCaseTitle
:
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
Em seguida, pegamos o DFS existente e adicionamos a transformação:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
E agora redirecionamos o fluxo de entrada identificando a transformação após a qual queremos que a nova transformação venha (neste caso, source1
) e copiando o nome do fluxo para a nova transformação:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Por fim, identificamos a transformação que desejamos que venha após essa nova transformação e substituímos o fluxo de entrada dela (neste caso, sink1
) pelo nome do fluxo de saída da nossa nova transformação:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Conceitos básicos do DFS
O DFS é composto por uma série de transformações conectadas, incluindo fontes, coletores e vários outros que podem adicionar novas colunas, filtrar dados, unir dados e muito mais. Normalmente, o script começará com uma ou mais fontes seguidas por muitas transformações e que terminam com um ou mais coletores.
Todas as fontes têm a mesma construção básica:
source(
source properties
) ~> source_name
Por exemplo, uma fonte simples com três colunas (movieId, title, genres) seria:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
Todas as transformações diferentes de fontes têm a mesma construção básica:
name_of_incoming_stream transformation_type(
properties
) ~> new_stream_name
Por exemplo, uma transformação Derive simples que usa uma coluna (title) e a substitui por uma versão em maiúsculas seria a seguinte:
source1 derive(
title = upper(title)
) ~> derive1
Um coletor sem esquema seria semelhante ao seguinte:
derive1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Snippets de script
Os snippets de script são código compartilhável do Script de Fluxo de Dados que você pode usar para fazer o compartilhamento entre fluxos de dados. Este vídeo abaixo explica como usar snippets de script e utilizar o Script de Fluxo de Dados para copiar e colar partes do script por trás de seus grafos de fluxo de dados:
Estatísticas de resumo agregadas
Adicione uma transformação Aggregate ao seu fluxo de dados chamada "SummaryStats" e cole esse código abaixo para a função de agregação em seu script, substituindo o SummaryStats existente. Isso fornecerá um padrão genérico para estatísticas de resumo do perfil de dados.
aggregate(each(match(true()), $$+'_NotNull' = countIf(!isNull($$)), $$ + '_Null' = countIf(isNull($$))),
each(match(type=='double'||type=='integer'||type=='short'||type=='decimal'), $$+'_stddev' = round(stddev($$),2), $$ + '_min' = min ($$), $$ + '_max' = max($$), $$ + '_average' = round(avg($$),2), $$ + '_variance' = round(variance($$),2)),
each(match(type=='string'), $$+'_maxLength' = max(length($$)))) ~> SummaryStats
Você também pode usar o exemplo abaixo para contar o número de linhas únicas e o número de linhas diferentes em seus dados. O exemplo a seguir pode ser colado em um fluxo de dados com transformação Aggregate chamada ValueDistAgg. Este exemplo usa uma coluna chamada "title". Substitua "title" pela coluna de cadeia de caracteres em seus dados que você deseja usar para obter contagens de valor.
aggregate(groupBy(title),
countunique = count()) ~> ValueDistAgg
ValueDistAgg aggregate(numofunique = countIf(countunique==1),
numofdistinct = countDistinct(title)) ~> UniqDist
Incluir todas as colunas em uma agregação
Esse é um padrão de agregação genérico que demonstra como manter as colunas restantes em seus metadados de saída ao criar agregações. Nesse caso, usamos a função first()
para escolher o primeiro valor em cada coluna cujo nome não seja "filme". Para usar isso, crie uma transformação Aggregate chamada DistinctRows e cole-a em seu script sobre o script de agregação DistinctRows existente.
aggregate(groupBy(movie),
each(match(name!='movie'), $$ = first($$))) ~> DistinctRows
Criar impressão digital de hash de linha
Use esse código em seu script de fluxo de dados para criar uma coluna derivada chamada DWhash
que produz um hash sha1
de três colunas.
derive(DWhash = sha1(Name,ProductNumber,Color)) ~> DWHash
Você também pode usar o script abaixo para gerar um hash de linha que usa todas as colunas presentes em seu fluxo, sem a necessidade de nomear cada uma delas:
derive(DWhash = sha1(columns())) ~> DWHash
Equivalente string_agg
Esse código funcionará como a função string_agg()
T-SQL e agregará valores de cadeia de caracteres em uma matriz. Em seguida, você pode converter essa matriz em uma cadeia de caracteres para usá-la com destinos SQL.
source1 aggregate(groupBy(year),
string_agg = collect(title)) ~> Aggregate1
Aggregate1 derive(string_agg = toString(string_agg)) ~> StringAgg
Número de contagem de atualizações, upserts, inserções, exclusões
Quando você usa uma transformação Alter Row, talvez seja interessante contar o número de atualizações, upserts, inserções e exclusões que resultam de suas políticas de Alter Row. Adicione uma transformação Aggregate após a alteração de linha e cole esse Script de Fluxo de Dados na definição de agregação para essas contagens.
aggregate(updates = countIf(isUpdate(), 1),
inserts = countIf(isInsert(), 1),
upserts = countIf(isUpsert(), 1),
deletes = countIf(isDelete(),1)) ~> RowCount
Linha distinta usando todas as colunas
Esse snippet adicionará uma nova transformação Aggregate ao fluxo de dados que usará todas as colunas de entrada, gerará um hash a ser usado para o agrupamento eliminar duplicatas e fornecerá a primeira ocorrência de cada duplicata como uma saída. Você não precisa nomear explicitamente as colunas; elas serão geradas automaticamente com base no fluxo de dados de entrada.
aggregate(groupBy(mycols = sha2(256,columns())),
each(match(true()), $$ = first($$))) ~> DistinctRows
Verificar se há NULLs em todas as colunas
Este é um snippet que você pode colar em seu fluxo de dados para verificar genericamente todas as suas colunas em busca de valores NULL. Essa técnica aproveita o descompasso de esquema para examinar todas as colunas em todas as linhas e usa uma Divisão Condicional para separar as linhas com NULLs das linhas sem NULLs.
split(contains(array(toString(columns())),isNull(#item)),
disjoint: false) ~> LookForNULLs@(hasNULLs, noNULLs)
Descompasso de esquema AutoMap com uma seleção
Quando for necessário carregar um esquema de banco de dados existente de um conjunto de colunas de entrada desconhecido ou dinâmico, você precisará mapear as colunas do lado direito na transformação Sink. Isso só será necessário quando você estiver carregando uma tabela existente. Adicione este snippet antes do Sink para criar um Select que mapeia automaticamente suas colunas. Deixe o mapeamento do Sink mapear automaticamente.
select(mapColumn(
each(match(true()))
),
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> automap
Manter tipos de dados de coluna
Adicione esse script dentro de uma definição de coluna derivada para armazenar os nomes de coluna e os tipos de dados do fluxo de dados em um repositório persistente usando um coletor.
derive(each(match(type=='string'), $$ = 'string'),
each(match(type=='integer'), $$ = 'integer'),
each(match(type=='short'), $$ = 'short'),
each(match(type=='complex'), $$ = 'complex'),
each(match(type=='array'), $$ = 'array'),
each(match(type=='float'), $$ = 'float'),
each(match(type=='date'), $$ = 'date'),
each(match(type=='timestamp'), $$ = 'timestamp'),
each(match(type=='boolean'), $$ = 'boolean'),
each(match(type=='long'), $$ = 'long'),
each(match(type=='double'), $$ = 'double')) ~> DerivedColumn1
Preencher
Veja como implementar o problema comum de "preenchimento" com conjuntos de dados para substituir valores NULL por valores não NULL na sequência. Observe que essa operação pode ter implicações de desempenho negativas, pois você precisa criar uma janela sintética em todo o conjunto de dados com um valor de categoria fictício. Além disso, você precisa fazer a classificação por um valor para criar a sequência de dados apropriada para localizar o valor não NULL anterior. O snippet de código abaixo cria a categoria sintética como fictícia e faz a classificação por uma chave alternativa. Você pode remover a chave alternativa e usar a própria chave de classificação específica de dados. Este snippet de código pressupõe que você já tenha adicionado uma transformação Source chamada source1
source1 derive(dummy = 1) ~> DerivedColumn
DerivedColumn keyGenerate(output(sk as long),
startAt: 1L) ~> SurrogateKey
SurrogateKey window(over(dummy),
asc(sk, true),
Rating2 = coalesce(Rating, last(Rating, true()))) ~> Window1
Média móvel
A média móvel pode ser implementada com muita facilidade em fluxos de dados usando uma transformação do Windows. O exemplo abaixo cria uma média móvel de 15 dias de preços de ações para a Microsoft.
window(over(stocksymbol),
asc(Date, true),
startRowOffset: -7L,
endRowOffset: 7L,
FifteenDayMovingAvg = round(avg(Close),2)) ~> Window1
Contagem distinta de todos os valores da coluna
Você pode usar esse script para identificar as colunas de chave e exibir a cardinalidade de todas as colunas em seu fluxo com um único snippet de script. Adicione esse script como uma transformação de agregação ao fluxo de dados e ele fornecerá automaticamente contagens distintas de todas as colunas.
aggregate(each(match(true()), $$ = countDistinct($$))) ~> KeyPattern
Comparar valores das linhas anteriores ou próximas linhas
Este snippet de exemplo demonstra como a transformação da Janela pode ser usada para comparar valores de coluna no contexto da linha atual com valores de coluna de linhas que estão antes e depois da linha atual. Neste exemplo, uma Coluna Derivada é usada para gerar um valor fictício para habilitar uma partição de janela em todo o conjunto de dados. Uma transformação de Chave Alternativa é usada para atribuir um valor de chave exclusivo para cada linha. Ao aplicar esse padrão às transformações de dados, é possível remover a chave alternativa a fim de realizar a ordenação de acordo com uma coluna e remover a coluna derivada em caso de colunas a serem usadas como base para particionar os dados.
source1 keyGenerate(output(sk as long),
startAt: 1L) ~> SurrogateKey1
SurrogateKey1 derive(dummy = 1) ~> DerivedColumn1
DerivedColumn1 window(over(dummy),
asc(sk, true),
prevAndCurr = lag(title,1)+'-'+last(title),
nextAndCurr = lead(title,1)+'-'+last(title)) ~> leadAndLag
Quantas colunas dá em meus dados?
size(array(columns()))
Conteúdo relacionado
Explore os fluxos de dados começando com o artigo de visão geral sobre fluxos de dados