Aktivita ForEach ve službě Azure Data Factory a Azure Synapse Analytics

PLATÍ PRO: Azure Data Factory Azure Synapse Analytics

Tip

Vyzkoušejte si službu Data Factory v Microsoft Fabric, řešení pro analýzy typu all-in-one pro podniky. Microsoft Fabric zahrnuje všechno od přesunu dat až po datové vědy, analýzy v reálném čase, business intelligence a vytváření sestav. Přečtěte si, jak začít používat novou zkušební verzi zdarma.

Aktivita ForEach definuje opakující se tok řízení v kanálu Azure Data Factory nebo Synapse. Tato aktivita se používá k opakování v kolekci a spouští zadané aktivity ve smyčce. Implementace smyčky této aktivity se podobá struktuře smyčky Foreach v programovacích jazycích.

Vytvoření aktivity ForEach pomocí uživatelského rozhraní

Pokud chcete v kanálu použít aktivitu ForEach, proveďte následující kroky:

  1. Jako vstup pro aktivitu ForEach můžete použít libovolnou proměnnou typu pole nebo výstupy z jiných aktivit . Pokud chcete vytvořit maticovou proměnnou, vyberte pozadí plátna kanálu a pak výběrem karty Proměnné přidejte proměnnou typu pole, jak je znázorněno níže.

    Zobrazuje prázdné plátno kanálu s proměnnou typu pole přidanou do kanálu.

  2. Vyhledejte forEach v podokně Aktivity kanálu a přetáhněte aktivitu ForEach na plátno kanálu.

  3. Pokud ještě není vybraná, vyberte novou aktivitu ForEach na plátně a jeho kartu Nastavení a upravte podrobnosti.

    Zobrazuje uživatelské rozhraní pro aktivitu filtru.

  4. Vyberte pole Položky a pak výběrem odkazu Přidat dynamický obsah otevřete podokno editoru dynamického obsahu.

    Zobrazuje   Přidat dynamický obsah  odkaz pro vlastnost Items.

  5. Vyberte vstupní pole, které chcete filtrovat v editoru dynamického obsahu. V tomto příkladu vybereme proměnnou vytvořenou v prvním kroku.

    Zobrazuje editor dynamického obsahu s proměnnou vytvořenou v prvním kroku.

  6. Vyberte editor aktivit v aktivitě ForEach a přidejte jednu nebo více aktivit, které se mají spustit pro každou položku ve vstupním poli Položky .

    Zobrazuje tlačítko Editor aktivit v aktivitě ForEach v okně editoru kanálu.

  7. Ve všech aktivitách, které vytvoříte v rámci aktivity ForEach, můžete odkazovat na aktuální položku, kterou aktivita ForEach iteruje ze seznamu Položek . Na aktuální položku můžete odkazovat kdekoli, kde můžete zadat hodnotu vlastnosti pomocí dynamického výrazu. V editoru dynamického obsahu vyberte iterátor ForEach a vraťte aktuální položku.

    Zobrazuje editor dynamického obsahu s vybraným iterátorem ForEach.

Syntaxe

Vlastnosti jsou popsány dále v tomto článku. Vlastnost items je kolekce a každá položka v kolekci je označována pomocí @item() , jak je znázorněno v následující syntaxi:

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

Vlastnosti typu

Vlastnost Popis Povolené hodnoty Požaduje se
name Název jednotlivých aktivit. String Ano
type Musí být nastavená hodnota ForEach. String Ano
isSequential Určuje, jestli se má smyčka spouštět postupně nebo paralelně. Paralelně lze spustit maximálně 50 iterací smyčky najednou). Pokud máte například aktivitu ForEach, která iteruje aktivitu kopírování s 10 různými zdrojovými datovými sadami a datovými sadami jímky s hodnotou IsSequential nastavenou na Hodnotu False, spustí se všechny kopie najednou. Výchozí hodnota je False.

