Orquestração de trabalho do Apache Flink® usando o Gerenciador de Orquestração de Fluxo de Trabalho do Azure Data Factory (alimentado pelo Apache Airflow)

Observação

Desativaremos o Microsoft Azure HDInsight no AKS em 31 de janeiro de 2025. Para evitar o encerramento abrupto das suas cargas de trabalho, você precisará migrá-las para o Microsoft Fabric ou para um produto equivalente do Azure antes de 31 de janeiro de 2025. Os clusters restantes em sua assinatura serão interrompidos e removidos do host.

Apenas o suporte básico estará disponível até a data de desativação.

Importante

Esse recurso está atualmente na visualização. Os Termos de uso complementares para versões prévias do Microsoft Azure incluem mais termos legais que se aplicam aos recursos do Azure que estão em versão beta, em versão prévia ou ainda não lançados em disponibilidade geral. Para obter informações sobre essa versão prévia específica, confira Informações sobre a versão prévia do Azure HDInsight no AKS. No caso de perguntas ou sugestões de recursos, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para ver mais atualizações sobre a Comunidade do Azure HDInsight.

Este artigo aborda o gerenciamento de um trabalho Flink usando a API REST do Azure e o pipeline de dados de orquestração com o Gerenciador de Orquestração de Fluxo de Trabalho do Azure Data Factory. O serviço Azure Data Factory gerenciador de orquestração de fluxo de trabalho é uma maneira simples e eficiente de criar e gerenciar ambientes Apache Airflow, permitindo que você execute pipelines de dados em escala facilmente.

O Apache Airflow é uma plataforma de software livre usada para criar, agendar e monitorar fluxos de trabalho de dados complexos de maneira programática. Ele permite que você defina um conjunto de tarefas, chamadas de operadores, que podem ser combinadas em grafos acíclicos direcionados (DAGs) para representar pipelines de dados.

O diagrama a seguir mostra o posicionamento do Airflow, Key Vault e HDInsight no AKS no Azure.

A captura de tela mostra o posicionamento do fluxo de ar, do key vault e do HDInsight no AKS no Azure.

Várias Entidades de Serviço do Azure são criadas com base no escopo para limitar o acesso necessário e gerenciar o ciclo de vida da credencial do cliente de maneira independente.

É recomendável girar chaves de acesso ou segredos periodicamente.

Etapas de configuração

  1. Configurar um cluster do Flink

  2. Carregue o jar do seu Flink Job na conta de armazenamento. Ela pode ser a conta de armazenamento primária associada ao cluster do Flink ou qualquer outra conta de armazenamento, na qual você deve atribuir a função "Proprietário de Dados de Blob de Armazenamento" ao MSI atribuído pelo usuário usado para o cluster nessa conta de armazenamento.

  3. Azure Key Vault – você poderá seguir este tutorial para criar um Azure Key Vault se não tiver um.

  4. Crie uma entidade de serviço Microsoft Entra para acessar o Key Vault. Conceda permissão para acessar o Azure Key Vault com a função de "Oficial de Segredos do Key Vault", e faça uma anotação do ‘appId’, ‘senha’ e ‘locatário’ obtidos na resposta. Precisamos usar os mesmos valores para o Airflow usar o armazenamento do Key Vault como back-ends para armazenar informações confidenciais.

    az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID> 
    
  5. Habilite Azure Key Vault para gerenciador de orquestração de fluxo de trabalho para armazenar e gerenciar suas informações confidenciais de maneira segura e centralizada. Ao fazer isso, você pode usar variáveis e conexões e elas serão armazenadas automaticamente no Azure Key Vault. O nome de conexões e variáveis precisa ser prefixado por variables_prefix, definido em AIRFLOW__SECRETS__BACKEND_KWARGS. Por exemplo, se variables_prefix tiver um valor como hdinsight-aks-variables, para uma chave de variável hello, você desejará armazenar sua variável em hdinsight-aks-variable -hello.

    • Adicione as seguintes configurações para as substituições de configuração do Airflow em propriedades de runtime integradas:

      • AIRFLOW__SECRETS__BACKEND: "airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"

      • AIRFLOW__SECRETS__BACKEND_KWARGS:
        "{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”

    • Adicione a seguinte definição para a configuração Variáveis de ambiente nas propriedades de runtime integrado do Airflow:

      • AZURE_CLIENT_ID = <App Id from Create Azure AD Service Principal>

      • AZURE_TENANT_ID = <Tenant from Create Azure AD Service Principal>

      • AZURE_CLIENT_SECRET = <Password from Create Azure AD Service Principal>

      Adicionar requisitos do Airflow – apache-airflow-providers-microsoft-azure

      A captura de tela mostra a configuração do fluxo de ar e as variáveis de ambiente.

  6. Crie uma entidade de serviço do Microsoft Entra para acessar o Azure – Conceda permissão para acessar o cluster do HDInsight no AKS com a função “Colaborador” e anote os valores de ‘appId’, ‘senha’ e ‘locatário’ da resposta.

    az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>

  7. Crie os segredos a seguir no cofre de chaves com o valor da appId, senha e locatário da entidade de serviço anterior do AD, prefixados pela propriedade variables_prefix definida em AIRFLOW__SECRETS__BACKEND_KWARGS. O código DAG pode acessar qualquer uma dessas variáveis sem variables_prefix.

    • hdinsight-aks-variables-api-client-id=<App ID from previous step>

    • hdinsight-aks-variables-api-secret=<Password from previous step>

    • hdinsight-aks-variables-tenant-id=<Tenant from previous step>

    from airflow.models import Variable 
    
    def retrieve_variable_from_akv(): 
    
        variable_value = Variable.get("client-id") 
    
        print(variable_value) 
    

Definição de DAG

Um DAG (grafo direcionado acíclico) é o conceito principal do airflow, coletando tarefas em conjunto, organizadas com dependências e relações para dizer como elas devem ser executadas.

Há três maneiras de declarar uma DAG:

  1. Você pode usar um gerenciador de contexto, que adiciona o DAG a qualquer coisa dentro dele implicitamente

  2. Você pode usar um construtor padrão, passando o DAG para todos os operadores usados

  3. Você pode usar o decorador @dag para transformar uma função em um gerador DAG (do dag de importação airflow.decorators)

OS DAGs não são nada sem tarefas a serem executadas, e elas vêm na forma de Operadores, Sensores ou TaskFlow.

Você pode ler mais detalhes sobre DAGs, Fluxo de Controle, SubDAGs, TaskGroups etc. diretamente do Apache Airflow. 

Execução do DAG

O código de exemplo está disponível no git; baixe o código localmente em seu computador e carregue o wordcount.py em um armazenamento de blobs. Siga as etapas para importar o DAG para o fluxo de trabalho criado durante a configuração.

O wordcount.py é um exemplo de orquestração de um envio de trabalho do Flink usando o Apache Airflow com o HDInsight no AKS. O DAG tem duas tarefas:

  • obter OAuth Token

  • Invocar a API REST do Azure de envio de trabalho do Flink do HDInsight para enviar um novo trabalho

O DAG espera ter configuração para a entidade de serviço, conforme descrito durante o processo de instalação da credencial do cliente OAuth e passar a configuração de entrada a seguir para a execução.

Etapas de execução

  1. Execute o DAG a partir do Airflow UI, você pode abrir a UI do Azure Data Factory gerenciador de orquestração de fluxo de trabalho clicando no ícone Monitor.

    A captura de tela mostra a abertura da interface do usuário do gerenciador de orquestração de fluxo de trabalho do Azure Data Factory clicando no ícone do monitor.

  2. Selecione o DAG “FlinkWordCountExample” na página “DAGs”.

    A captura de tela mostra o exemplo de contagem de palavras Flink.

  3. Clique no ícone “executar” no canto superior direito e selecione “Disparar DAG c/ configuração”.

    A captura de tela mostra o ícone de seleção de execução.

  4. Passar o JSON de configuração necessário

    { 
    
      "jarName":"WordCount.jar", 
    
      "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", 
    
      "subscritpion":"<cluster subscription id>", 
    
      "rg":"<cluster resource group>", 
    
      "poolNm":"<cluster pool name>", 
    
      "clusterNm":"<cluster name>" 
    
    } 
    
  5. Clique no botão “Disparar”, ele inicia a execução do DAG.

  6. Você pode visualizar o status das tarefas de DAG na execução do DAG

    A captura de tela mostra o status da tarefa dag.

  7. Validar a execução do trabalho por meio do portal

    A captura de tela mostra a validação da execução do trabalho.

  8. Validar o trabalho do “Painel do Apache Flink”

    A captura de tela mostra o painel do Apache Flink.

Código de exemplo

Este é um exemplo de orquestração de pipeline de dados usando o Airflow com o HDInsight no AKS.

O DAG espera ter configuração para a entidade de serviço para a credencial do cliente OAuth e passar a configuração de entrada a seguir para a execução:

{
 'jarName':'WordCount.jar',
 'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net', 
 'subscritpion':'<cluster subscription id>',
 'rg':'<cluster resource group>', 
 'poolNm':'<cluster pool name>',
 'clusterNm':'<cluster name>'
 }

Referência