Spuštění existujícího kanálu pomocí Správce orchestrace pracovního postupu

PLATÍ PRO: Azure Data Factory Azure Synapse Analytics

Tip

Vyzkoušejte si službu Data Factory v Microsoft Fabric, řešení pro analýzy typu all-in-one pro podniky. Microsoft Fabric zahrnuje všechno od přesunu dat až po datové vědy, analýzy v reálném čase, business intelligence a vytváření sestav. Přečtěte si, jak začít používat novou zkušební verzi zdarma.

Poznámka:

Správce orchestrace pracovních postupů využívá Apache Airflow.

Poznámka:

Správce orchestrace pracovních postupů pro Azure Data Factory spoléhá na opensourcovou aplikaci Apache Airflow. Dokumentaci a další kurzy pro Airflow najdete na stránkách dokumentace nebo komunity Apache Airflow.

Kanály služby Data Factory poskytují 100 konektorů zdroje dat, které poskytují škálovatelné a spolehlivé integrace dat nebo toky dat. Existují scénáře, ve kterých chcete spustit existující kanál datové továrny z daG Apache Airflow. V tomto kurzu se dozvíte, jak to udělat.

Požadavky

  • Předplatné Azure. Pokud ještě nemáte předplatné Azure, vytvořte si bezplatný účet Azure před tím, než začnete.
  • Účet služby Azure Storage. Pokud účet úložiště nemáte, přečtěte si téma Vytvoření účtu služby Azure Storage, kde najdete postup jeho vytvoření. Ujistěte se, že účet úložiště umožňuje přístup jenom z vybraných sítí.
  • Kanál služby Azure Data Factory Můžete postupovat podle libovolného kurzu a vytvořit nový kanál datové továrny pro případ, že ho ještě nemáte, nebo si ho můžete vytvořit výběrem v části Začínáme a vyzkoušet si první kanál datové továrny.
  • Nastavte instanční objekt. Budete muset vytvořit nový instanční objekt nebo použít existující objekt a udělit mu oprávnění ke spuštění kanálu (například role přispěvatele v datové továrně, kde existují existující kanály), i když prostředí Správce orchestrace pracovního postupu a kanály existují ve stejné datové továrně. Budete muset získat ID klienta a tajný klíč klienta instančního objektu (klíč rozhraní API).

Kroky

  1. Vytvořte nový soubor Pythonu adf.py s následujícím obsahem:

    from datetime import datetime, timedelta
    
    from airflow.models import DAG, BaseOperator
    
    try:
        from airflow.operators.empty import EmptyOperator
    except ModuleNotFoundError:
        from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
    from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
    from airflow.utils.edgemodifier import Label
    
    with DAG(
        dag_id="example_adf_run_pipeline",
        start_date=datetime(2022, 5, 14),
        schedule_interval="@daily",
        catchup=False,
        default_args={
            "retries": 1,
            "retry_delay": timedelta(minutes=3),
            "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI
            "factory_name": "<FactoryName>",  # This can also be specified in the ADF connection.
            "resource_group_name": "<ResourceGroupName>",  # This can also be specified in the ADF connection.
        },
        default_view="graph",
    ) as dag:
        begin = EmptyOperator(task_id="begin")
        end = EmptyOperator(task_id="end")
    
        # [START howto_operator_adf_run_pipeline]
        run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline1",
            pipeline_name="<PipelineName>",
            parameters={"myParam": "value"},
        )
        # [END howto_operator_adf_run_pipeline]
    
        # [START howto_operator_adf_run_pipeline_async]
        run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline2",
            pipeline_name="<PipelineName>",
            wait_for_termination=False,
        )
    
        pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
            task_id="pipeline_run_sensor",
            run_id=run_pipeline2.output["run_id"],
        )
        # [END howto_operator_adf_run_pipeline_async]
    
        begin >> Label("No async wait") >> run_pipeline1
        begin >> Label("Do async wait with sensor") >> run_pipeline2
        [run_pipeline1, pipeline_run_sensor] >> end
    
        # Task dependency created via `XComArgs`:
        #   run_pipeline2 >> pipeline_run_sensor
    

    Připojení budete muset vytvořit pomocí správce uživatelského rozhraní Orchestration Manageru pracovního postupu –> Připojení –> + –> Zvolte typ připojení jako Azure Data Factory a pak vyplňte client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name a pipeline_name.

  2. Nahrajte soubor adf.py do úložiště objektů blob ve složce s názvem DAGS.

  3. Naimportujte složku DAGS do prostředí Správce orchestrace pracovního postupu. Pokud ho nemáte, vytvořte si nový.

    Snímek obrazovky znázorňující kartu správy datové továrny s vybranou částí Airflow