Использование пользовательских действий в Фабрике данных Azure или конвейере Azure Synapse Analytics

ОБЛАСТЬ ПРИМЕНЕНИЯ: Фабрика данных Azure Azure Synapse Analytics

Совет

Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !

Существует два типа действий, которые можно использовать в Фабрике данных Azure или конвейере Synapse.

Чтобы переместить данные в хранилище данных, которое не поддерживает служба, и обратно, или чтобы преобразовать или обработать данные способом, который не поддерживается службой, создайте пользовательское действие с собственной логикой перемещения или преобразования данных и используйте это действие в конвейере. Настраиваемые действия запускают настраиваемую логику кода в пуле пакетной службы Azure виртуальных машин.

Примечание.

Мы рекомендуем использовать модуль Azure Az PowerShell для взаимодействия с Azure. Сведения о начале работы см. в статье "Установка Azure PowerShell". Дополнительные сведения см. в статье Перенос Azure PowerShell с AzureRM на Az.

Если вы еще не знакомы с пакетной службой Azure, см. следующие статьи.

  • Основные сведения о пакетной службе Azure — общие сведения о пакетной службе Azure.
  • Статья о командлете New-AzBatchAccount со сведениями о создании учетной записи пакетной службы Azure или статья о портале Azure со сведениями о создании учетной записи пакетной службы Azure с помощью портала Azure. Подробные инструкции по использованию этого командлета см. в статье Using PowerShell to manage Azure Batch Account (Использование PowerShell для управления учетной записью пакетной службы Azure).
  • New-AzBatchPool со сведениями о создании пула пакетной службы Azure.

Внимание

При создании нового пула пакетной службы Azure необходимо использовать "VirtualMachineConfiguration", а не "CloudServiceConfiguration". Дополнительные сведения см. в руководстве по миграции пула пакетной службы Azure.

Добавление настраиваемых действий в конвейер с помощью пользовательского интерфейса

Чтобы использовать в конвейере настраиваемое действие:

  1. Найдите элемент Настраиваемое на панели конвейера "Действия" и перетащите его на холст конвейера.

  2. Выберите настраиваемое действие на холсте, если оно еще не выбрано.

  3. Перейдите на вкладку Пакетная служба Azure, чтобы выбрать или создать новую связанную Пакетную службу Azure, которая будет выполнять настраиваемое действие.

    Пользовательский интерфейс для пользовательского действия.

  4. Откройте вкладку Параметры и укажите команду для выполнения в Пакетной службе Azure, а также дополнительные сведения.

    Пользовательский интерфейс для вкладки параметров для пользовательского действия.

Связанная пакетная служба Azure

Ниже приведен фрагмент кода JSON, который определяет пример связанной пакетной службы Azure. Подробнее см. Поддерживаемые вычислительные среды