Pokud je hodnota isSequential nastavená na False, ujistěte se, že existuje správná konfigurace pro spouštění více spustitelných souborů. Jinak by tato vlastnost měla být použita s opatrností, aby nedocházelo ke konfliktům zápisu. Další informace naleznete v části Paralelní spuštění .
Logická hodnota Ne. Výchozí hodnota je False.
batchCount Počet dávek, který se má použít pro řízení počtu paralelních spuštění (pokud je hodnota isSequential nastavená na false). Jedná se o horní limit souběžnosti, ale u každé aktivity se nespustí vždy na tomto čísle. Celé číslo (maximálně 50) Ne. Výchozí hodnota je 20.
Items Výraz, který vrátí pole JSON, které se má iterated převést. Výraz (který vrací pole JSON) Ano
Aktivity Aktivity, které se mají provést. Seznam aktivit Ano

Paralelní spouštění

Pokud je hodnota isSequential nastavena na false, aktivita iteruje paralelně s maximálně 50 souběžnými iteracemi. Toto nastavení by se mělo používat s opatrností. Pokud souběžné iterace zapisují do stejné složky, ale do různých souborů, je tento přístup v pořádku. Pokud souběžné iterace zapisují souběžně do stejného souboru, pravděpodobně tento přístup způsobí chybu.

Jazyk výrazů iterace

V aktivitě ForEach zadejte pole, které má být iterated pro položky vlastnosti." Slouží @item() k iteraci jednoho výčtu v aktivitě ForEach. Pokud jsou například položky matice: [1, 2, 3], @item() vrátí hodnotu 1 v první iteraci, 2 ve druhé iteraci a 3 ve třetí iteraci. Můžete také použít @range(0,10) podobný výraz k iteraci desetkrát od 0 do 9.

Iterace přes jednu aktivitu

Scénář: Zkopírujte ze stejného zdrojového souboru v Objektu blob Azure do více cílových souborů v Objektu blob Azure.

Definice kanálu

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

Definice datové sady objektů blob

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

Spuštění hodnot parametrů

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

Iterace více aktivit

V aktivitě ForEach je možné iterovat více aktivit (například kopírování a webové aktivity). V tomto scénáři doporučujeme abstrahovat více aktivit do samostatného kanálu. Potom můžete použít aktivitu ExecutePipeline v kanálu s aktivitou ForEach k vyvolání samostatného kanálu s více aktivitami.

Syntaxe

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

Příklad

Scénář: Iterace přes InnerPipeline v rámci aktivity ForEach s aktivitou Execute Pipeline. Vnitřní kanál kopíruje s parametrizovanými definicemi schématu.

Definice hlavního kanálu

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

Definice vnitřního kanálu

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

Definice zdrojové datové sady

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Definice datové sady jímky

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Parametry hlavního kanálu

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

Agregace výstupů

Pokud chcete agregovat výstupy aktivity foreach, využijte proměnné a aktivitu přidávací proměnné.

Nejprve deklarujte proměnnou array v kanálu. Potom v každé smyčce foreach vyvoláte aktivitu přidávací proměnné. Následně můžete agregaci načíst z pole.

Omezení a zástupná řešení

Tady jsou některá omezení aktivity ForEach a navrhovaná alternativní řešení.

Omezení Alternativní řešení
Smyčku ForEach nelze vnořit do jiné smyčky ForEach (nebo do smyčky Until). Navrhňte dvouúrovňový kanál, ve kterém vnější kanál s vnější smyčkou ForEach iteruje vnitřní kanál s vnořenou smyčkou.
Aktivita ForEach má maximálně batchCount 50 pro paralelní zpracování a maximálně 100 000 položek. Navrhňte dvouúrovňový kanál, ve kterém se vnější kanál s aktivitou ForEach iteruje přes vnitřní kanál.
SetVariable nelze použít uvnitř aktivity ForEach, která běží paralelně, protože proměnné jsou globální pro celý kanál, nejsou vymezeny na ForEach ani na žádnou jinou aktivitu. Zvažte použití sekvenčního příkazu ForEach nebo použití spuštění kanálu uvnitř forEach (proměnná nebo parametr zpracovaný v podřízené kanálu).

Projděte si další podporované aktivity toku řízení: