データ ワークフローの要件としてプライベート パッケージをインストールする
Note
データ ワークフローには Apache Airflow が搭載されています。 Apache Airflow は、複雑なデータ ワークフローをプログラムで作成、スケジュール、監視するために使用されるオープンソース プラットフォームです。 これにより、オペレーターと呼ばれる一連のタスクを定義でき、有向非循環グラフ (DAG) と組み合わせてデータ パイプラインを表すことができます。
Python パッケージは、関連する Python モジュールを単一のディレクトリ階層に整理する方法です。 パッケージは通常、init.py という特殊なファイルを含むディレクトリとして表されます。 パッケージ ディレクトリ内には、関数、クラス、変数を定義した複数の Python モジュール ファイル (.py ファイル) を含めることができます。 データ ワークフローのコンテキストでは、カスタム コードを追加するパッケージを作成できます。
このガイドでは、Python パッケージのバイナリ配布形式として機能する .whl
(Wheel) ファイルをデータ ワークフローにインストールする手順について説明します。
説明のために、dag ファイル内のモジュールとしてインポートできる Python パッケージとして単純なカスタム演算子を作成します。
Apache Airflow Dag を使用してカスタム演算子を開発し、テストする
ファイル
sample_operator.py
を作成し、プライベート パッケージに変換します。 ガイド:「Python でのパッケージの作成」を参照してくださいfrom airflow.models.baseoperator import BaseOperator class SampleOperator(BaseOperator): def __init__(self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = f"Hello {self.name}" return message
手順 1 で定義された演算子をテストするApache Airflow DAG ファイル
sample_dag.py
を作成します。from datetime import datetime from airflow import DAG from airflow_operator.sample_operator import SampleOperator with DAG( "test-custom-package", tags=["example"] description="A simple tutorial DAG", schedule_interval=None, start_date=datetime(2021, 1, 1), ) as dag: task = SampleOperator(task_id="sample-task", name="foo_bar") task
Dags
フォルダーとプライベート パッケージ ファイル内にsample_dag.py
を含む GitHub リポジトリを作成します。 一般的なファイル形式には、zip
、.whl
、tar.gz
があります。 適切な "Dags" フォルダーまたは "Plugins" フォルダー内にファイルを配置します。 Git リポジトリをデータ ワークフローと同期するか、構成済みのリポジトリ (Install-Private-Package)[https://github.com/ambika-garg/Install-Private-Package-Fabric] を使用できます
パッケージを要件として追加する
Airflow requirements
でパッケージを要件として追加します。 /opt/airflow/git/<repoName>.git/<pathToPrivatePackage>
という形式を使用します
たとえば、プライベート パッケージが GitHub リポジトリの /dags/test/private.whl
にある場合は、Airflow 環境に要件 /opt/airflow/git/<repoName>.git/dags/test/private.whl
を追加します。