{
    "name": "AzureBatchLinkedService",
    "properties": {
        "type": "AzureBatch",
        "typeProperties": {
            "accountName": "batchaccount",
            "accessKey": {
                "type": "SecureString",
                "value": "access key"
            },
            "batchUri": "https://batchaccount.region.batch.azure.com",
            "poolName": "poolname",
            "linkedServiceName": {
                "referenceName": "StorageLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
}

Дополнительные сведения о связанной пакетной службе Azure см. в статье Вычислительные среды, поддерживаемые фабрикой данных Azure.

Настраиваемое действие

В следующем фрагменте кода JSON определяется конвейер с простым настраиваемым действием. Определение действия содержит ссылку на связанную пакетную службу Azure.

{
  "name": "MyCustomActivityPipeline",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "helloworld.exe",
        "folderPath": "customactv2/helloworld",
        "resourceLinkedService": {
          "referenceName": "StorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }]
  }
}

В этом примере helloworld.exe является пользовательским приложением, которое хранится в папке customactv2/helloworld учетной записи хранения Azure, используемой в resourceLinkedService. Настраиваемое действие отправляет пользовательское приложение для выполнения в пакетной службе Azure. Вы можете заменить команду на любое предпочитаемое приложение, которое можно выполнить в целевой операционной системе узлов пула пакетной службы Azure.

В следующей таблице описаны имена и описания свойств, относящихся к этому действию.

Свойство Описание: Обязательное поле
name Имя действия в конвейере. Да
описание Описание действия. No
type Для пользовательского действия используется тип действия Custom. Да
linkedServiceName Связанная служба пакетной службы Azure. Дополнительные сведения об этой связанной службе см. в статье Вычислительные среды, поддерживаемые фабрикой данных Azure. Да
Команда Команда для выполнения пользовательского приложения. Если приложение уже находится в узле пула пакетной службы Azure, resourceLinkedService и folderPath могут быть пропущены. Например, можно указать команду cmd /c dir, которая изначально поддерживается узлом пула пакетной службы Windows. Да
resourceLinkedService Связанная служба хранилища Azure учетной записи хранения, в которой хранится пользовательское приложение. Нет*
folderPath Путь к папке пользовательского приложения и всех его зависимостей.

При наличие зависимостей, хранящихся в подпапках, то есть в иерархической структуре папок в folderPath. Структура папок в настоящее время преобразована в плоскую структуру, когда файлы скопированы в пакетную службу Azure. То есть все файлы копируются в одну папку без подпапок. Чтобы обойти эту реакцию на событие, рассмотрите возможность сжатия файлов, копирования сжатого файла и затем разархивирования его с помощью специального кода в нужном месте.
Нет*
referenceObjects Массив имеющихся связанных служб и наборов данных. Указанные связанные службы и наборы данных передаются в пользовательское приложение в формате JSON. Пользовательский код может использовать ресурсы службы No
extendedProperties Определенные пользователем свойства, которые могут быть переданы в пользовательское приложение в формате JSON. Пользовательский код может использовать дополнительные свойства. No
retentionTimeInDays Время хранения файлов, отправленных для настраиваемого действия. Значение по умолчанию — 30 суток. No

* Свойства resourceLinkedService и folderPath (оба) должны быть указаны либо пропущены.

Примечание.

Если связанные службы передаются как referenceObjects в пользовательском действии, рекомендуется передать связанную службу с поддержкой Azure Key Vault (так как она не содержит защищенных строк) и получить учетные данные с помощью имени секрета непосредственно из хранилища ключей из кода. Здесь можно найти пример, который ссылается на связанную службу AKV с поддержкой, извлекает учетные данные из Key Vault, а затем обращается к хранилищу в коде.

Примечание.

В настоящее время только хранилище BLOB-объектов Azure поддерживается для resourceLinkedService в пользовательском действии, и это единственная связанная служба, которая создается по умолчанию, и нет возможности выбрать другие соединители, такие как ADLS 2-го поколения.

Разрешения настраиваемых действий

Настраиваемые действия предоставляют автоматической учетной записи пользователя пакетной службы Azure возможность доступа от имени стандартного пользователя с областью задачи (спецификация автоматических пользователей по умолчанию). Уровень разрешений автоматической учетной записи пользователя изменить невозможно. Дополнительные сведения см. в разделе Автоматические учетные записи пользователей статьи "Выполнение задач с учетными записями пользователей в пакетной службе".

Выполнение команд

Вы можете непосредственно выполнить команду, используя настраиваемое действие. В приведенном ниже примере мы выполняем команду "echo hello world" на целевых узлах пула пакетной службы Azure, которая выводит выходные данные в stdout.

{
  "name": "MyCustomActivity",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "cmd /c echo hello world"
      }
    }]
  }
}

Передача объектов и свойств

В этом примере показано, как использовать свойства referenceObjects и extendedProperties для передачи объектов и определенных пользователем свойств из службы в пользовательское приложение.

{
  "name": "MyCustomActivityPipeline",
  "properties": {
    "description": "Custom activity sample",
    "activities": [{
      "type": "Custom",
      "name": "MyCustomActivity",
      "linkedServiceName": {
        "referenceName": "AzureBatchLinkedService",
        "type": "LinkedServiceReference"
      },
      "typeProperties": {
        "command": "SampleApp.exe",
        "folderPath": "customactv2/SampleApp",
        "resourceLinkedService": {
          "referenceName": "StorageLinkedService",
          "type": "LinkedServiceReference"
        },
        "referenceObjects": {
          "linkedServices": [{
            "referenceName": "AzureBatchLinkedService",
            "type": "LinkedServiceReference"
          }]
        },
        "extendedProperties": {          
          "connectionString": {
            "type": "SecureString",
            "value": "aSampleSecureString"
          },
          "PropertyBagPropertyName1": "PropertyBagValue1",
          "propertyBagPropertyName2": "PropertyBagValue2",
          "dateTime1": "2015-04-12T12:13:14Z"
        }
      }
    }]
  }
}

При выполнении действия referenceObjects и extendedProperties сохраняются в указанных ниже файлах. Эти файлы будут развернуты в той же папке выполнения SampleApp.exe.

  • activity.json

    Хранит свойство extendedProperties и свойства настраиваемых действий.

  • linkedServices.json

    Хранит массив связанных служб, определенных в свойстве referenceObjects.

  • datasets.json

    Хранит массив наборов данных, определенных в свойстве referenceObjects.

В следующем примере кода показано, как SampleApp.exe может получить доступ к необходимой информации в JSON-файлах.

using Newtonsoft.Json;
using System;
using System.IO;

namespace SampleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            //From Extend Properties
            dynamic activity = JsonConvert.DeserializeObject(File.ReadAllText("activity.json"));
            Console.WriteLine(activity.typeProperties.extendedProperties.connectionString.value);

            // From LinkedServices
            dynamic linkedServices = JsonConvert.DeserializeObject(File.ReadAllText("linkedServices.json"));
            Console.WriteLine(linkedServices[0].properties.typeProperties.accountName);
        }
    }
}

Извлечение выходных данных выполнения

Чтобы запустить выполнение конвейера, выполните следующую команду PowerShell:

$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName

Во время выполнения конвейера его текущие выходные данные можно проверить с помощью следующих команд:

while ($True) {
    $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)

    if(!$result) {
        Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
    }
    elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
        Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
    }
    else {
        Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
        $result
        break
    }
    ($result | Format-List | Out-String)
    Start-Sleep -Seconds 15
}

Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
$result.Output -join "`r`n"

Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
$result.Error -join "`r`n"

Свойства stdout и stderr пользовательского приложения сохраняются в контейнер adfjobs в связанной службе хранилища Azure, определенной при создании связанной пакетной службы Azure с помощью GUID задания. Вы можете получить подробный путь из выходных данных выполнения действия, как показано в следующем фрагменте кода:

Pipeline ' MyCustomActivity' run finished. Result:

ResourceGroupName : resourcegroupname
DataFactoryName   : datafactoryname
ActivityName      : MyCustomActivity
PipelineRunId     : xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
PipelineName      : MyCustomActivity
Input             : {command}
Output            : {exitcode, outputs, effectiveIntegrationRuntime}
LinkedServiceName :
ActivityRunStart  : 10/5/2017 3:33:06 PM
ActivityRunEnd    : 10/5/2017 3:33:28 PM
DurationInMs      : 21203
Status            : Succeeded
Error             : {errorCode, message, failureType, target}

Activity Output section:
"exitcode": 0
"outputs": [
  "https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stdout.txt",
  "https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stderr.txt"
]
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
Activity Error section:
"errorCode": ""
"message": ""
"failureType": ""
"target": "MyCustomActivity"

Если вы хотите использовать содержимое stdout.txt в последующих действиях, путь к файлу stdout.txt можно получить в значении выражения "@activity('MyCustomActivity').output.outputs[0]".

Внимание

  • Свойства activity.json, linkedServices.json и datasets.json хранятся в папке среды выполнения пакетной задачи. Для этого примера файлы activity.json, linkedServices.json и datasets.json хранятся по адресу https://adfv2storage.blob.core.windows.net/adfjobs/<GUID>/runtime/. При необходимости их следует очищать отдельно.
  • Если связанные службы используют локальную среду выполнения интеграции, конфиденциальная информация, например ключи и пароли, шифруется локальной средой выполнения интеграции. Это гарантирует, что учетные данные останутся в пределах частных сетевых сред клиентов. Некоторые поля с конфиденциальными данными, на которые таким образом ссылается пользовательский код приложения, могут отсутствовать. При необходимости в extendedProperties используйте SecureString, а не ссылку на связанную службу.

Передача выходных данных в другое действие

Пользовательские значения из кода пользовательского действия можно отправлять обратно в службу. Для этого ваше приложение должно записывать их в файл outputs.json. Служба копирует содержимое файла outputs.json и добавляет его в выходные данные действия в качестве значения свойства customOutput. (Предел размера 2 МБ.) Если вы хотите использовать содержимое файла outputs.json в последующих действиях, значение можно получить с помощью выражения @activity('<MyCustomActivity>').output.customOutput.

Получение выходных данные SecureString

Значения свойств, требующих особого обращения, обозначенные как тип SecureString, как показано в некоторых примерах в этой статье, скрыты на вкладке "Мониторинг" в пользовательском интерфейсе. Однако при фактическом выполнении конвейера свойство SecureString сериализуется как JSON в файле activity.json в виде обычного текста. Например:

"extendedProperties": {
  "connectionString": {
    "type": "SecureString",
    "value": "aSampleSecureString"
  }
}

Сериализация на самом деле небезопасна и не предназначена для обеспечения безопасности. Цель состоит в том, чтобы указать службе на то, что необходимо скрыть это значение на вкладке "Мониторинг".

Чтобы получить доступ из пользовательских действий к свойствам типа SecureString, прочитайте файл activity.json, который помещен в ту же папку, что и ваш EXE-файл, десериализуйте JSON, а затем получите доступ к свойству JSON (extendedProperties => [propertyName] => value).

Автомасштабирование пакетной службы Azure

Можно также создать пул пакетной службы Azure с использованием функции автомасштабирования . Например, можно создать пул пакетной службы Azure с нулем выделенных виртуальных машин и формулой автоматического масштабирования на основе числа ожидающих задач.

Приведенный здесь пример формулы обеспечивает следующее поведение: при создании пула он изначально содержит одну виртуальную машину. Метрика $PendingTasks определяет количество задач в состоянии выполнения и активном состоянии (в очереди). Формула находит среднее число ожидающих выполнения задач за последние 180 секунд и соответствующим образом задает значение TargetDedicated. Благодаря этому значение TargetDedicated никогда не превысит 25 виртуальных машин. Таким образом, по мере поступления новых задач пул автоматически увеличивается, а по мере их завершения виртуальные машины высвобождаются по одной, и функция автоматического масштабирования уменьшает пул. Значения startingNumberOfVMs и maxNumberofVMs можно настроить в соответствии со своими потребностями.

Формула автоматического масштабирования:

startingNumberOfVMs = 1;
maxNumberofVMs = 25;
pendingTaskSamplePercent = $PendingTasks.GetSamplePercent(180 * TimeInterval_Second);
pendingTaskSamples = pendingTaskSamplePercent < 70 ? startingNumberOfVMs : avg($PendingTasks.GetSample(180 * TimeInterval_Second));
$TargetDedicated=min(maxNumberofVMs,pendingTaskSamples);

Дополнительные сведения см. в статье Автоматическое масштабирование вычислительных узлов в пуле пакетной службы Azure.

Если в пуле используется autoScaleEvaluationInterval(значение по умолчанию), пакетной службе может потребоваться 15–30 минут на подготовку виртуальной машины перед выполнением настраиваемого действия. Если пул использует другое значение autoScaleEvaluationInterval, пакетная служба может затрачивать autoScaleEvaluationInterval плюс 10 минут.

Ознакомьтесь со следующими ссылками, в которых описаны способы преобразования данных другими способами: