Aktivita ForEach ve službě Azure Data Factory a Azure Synapse Analytics
PLATÍ PRO: Azure Data Factory Azure Synapse Analytics
Tip
Vyzkoušejte si službu Data Factory v Microsoft Fabric, řešení pro analýzy typu all-in-one pro podniky. Microsoft Fabric zahrnuje všechno od přesunu dat až po datové vědy, analýzy v reálném čase, business intelligence a vytváření sestav. Přečtěte si, jak začít používat novou zkušební verzi zdarma.
Aktivita ForEach definuje opakující se tok řízení v kanálu Azure Data Factory nebo Synapse. Tato aktivita se používá k opakování v kolekci a spouští zadané aktivity ve smyčce. Implementace smyčky této aktivity se podobá struktuře smyčky Foreach v programovacích jazycích.
Vytvoření aktivity ForEach pomocí uživatelského rozhraní
Pokud chcete v kanálu použít aktivitu ForEach, proveďte následující kroky:
Jako vstup pro aktivitu ForEach můžete použít libovolnou proměnnou typu pole nebo výstupy z jiných aktivit . Pokud chcete vytvořit maticovou proměnnou, vyberte pozadí plátna kanálu a pak výběrem karty Proměnné přidejte proměnnou typu pole, jak je znázorněno níže.
Vyhledejte forEach v podokně Aktivity kanálu a přetáhněte aktivitu ForEach na plátno kanálu.
Pokud ještě není vybraná, vyberte novou aktivitu ForEach na plátně a jeho kartu Nastavení a upravte podrobnosti.
Vyberte pole Položky a pak výběrem odkazu Přidat dynamický obsah otevřete podokno editoru dynamického obsahu.
Vyberte vstupní pole, které chcete filtrovat v editoru dynamického obsahu. V tomto příkladu vybereme proměnnou vytvořenou v prvním kroku.
Vyberte editor aktivit v aktivitě ForEach a přidejte jednu nebo více aktivit, které se mají spustit pro každou položku ve vstupním poli Položky .
Ve všech aktivitách, které vytvoříte v rámci aktivity ForEach, můžete odkazovat na aktuální položku, kterou aktivita ForEach iteruje ze seznamu Položek . Na aktuální položku můžete odkazovat kdekoli, kde můžete zadat hodnotu vlastnosti pomocí dynamického výrazu. V editoru dynamického obsahu vyberte iterátor ForEach a vraťte aktuální položku.
Syntaxe
Vlastnosti jsou popsány dále v tomto článku. Vlastnost items je kolekce a každá položka v kolekci je označována pomocí @item()
, jak je znázorněno v následující syntaxi:
{
"name":"MyForEachActivityName",
"type":"ForEach",
"typeProperties":{
"isSequential":"true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
"type": "Expression"
},
"activities":[
{
"name":"MyCopyActivity",
"type":"Copy",
"typeProperties":{
...
},
"inputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@item()"
}
}
]
}
]
}
}
Vlastnosti typu
Vlastnost | Popis | Povolené hodnoty | Požaduje se |
---|---|---|---|
name | Název jednotlivých aktivit. | String | Ano |
type | Musí být nastavená hodnota ForEach. | String | Ano |
isSequential | Určuje, jestli se má smyčka spouštět postupně nebo paralelně. Paralelně lze spustit maximálně 50 iterací smyčky najednou). Pokud máte například aktivitu ForEach, která iteruje aktivitu kopírování s 10 různými zdrojovými datovými sadami a datovými sadami jímky s hodnotou IsSequential nastavenou na Hodnotu False, spustí se všechny kopie najednou. Výchozí hodnota je False. Pokud je hodnota isSequential nastavená na False, ujistěte se, že existuje správná konfigurace pro spouštění více spustitelných souborů. Jinak by tato vlastnost měla být použita s opatrností, aby nedocházelo ke konfliktům zápisu. Další informace naleznete v části Paralelní spuštění . |
Logická hodnota | Ne. Výchozí hodnota je False. |
batchCount | Počet dávek, který se má použít pro řízení počtu paralelních spuštění (pokud je hodnota isSequential nastavená na false). Jedná se o horní limit souběžnosti, ale u každé aktivity se nespustí vždy na tomto čísle. | Celé číslo (maximálně 50) | Ne. Výchozí hodnota je 20. |
Items | Výraz, který vrátí pole JSON, které se má iterated převést. | Výraz (který vrací pole JSON) | Ano |
Aktivity | Aktivity, které se mají provést. | Seznam aktivit | Ano |
Paralelní spouštění
Pokud je hodnota isSequential nastavena na false, aktivita iteruje paralelně s maximálně 50 souběžnými iteracemi. Toto nastavení by se mělo používat s opatrností. Pokud souběžné iterace zapisují do stejné složky, ale do různých souborů, je tento přístup v pořádku. Pokud souběžné iterace zapisují souběžně do stejného souboru, pravděpodobně tento přístup způsobí chybu.
Jazyk výrazů iterace
V aktivitě ForEach zadejte pole, které má být iterated pro položky vlastnosti." Slouží @item()
k iteraci jednoho výčtu v aktivitě ForEach. Pokud jsou například položky matice: [1, 2, 3], @item()
vrátí hodnotu 1 v první iteraci, 2 ve druhé iteraci a 3 ve třetí iteraci. Můžete také použít @range(0,10)
podobný výraz k iteraci desetkrát od 0 do 9.
Iterace přes jednu aktivitu
Scénář: Zkopírujte ze stejného zdrojového souboru v Objektu blob Azure do více cílových souborů v Objektu blob Azure.
Definice kanálu
{
"name": "<MyForEachPipeline>",
"properties": {
"activities": [
{
"name": "<MyForEachActivity>",
"type": "ForEach",
"typeProperties": {
"isSequential": "true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPath",
"type": "Expression"
},
"activities": [
{
"name": "MyCopyActivity",
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": "false"
},
"sink": {
"type": "BlobSink",
"copyBehavior": "PreserveHierarchy"
}
},
"inputs": [
{
"referenceName": "<MyDataset>",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs": [
{
"referenceName": "MyDataset",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@item()"
}
}
]
}
]
}
}
],
"parameters": {
"mySourceDatasetFolderPath": {
"type": "String"
},
"mySinkDatasetFolderPath": {
"type": "String"
}
}
}
}
Definice datové sady objektů blob
{
"name":"<MyDataset>",
"properties":{
"type":"AzureBlob",
"typeProperties":{
"folderPath":{
"value":"@dataset().MyFolderPath",
"type":"Expression"
}
},
"linkedServiceName":{
"referenceName":"StorageLinkedService",
"type":"LinkedServiceReference"
},
"parameters":{
"MyFolderPath":{
"type":"String"
}
}
}
}
Spuštění hodnot parametrů
{
"mySourceDatasetFolderPath": "input/",
"mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}
Iterace více aktivit
V aktivitě ForEach je možné iterovat více aktivit (například kopírování a webové aktivity). V tomto scénáři doporučujeme abstrahovat více aktivit do samostatného kanálu. Potom můžete použít aktivitu ExecutePipeline v kanálu s aktivitou ForEach k vyvolání samostatného kanálu s více aktivitami.
Syntaxe
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "<MyForEachMultipleActivities>"
"typeProperties": {
"isSequential": true,
"items": {
...
},
"activities": [
{
"type": "ExecutePipeline",
"name": "<MyInnerPipeline>"
"typeProperties": {
"pipeline": {
"referenceName": "<copyHttpPipeline>",
"type": "PipelineReference"
},
"parameters": {
...
},
"waitOnCompletion": true
}
}
]
}
}
],
"parameters": {
...
}
}
}
Příklad
Scénář: Iterace přes InnerPipeline v rámci aktivity ForEach s aktivitou Execute Pipeline. Vnitřní kanál kopíruje s parametrizovanými definicemi schématu.
Definice hlavního kanálu
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "MyForEachActivity",
"typeProperties": {
"isSequential": true,
"items": {
"value": "@pipeline().parameters.inputtables",
"type": "Expression"
},
"activities": [
{
"type": "ExecutePipeline",
"typeProperties": {
"pipeline": {
"referenceName": "InnerCopyPipeline",
"type": "PipelineReference"
},
"parameters": {
"sourceTableName": {
"value": "@item().SourceTable",
"type": "Expression"
},
"sourceTableStructure": {
"value": "@item().SourceTableStructure",
"type": "Expression"
},
"sinkTableName": {
"value": "@item().DestTable",
"type": "Expression"
},
"sinkTableStructure": {
"value": "@item().DestTableStructure",
"type": "Expression"
}
},
"waitOnCompletion": true
},
"name": "ExecuteCopyPipeline"
}
]
}
}
],
"parameters": {
"inputtables": {
"type": "Array"
}
}
}
}
Definice vnitřního kanálu
{
"name": "InnerCopyPipeline",
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "SqlSource",
}
},
"sink": {
"type": "SqlSink"
}
},
"name": "CopyActivity",
"inputs": [
{
"referenceName": "sqlSourceDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sourceTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sourceTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "sqlSinkDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sinkTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sinkTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
]
}
],
"parameters": {
"sourceTableName": {
"type": "String"
},
"sourceTableStructure": {
"type": "String"
},
"sinkTableName": {
"type": "String"
},
"sinkTableStructure": {
"type": "String"
}
}
}
}
Definice zdrojové datové sady
{
"name": "sqlSourceDataset",
"properties": {
"type": "SqlServerTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "sqlserverLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
Definice datové sady jímky
{
"name": "sqlSinkDataSet",
"properties": {
"type": "AzureSqlTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "azureSqlLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
Parametry hlavního kanálu
{
"inputtables": [
{
"SourceTable": "department",
"SourceTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
],
"DestTable": "department2",
"DestTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
]
}
]
}
Agregace výstupů
Pokud chcete agregovat výstupy aktivity foreach, využijte proměnné a aktivitu přidávací proměnné.
Nejprve deklarujte proměnnou array
v kanálu. Potom v každé smyčce foreach vyvoláte aktivitu přidávací proměnné. Následně můžete agregaci načíst z pole.
Omezení a zástupná řešení
Tady jsou některá omezení aktivity ForEach a navrhovaná alternativní řešení.
Omezení | Alternativní řešení |
---|---|
Smyčku ForEach nelze vnořit do jiné smyčky ForEach (nebo do smyčky Until). | Navrhňte dvouúrovňový kanál, ve kterém vnější kanál s vnější smyčkou ForEach iteruje vnitřní kanál s vnořenou smyčkou. |
Aktivita ForEach má maximálně batchCount 50 pro paralelní zpracování a maximálně 100 000 položek. |
Navrhňte dvouúrovňový kanál, ve kterém se vnější kanál s aktivitou ForEach iteruje přes vnitřní kanál. |
SetVariable nelze použít uvnitř aktivity ForEach, která běží paralelně, protože proměnné jsou globální pro celý kanál, nejsou vymezeny na ForEach ani na žádnou jinou aktivitu. | Zvažte použití sekvenčního příkazu ForEach nebo použití spuštění kanálu uvnitř forEach (proměnná nebo parametr zpracovaný v podřízené kanálu). |
Související obsah
Projděte si další podporované aktivity toku řízení: