Tutorial: Implementar o padrão de captura do data lake para atualizar uma tabela do Databricks Delta

Este tutorial mostra como manipular eventos em uma conta de armazenamento que tem um namespace hierárquico.

Você criará uma pequena solução que permite que um usuário preencha uma tabela do Databricks Delta carregando um arquivo CSV (de valores separados por vírgula) que descreve uma ordem de venda. Você criará essa solução conectando uma assinatura de Grade de Eventos, uma função do Azure e um trabalho no Azure Databricks.

Neste tutorial, você irá:

  • Criar uma assinatura da Grade de Eventos que chama uma função do Azure.
  • Criar uma função do Azure que recebe uma notificação de um evento e, em seguida, executa o trabalho no Azure Databricks.
  • Criar um trabalho do Databricks que insere uma ordem de cliente em uma tabela do Databricks Delta que está localizada na conta de armazenamento.

Criaremos essa solução em ordem inversa, começando com o workspace do Azure Databricks.

Pré-requisitos

Criar uma ordem de venda

Primeiro, crie um arquivo CSV que descreva uma ordem de venda e, em seguida, carregue esse arquivo na conta de armazenamento. Posteriormente, você usará os dados desse arquivo para preencher a primeira linha em nossa tabela do Databricks Delta.

  1. Navegue até sua nova conta de armazenamento no portal do Azure.

  2. Selecione Navegador de armazenamento –>Contêineres de blob –>Adicionar contêiner e crie um contêiner chamado dados.

    Captura de tela da criação de uma pasta no navegador de armazenamento.

  3. No contêiner dados, crie um diretório chamado entrada.

  4. Em um editor de texto, cole o texto a seguir.

    InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
    536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
    
  5. Salve esse arquivo em seu computador local e dê a ele o nome data.csv.

  6. No navegador de armazenamento, carregue esse arquivo na pasta entrada.

Criar um trabalho no Azure Databricks

Nesta seção, você executará estas tarefas:

  • Criar um workspace do Azure Databricks.
  • Crie um notebook.
  • Criar e popular uma tabela do Databricks Delta.
  • Adicionar código que insere linhas na tabela do Databricks Delta.
  • Criar um trabalho.

Criar um workspace do Azure Databricks

Nesta seção, você deve cria um workspace do Azure Databricks usando o Portal do Azure.

  1. Criar um workspace do Azure Databricks. Nomeie esse espaço de trabalho como contoso-orders. Consulte Criar um workspace do Azure Databricks.

  2. Criar um cluster. Nomeie o cluster customer-order-cluster. Consulte Criar um cluster.

  3. Crie um notebook. Nomeie o notebook configure-customer-table e escolha Python como a linguagem padrão do notebook. Confira Criar um notebook.

Criar e popular uma tabela do Databricks Delta

  1. No notebook criado, copie e cole o bloco de código a seguir na primeira célula, mas não execute esse código ainda.

    Substitua os valores de espaço reservado appId, password e tenant neste código pelos valores coletados ao concluir os pré-requisitos deste tutorial.

    dbutils.widgets.text('source_file', "", "Source File")
    
    spark.conf.set("fs.azure.account.auth.type", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id", "<appId>")
    spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>")
    spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant>/oauth2/token")
    
    adlsPath = 'abfss://data@contosoorders.dfs.core.windows.net/'
    inputPath = adlsPath + dbutils.widgets.get('source_file')
    customerTablePath = adlsPath + 'delta-tables/customers'
    

    Esse código cria um widget chamado source_file. Posteriormente, você criará uma função do Azure que chama esse código e passa um caminho de arquivo para esse widget. Esse código também autentica a entidade de serviço com a conta de armazenamento e cria algumas variáveis que você usará em outras células.

    Observação

    Em uma configuração de produção, considere armazenar sua chave de autenticação no Azure Databricks. Em seguida, adicione uma chave de pesquisa ao bloco de código em vez da chave de autenticação.

    Por exemplo, em vez de usar esta linha de código spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>"):, você usaria a seguinte linha de código: spark.conf.set("fs.azure.account.oauth2.client.secret", dbutils.secrets.get(scope = "<scope-name>", key = "<key-name-for-service-credential>")).

    Depois de concluir este tutorial, confira o artigo Azure Data Lake Storage Gen2 no site Azure Databricks para ver exemplos dessa abordagem.

  2. Pressione as teclas SHIFT+ENTER para executar o código nesse bloco.

  3. Copie e cole o bloco de código a seguir em uma célula diferente e pressione as teclas SHIFT + ENTER para executar o código nesse bloco.

    from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType
    
    inputSchema = StructType([
    StructField("InvoiceNo", IntegerType(), True),
    StructField("StockCode", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("InvoiceDate", StringType(), True),
    StructField("UnitPrice", DoubleType(), True),
    StructField("CustomerID", IntegerType(), True),
    StructField("Country", StringType(), True)
    ])
    
    rawDataDF = (spark.read
     .option("header", "true")
     .schema(inputSchema)
     .csv(adlsPath + 'input')
    )
    
    (rawDataDF.write
      .mode("overwrite")
      .format("delta")
      .saveAsTable("customer_data", path=customerTablePath))
    

    Esse código cria a tabela do Databricks Delta na conta de armazenamento e, em seguida, carrega alguns dados iniciais do arquivo CSV que você carregou anteriormente.

  4. Depois que esse bloco de código for executado com êxito, remova-o do notebook.

Adicionar código que insere linhas na tabela do Databricks Delta

  1. Copie e cole o bloco de código a seguir em uma célula diferente, mas não execute essa célula.

    upsertDataDF = (spark
      .read
      .option("header", "true")
      .csv(inputPath)
    )
    upsertDataDF.createOrReplaceTempView("customer_data_to_upsert")
    

    Esse código insere dados em uma exibição de tabela temporária usando dados de um arquivo CSV. O caminho para esse arquivo CSV é proveniente do widget de entrada que você criou em uma etapa anterior.

  2. Copie e cole o seguinte bloco de código em uma célula diferente. Esse código mescla o conteúdo da exibição de tabela temporária com a tabela do Databricks Delta.

    %sql
    MERGE INTO customer_data cd
    USING customer_data_to_upsert cu
    ON cd.CustomerID = cu.CustomerID
    WHEN MATCHED THEN
      UPDATE SET
        cd.StockCode = cu.StockCode,
        cd.Description = cu.Description,
        cd.InvoiceNo = cu.InvoiceNo,
        cd.Quantity = cu.Quantity,
        cd.InvoiceDate = cu.InvoiceDate,
        cd.UnitPrice = cu.UnitPrice,
        cd.Country = cu.Country
    WHEN NOT MATCHED
      THEN INSERT (InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country)
      VALUES (
        cu.InvoiceNo,
        cu.StockCode,
        cu.Description,
        cu.Quantity,
        cu.InvoiceDate,
        cu.UnitPrice,
        cu.CustomerID,
        cu.Country)
    

Crie um trabalho

Crie um trabalho que execute o notebook que você criou anteriormente. Posteriormente, você criará uma função do Azure que executará esse trabalho quando um evento for gerado.

  1. Selecione Novo->Trabalho.

  2. Dê um nome ao trabalho, escolha o notebook que você criou e o cluster. Em seguida, selecione Criar para criar o trabalho.

Criar uma Função do Azure

Crie uma função do Azure que executa o trabalho.

  1. Em seu espaço de trabalho do Azure Databricks, clique no nome de usuário na barra superior e, em seguida, na lista suspensa selecione Configurações do Usuário.

  2. Na guia Tokens de acessoselecione Gerar novo token.

  3. Copie o token que aparece e, em seguida, clique em Concluído.

  4. No canto superior do workspace do Databricks, escolha o ícone de pessoas e, em seguida, escolha Configurações do usuário.

    Gerenciar conta

  5. Selecione o botão Gerar novo token e escolha o botão Gerar.

    Verifique se você copiou o token para um local seguro. Para que a função do Azure possa executar o trabalho, ela precisa desse token para autenticar com o Databricks.

  6. No menu do portal do Azure ou na Página inicial, selecione Criar um recurso.

  7. Na página Novo, selecione Computação>Aplicativo de Funções.

  8. Na guia Noções básicas da página Criar aplicativo de funções, escolha um grupo de recursos e altere ou verifique as seguintes configurações:

    Configuração Valor
    Nome do aplicativo de funções contosoorder
    Pilha de runtime .NET
    Publicação Código
    Sistema operacional Windows
    Tipo de plano Consumo (sem servidor)
  9. Selecione Examinar + Criar e, em seguida, selecione Criar.

    Quando a implantação for concluída, selecione Ir para o recurso para abrir a página de visão geral do aplicativo de funções.

  10. No grupo Configurações, selecione Configuração.

  11. Na página Configurações do Aplicativo, escolha o botão Nova configuração de aplicativo para adicionar cada configuração.

    Adicionar definição de configuração

    Adicione as seguintes configurações:

    Nome da configuração Valor
    DBX_INSTANCE A região do workspace do Databricks. Por exemplo: westus2.azuredatabricks.net
    DBX_PAT O token de acesso pessoal que você gerou anteriormente.
    DBX_JOB_ID O identificador do trabalho em execução.
  12. Selecione Salvar para confirmar essas configurações.

  13. No grupo Funções, selecione Functions e, em seguida, selecione Criar.

  14. Escolha Gatilho de Grade de Eventos do Azure.

    Instale a extensão Microsoft.Azure.WebJobs.Extensions.EventGrid se for solicitado que você faça isso. Se precisar instalá-la, você precisará escolher o Gatilho de Grade de Eventos do Azure novamente para criar a função.

    O painel Nova Função é exibido.

  15. No painel Nova Função, nomeie a função UpsertOrder e escolha o botão Criar.

  16. Substitua o conteúdo do arquivo de código por este código e selecione o botão Salvar:

      #r "Azure.Messaging.EventGrid"
      #r "System.Memory.Data"
      #r "Newtonsoft.Json"
      #r "System.Text.Json"
      using Azure.Messaging.EventGrid;
      using Azure.Messaging.EventGrid.SystemEvents;
      using Newtonsoft.Json;
      using Newtonsoft.Json.Linq;
    
      private static HttpClient httpClient = new HttpClient();
    
      public static async Task Run(EventGridEvent eventGridEvent, ILogger log)
      {
         log.LogInformation("Event Subject: " + eventGridEvent.Subject);
         log.LogInformation("Event Topic: " + eventGridEvent.Topic);
         log.LogInformation("Event Type: " + eventGridEvent.EventType);
         log.LogInformation(eventGridEvent.Data.ToString());
    
         if (eventGridEvent.EventType == "Microsoft.Storage.BlobCreated" || eventGridEvent.EventType == "Microsoft.Storage.FileRenamed") {
            StorageBlobCreatedEventData fileData = eventGridEvent.Data.ToObjectFromJson<StorageBlobCreatedEventData>();
            if (fileData.Api == "FlushWithClose") {
                  log.LogInformation("Triggering Databricks Job for file: " + fileData.Url);
                  var fileUrl = new Uri(fileData.Url);
                  var httpRequestMessage = new HttpRequestMessage {
                     Method = HttpMethod.Post,
                     RequestUri = new Uri(String.Format("https://{0}/api/2.0/jobs/run-now", System.Environment.GetEnvironmentVariable("DBX_INSTANCE", EnvironmentVariableTarget.Process))),
                     Headers = { 
                        { System.Net.HttpRequestHeader.Authorization.ToString(), "Bearer " + System.Environment.GetEnvironmentVariable("DBX_PAT", EnvironmentVariableTarget.Process)},
                        { System.Net.HttpRequestHeader.ContentType.ToString(), "application/json" }
                     },
                     Content = new StringContent(JsonConvert.SerializeObject(new {
                        job_id = System.Environment.GetEnvironmentVariable("DBX_JOB_ID", EnvironmentVariableTarget.Process),
                        notebook_params = new {
                              source_file = String.Join("", fileUrl.Segments.Skip(2))
                        }
                     }))
                  };
                  var response = await httpClient.SendAsync(httpRequestMessage);
                  response.EnsureSuccessStatusCode();
            }
         }
      }
    

Esse código analisa informações sobre o evento de armazenamento que foi gerado e, em seguida, cria uma mensagem de solicitação com a URL do arquivo que disparou o evento. Como parte da mensagem, a função passa um valor para o widget source_file que você criou anteriormente. O código de função envia a mensagem para o trabalho do Databricks e usa o token que você obteve anteriormente como autenticação.

Criar uma assinatura na Grade de Eventos

Nesta seção, você criará uma assinatura da Grade de Eventos que chama a função do Azure quando os arquivos são carregados na conta de armazenamento.

  1. Selecione Integração e, na página Integração, selecione Gatilho da Grade de Eventos.

  2. No painel Editar gatilho, nomeie o evento eventGridEvent e selecione Criar assinatura de evento.

    Observação

    O nome eventGridEvent corresponde ao parâmetro nomeado que é passado para a função do Azure.

  3. Na guia Noções básicas da página Criar assinatura de evento, altere ou verifique as seguintes configurações:

    Configuração Valor
    Nome contoso-order-event-subscription
    Tipo de tópico Conta de armazenamento
    Recurso de Origem contosoorders
    Nome do tópico do sistema <create any name>
    Filtro para Tipos de Evento Blob criado e Blob excluído
  4. Selecione o botão Criar.

Testar a assinatura de Grade de Eventos

  1. Crie um arquivo chamado customer-order.csv, cole as informações a seguir nesse arquivo e salve-o no computador local.

    InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
    536371,99999,EverGlow Single,228,1/1/2018 9:01,33.85,20993,Sierra Leone
    
  2. Em Gerenciador de Armazenamento, carregue esse arquivo na pasta input da conta de armazenamento.

    O upload de um arquivo gera o evento Microsoft.Storage.BlobCreated. A Grade de Eventos notifica todos os assinantes desse evento. Em nosso caso, a função do Azure é a única assinante. A função do Azure analisa os parâmetros do evento para determinar qual evento ocorreu. Em seguida, ela passa a URL do arquivo para o trabalho do Databricks. O trabalho do databricks lê o arquivo e adiciona uma linha à tabela do Databricks Delta que está localizada na conta de armazenamento.

  3. Para verificar se o trabalho foi bem-sucedido, exiba as execuções do trabalho. Você verá um status de conclusão. Para obter mais informações sobre como exibir execuções de um trabalho, confira Exibir execuções para um trabalho

  4. Em uma nova célula da pasta de trabalho, execute esta consulta em uma célula para ver a tabela delta atualizada.

    %sql select * from customer_data
    

    A tabela retornada mostra o registro mais recente.

    O registro mais recente aparece na tabela

  5. Para atualizar esse registro, crie um arquivo chamado customer-order-update.csv, cole as informações a seguir nesse arquivo e salve-o no computador local.

    InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
    536371,99999,EverGlow Single,22,1/1/2018 9:01,33.85,20993,Sierra Leone
    

    Esse arquivo csv é quase idêntico ao anterior, exceto que a quantidade do pedido é alterada de 228 para 22.

  6. Em Gerenciador de Armazenamento, carregue esse arquivo na pasta input da conta de armazenamento.

  7. Execute a consulta select novamente para ver a tabela delta atualizada.

    %sql select * from customer_data
    

    A tabela retornada mostra o registro atualizado.

    O registro atualizado aparece na tabela

Limpar os recursos

Quando não forem mais necessários, exclua o grupo de recursos e todos os recursos relacionados. Para fazer isso, selecione o grupo de recursos da conta de armazenamento e selecione Excluir.

Próximas etapas