ワークフロー オーケストレーション マネージャーを使用して既存のパイプラインを実行する

適用対象: Azure Data Factory Azure Synapse Analytics

ヒント

企業向けのオールインワン分析ソリューション、Microsoft Fabric の Data Factory をお試しください。 Microsoft Fabric は、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新しい試用版を開始する方法について説明します。

Note

ワークフロー オーケストレーション マネージャーは Apache Airflow を利用しています。

Note

Azure Data Factory 用のワークフロー オーケストレーション マネージャーは、オープンソースの Apache Airflow アプリケーションに依存しています。 Airflow のドキュメントとその他のチュートリアルについては、Apache Airflow のドキュメントまたはコミュニティのページを参照してください。

Data Factory パイプラインでは、スケーラブルで信頼性の高いデータ統合とデータ フローを提供する 100 以上のデータ ソース コネクタが提供されます。 Apache Airflow DAG から既存のデータ ファクトリ パイプラインを実行するシナリオがあります。 このチュートリアルでは、その方法のみを示します。

前提条件

  • Azure サブスクリプション。 Azure サブスクリプションをお持ちでない場合は、開始する前に無料の Azure アカウントを作成してください。
  • Azure ストレージ アカウント。 ストレージ アカウントがない場合の作成手順については、Azure のストレージ アカウントの作成に関するページを参照してください。 ストレージ アカウントで、選択したネットワーク からのアクセスのみが許可されていることを確認します。
  • Azure Data Factory パイプライン。 チュートリアルのいずれかに従い、まだデータ ファクトリ パイプラインがない場合は新しいものを作成するか、「最初のデータ ファクトリ パイプラインの開始と試用」で 1 つの選択で作成できます。
  • サービス プリンシパルを設定する。 ワークフロー オーケストレーション マネージャー環境とパイプラインが同じデータ ファクトリに存在する場合でも、新しいサービス プリンシパルを作成するか、既存のものを使用してパイプラインを実行するためのアクセス許可を付与する必要があります (例: 既存のパイプラインが存在するデータ ファクトリの共同作成者ロール)。 サービス プリンシパルのクライアント ID とクライアント シークレット (API キー) を取得する必要があります。

手順

  1. 以下の内容を含む新しい Python ファイル adf.py を作成します。

    from datetime import datetime, timedelta
    
    from airflow.models import DAG, BaseOperator
    
    try:
        from airflow.operators.empty import EmptyOperator
    except ModuleNotFoundError:
        from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
    from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
    from airflow.utils.edgemodifier import Label
    
    with DAG(
        dag_id="example_adf_run_pipeline",
        start_date=datetime(2022, 5, 14),
        schedule_interval="@daily",
        catchup=False,
        default_args={
            "retries": 1,
            "retry_delay": timedelta(minutes=3),
            "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI
            "factory_name": "<FactoryName>",  # This can also be specified in the ADF connection.
            "resource_group_name": "<ResourceGroupName>",  # This can also be specified in the ADF connection.
        },
        default_view="graph",
    ) as dag:
        begin = EmptyOperator(task_id="begin")
        end = EmptyOperator(task_id="end")
    
        # [START howto_operator_adf_run_pipeline]
        run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline1",
            pipeline_name="<PipelineName>",
            parameters={"myParam": "value"},
        )
        # [END howto_operator_adf_run_pipeline]
    
        # [START howto_operator_adf_run_pipeline_async]
        run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline2",
            pipeline_name="<PipelineName>",
            wait_for_termination=False,
        )
    
        pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
            task_id="pipeline_run_sensor",
            run_id=run_pipeline2.output["run_id"],
        )
        # [END howto_operator_adf_run_pipeline_async]
    
        begin >> Label("No async wait") >> run_pipeline1
        begin >> Label("Do async wait with sensor") >> run_pipeline2
        [run_pipeline1, pipeline_run_sensor] >> end
    
        # Task dependency created via `XComArgs`:
        #   run_pipeline2 >> pipeline_run_sensor
    

    ワークフロー オーケストレーション マネージャー UI を使用して接続を作成する必要があります ([管理者] -> [接続] -> [+] -> [接続の種類] として [Azure Data Factory] を選択し、次に client_idclient_secrettenant_idsubscription_idresource_group_namedata_factory_namepipeline_name を入力する)。

  2. adf.py ファイルを DAGS というフォルダー内の BLOB ストレージにアップロードします。

  3. DAGS フォルダーをワークフロー オーケストレーション マネージャー環境にインポートします。 ない場合は、新しいものを作成します

    [エアフロー] セクションが選択されているデータ ファクトリ管理タブを示すスクリーンショット。