データ ワークフローの要件としてプライベート パッケージをインストールする

Note

データ ワークフローには Apache Airflow が搭載されています。
Apache Airflow は、複雑なデータ ワークフローをプログラムで作成、スケジュール、監視するために使用されるオープンソース プラットフォームです。 これにより、オペレーターと呼ばれる一連のタスクを定義でき、有向非循環グラフ (DAG) と組み合わせてデータ パイプラインを表すことができます。

Python パッケージは、関連する Python モジュールを単一のディレクトリ階層に整理する方法です。 パッケージは通常、init.py という特殊なファイルを含むディレクトリとして表されます。 パッケージ ディレクトリ内には、関数、クラス、変数を定義した複数の Python モジュール ファイル (.py ファイル) を含めることができます。 データ ワークフローのコンテキストでは、カスタム コードを追加するパッケージを作成できます。

このガイドでは、Python パッケージのバイナリ配布形式として機能する .whl (Wheel) ファイルをデータ ワークフローにインストールする手順について説明します。

説明のために、dag ファイル内のモジュールとしてインポートできる Python パッケージとして単純なカスタム演算子を作成します。

Apache Airflow Dag を使用してカスタム演算子を開発し、テストする

  1. ファイル sample_operator.py を作成し、プライベート パッケージに変換します。 ガイド:「Python でのパッケージの作成」を参照してください

    from airflow.models.baseoperator import BaseOperator
    
    
    class SampleOperator(BaseOperator):
        def __init__(self, name: str, **kwargs) -> None:
            super().__init__(**kwargs)
            self.name = name
    
        def execute(self, context):
            message = f"Hello {self.name}"
            return message
    
    
  2. 手順 1 で定義された演算子をテストするApache Airflow DAG ファイル sample_dag.py を作成します。

    from datetime import datetime
    from airflow import DAG
    
    from airflow_operator.sample_operator import SampleOperator
    
    
    with DAG(
    "test-custom-package",
    tags=["example"]
    description="A simple tutorial DAG",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    ) as dag:
        task = SampleOperator(task_id="sample-task", name="foo_bar")
    
        task
    
  3. Dags フォルダーとプライベート パッケージ ファイル内に sample_dag.py を含む GitHub リポジトリを作成します。 一般的なファイル形式には、zip.whltar.gz があります。 適切な "Dags" フォルダーまたは "Plugins" フォルダー内にファイルを配置します。 Git リポジトリをデータ ワークフローと同期するか、構成済みのリポジトリ (Install-Private-Package)[https://github.com/ambika-garg/Install-Private-Package-Fabric] を使用できます

パッケージを要件として追加する

Airflow requirements でパッケージを要件として追加します。 /opt/airflow/git/<repoName>.git/<pathToPrivatePackage> という形式を使用します

たとえば、プライベート パッケージが GitHub リポジトリの /dags/test/private.whl にある場合は、Airflow 環境に要件 /opt/airflow/git/<repoName>.git/dags/test/private.whl を追加します。

要件として追加されたプライベート パッケージを示すスクリーンショット。

クイック スタート: データ ワークフローを作成する