Apache Airflow işinde gereksinim olarak Özel Paket yükleme

Not

Apache Airflow işi Apache Airflow tarafından desteklenir.
Apache Airflow, karmaşık veri iş akışlarını program aracılığıyla oluşturmak, zamanlamak ve izlemek için kullanılan açık kaynak bir platformdur. Veri işlem hatlarını temsil etmek için yönlendirilmiş döngüsel grafikler (DAG)'ler halinde birleştirilebilen işleçler olarak adlandırılan bir görev kümesi tanımlamanıza olanak tanır.

Python paketi, ilgili Python modüllerini tek bir dizin hiyerarşisinde düzenlemenin bir yoludur. Paket genellikle init.py adlı özel bir dosya içeren bir dizin olarak temsil edilir. Paket dizininde işlevleri, sınıfları ve değişkenleri tanımlayan birden çok Python modülü dosyanız (.py dosyaları) olabilir. Apache Airflow İşi bağlamında özel Apache Airflow operatörleri, kancalar, algılayıcılar, eklentiler vb. eklemek için özel paketler geliştirebilirsiniz.

Bu öğreticide Python paketi olarak basit bir özel işleç oluşturacak, Apache Airflow iş ortamına bir gereksinim olarak ekleyecek ve özel paketi DAG dosyasında modül olarak içeri aktaracaksınız.

Apache Airflow Dag ile özel operatör geliştirme ve test

  1. Bir dosya sample_operator.py oluşturun ve özel pakete dönüştürün. Kılavuza bakın: Python'da paket oluşturma

    from airflow.models.baseoperator import BaseOperator
    
    
    class SampleOperator(BaseOperator):
        def __init__(self, name: str, **kwargs) -> None:
            super().__init__(**kwargs)
            self.name = name
    
        def execute(self, context):
            message = f"Hello {self.name}"
            return message
    
    
  2. 1. Adımda tanımlanan işleci test etmek için Apache Airflow DAG dosyasını sample_dag.py oluşturun.

    from datetime import datetime
    from airflow import DAG
    
     # Import from private package
    from airflow_operator.sample_operator import SampleOperator
    
    
    with DAG(
    "test-custom-package",
    tags=["example"]
    description="A simple tutorial DAG",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    ) as dag:
        task = SampleOperator(task_id="sample-task", name="foo_bar")
    
        task
    
  3. in Dags klasörünü ve özel paket dosyanızı içeren sample_dag.py bir GitHub Deposu oluşturun. Yaygın dosya biçimleri , .whl, veya tar.gz'dırzip. Dosyayı uygun şekilde 'Dags' veya 'Plugins' klasörüne yerleştirin. Git Deponuzu Apache Airflow İşi ile eşitleyin veya önceden yapılandırılmış install-private-package deposunukullanabilirsiniz

Paketinizi gereksinim olarak ekleme

paketini altına Airflow requirementsgereksinim olarak ekleyin. Biçimi kullanma /opt/airflow/git/<repoName>.git/<pathToPrivatePackage>

Örneğin, özel paketiniz bir GitHub deposunda bulunuyorsa /dags/test/private.whl , gereksinimi /opt/airflow/git/<repoName>.git/dags/test/private.whl Airflow ortamına ekleyin.

Gereksinim olarak eklenen özel paketi gösteren ekran görüntüsü.

Hızlı Başlangıç: Apache Airflow İşi Oluşturma