Краткое руководство. Создание фабрики данных и конвейера с помощью пакета SDK .NET
ОБЛАСТЬ ПРИМЕНЕНИЯ: Фабрика данных Azure Azure Synapse Analytics
Совет
Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !
В этом кратком руководстве показано, как создать фабрику данных Azure с помощью пакета SDK для .NET. Конвейер, который вы создадите в этой фабрике данных, копирует данные из одной папки в другую в хранилище BLOB-объектов Azure. Инструкции по преобразованию данных с помощью фабрики данных Azure см. в руководстве по преобразованию данных с помощью Spark.
Примечание.
Эта статья не содержит подробный обзор службы фабрики данных. Общие сведения о службе фабрики данных Azure см. в статье Введение в фабрику данных Azure.
Предварительные требования
Подписка Azure.
Если у вас нет подписки Azure, создайте бесплатную учетную запись, прежде чем приступить к работе.
Роли в Azure
Чтобы создать экземпляры фабрики данных, нужно назначить учетной записи пользователя, используемой для входа в Azure, роль участника, владельца либо администратора подписки Azure. Чтобы просмотреть имеющиеся разрешения в подписке на портале Azure, выберите имя пользователя в правом верхнем углу и щелкните значок ... для выбора дополнительных параметров, а затем выберите Мои разрешения. Если у вас есть доступ к нескольким подпискам, выберите соответствующую подписку.
Чтобы создавать дочерние ресурсы для службы "Фабрика данных", в том числе наборы данных, связанные службы, конвейеры, триггеры и среды выполнения интеграции, а также управлять ими, выполните следующие требования:
- Чтобы создавать дочерние ресурсы и управлять ими на портале Azure, необходимо иметь роль Участник Фабрики данных на уровне группы ресурсов или более высоком.
- Чтобы создавать дочерние ресурсы и управлять ими с помощью PowerShell или пакета SDK, достаточно роли Участник на уровне ресурса или более высоком.
Примеры инструкций по назначению пользователю роли см. в статье Добавление или изменение администраторов подписки Azure.
Дополнительные сведения см. в следующих статьях:
Учетная запись хранения Azure
В этом кратком руководстве в качестве исходного и целевого хранилища данных используется учетная запись хранения Azure общего назначения (в частности, хранилища BLOB-объектов). Если у вас нет учетной записи хранения Azure общего назначения, см. инструкции по ее созданию.
Получение имени учетной записи хранения
Для выполнения инструкций этого краткого руководства вам потребуется имя учетной записи хранения Azure. Далее описана процедура получения имени учетной записи хранения.
- В веб-браузере перейдите к Портал Azure и выполните вход с помощью имени пользователя и пароля Azure.
- В меню портала Azure выберите Все службы, а затем выберите Хранилище>Учетные записи хранения. Можно также выполнить поиск на любой странице и выбрать Учетные записи хранения.
- На странице Учетные записи хранения найдите с помощью фильтра свою учетную запись хранения (при необходимости), а затем выберите эту учетную запись.
Можно также выполнить поиск на любой странице и выбрать Учетные записи хранения.
Создание контейнера больших двоичных объектов
В этом разделе вы создадите контейнер больших двоичных объектов с именем adftutorial в хранилище BLOB-объектов Azure.
На странице учетной записи хранения выберите Общие сведения>Контейнеры.
На панели инструментов страницы <Имя учетной записи> - Контейнеры выберите Контейнер.
В диалоговом окне Создание контейнера введите adftutorial в качестве имени и щелкните ОК. Страница <Имя учетной записи> - Контейнеры будет обновлена, и в списке появится контейнер adftutorial.
Добавление входной папки и файла для контейнера BLOB-объектов
В этом разделе показано, как создать папку с именем input в созданном вами контейнере и отправить пример файла в эту папку. Прежде чем начать, откройте текстовый редактор, например Блокнот и создайте файл emp.txt с таким содержимым:
John, Doe
Jane, Doe
Сохраните файл в папкеC:\ADFv2QuickStartPSH. (Если папка еще не существует, создайте ее.) Затем вернитесь к портал Azure и выполните следующие действия:
На странице <Имя учетной записи> - Контейнеры, где вы остановились, в обновленном списке контейнеров выберите adftutorial.
- Если вы закрыли окно или перешли на другую страницу, снова войдите на портал Azure.
- В меню портала Azure выберите Все службы, а затем выберите Хранилище>Учетные записи хранения. Можно также выполнить поиск на любой странице и выбрать Учетные записи хранения.
- Выберите свою учетную запись хранения, а затем выберите Контейнеры>adftutorial.
На панели инструментов adftutorial страницы контейнера выберите Отправка.
На странице Отправка BLOB-объектов выберите поле Файлы, а затем найдите и выберите файл emp.txt.
Разверните заголовок Дополнительно. Теперь страница отображается, как показано ниже:
В поле Отправить в папку введите input.
Нажмите кнопку Отправить. В списке должен отобразиться файл emp.txt с состоянием отправки.
Щелкните Закрыть значок (X), чтобы закрыть страницу Отправка BLOB-объектов.
Не закрывайте страницу контейнера adftutorial. Она понадобится для проверки выходных данных в конце этого руководства.
Visual Studio
В этом пошаговом руководстве используется Visual Studio 2019. Процедуры в Visual Studio 2013, 2015 или 2017 незначительно отличаются.
Создание приложения в идентификаторе Microsoft Entra
Из разделов, описанных в разделе "Практическое руководство. Создание приложения Microsoft Entra и субъекта-службы, которое может получить доступ к ресурсам", следуйте инструкциям, чтобы выполнить следующие задачи:
- В разделе "Создание приложения Microsoft Entra" создайте приложение, представляющее приложение .NET, которое вы создаете в этом руководстве. В качестве URL-адреса входа можно указать фиктивный URL-адрес, как показано в статье (
https://contoso.org/exampleapp
). - В разделе Получение значений для входа получите код приложения, а также идентификатор арендатора и запишите эти значения. Они потребуются в дальнейшем при выполнении инструкций этого руководства.
- В разделе Сертификаты и секреты получите ключ аутентификации и запишите это значение. Оно потребуется в дальнейшем при выполнении инструкций этого руководства.
- В разделе Назначение приложению роли назначьте приложению роль Участник на уровне подписки, чтобы приложение могло создавать фабрики данных в подписке.
Создание проекта Visual Studio
Далее создайте в Visual Studio консольное приложение C# .NET.
- Запустите Visual Studio.
- В окне запуска выберите Создать новый проект>Консольное приложение (платформа .NET Framework). Требуется .NET версии 4.5.2 или более поздней.
- В окне Имя проекта введите ADFv2QuickStart.
- Выберите Создать, чтобы создать проект.
Установка пакетов Nuget
Выберите Инструменты>Диспетчер пакетов NuGet>Консоль диспетчера пакетов.
На панели Консоль диспетчера пакетов выполните следующие команды, чтобы установить пакеты. Дополнительные сведения см. в пакете NuGet Azure.ResourceManager.DataFactory .
Install-Package Azure.ResourceManager.DataFactory -IncludePrerelease Install-Package Azure.Identity
Создание фабрики данных
Откройте файл Program.cs и включите следующие инструкции, чтобы добавить ссылки на пространства имен.
using Azure; using Azure.Core; using Azure.Core.Expressions.DataFactory; using Azure.Identity; using Azure.ResourceManager; using Azure.ResourceManager.DataFactory; using Azure.ResourceManager.DataFactory.Models; using Azure.ResourceManager.Resources; using System; using System.Collections.Generic;
Добавьте следующий код в метод Main, который задает следующие переменные. Замените значения заполнителей на собственные. Чтобы получить список регионов Azure, в которых в настоящее время доступна Фабрика данных, выберите интересующие вас регионы на следующей странице, а затем разверните раздел Аналитика, чтобы найти пункт Фабрика данных: Доступность продуктов по регионам. Хранилища данных (служба хранилища Azure, база данных SQL Azure и многие другие) и вычисления (HDInsight и другие), используемые фабрикой данных, могут располагаться в других регионах.
// Set variables string tenantID = "<your tenant ID>"; string applicationId = "<your application ID>"; string authenticationKey = "<your authentication key for the application>"; string subscriptionId = "<your subscription ID where the data factory resides>"; string resourceGroup = "<your resource group where the data factory resides>"; string region = "<the location of your resource group>"; string dataFactoryName = "<specify the name of data factory to create. It must be globally unique.>"; string storageAccountName = "<your storage account name to copy data>"; string storageKey = "<your storage account key>"; // specify the container and input folder from which all files // need to be copied to the output folder. string inputBlobContainer = "<blob container to copy data from, e.g. containername>"; string inputBlobPath = "<path to existing blob(s) to copy data from, e.g. inputdir/file>"; //specify the contains and output folder where the files are copied string outputBlobContainer = "<blob container to copy data from, e.g. containername>"; string outputBlobPath = "<the blob path to copy data to, e.g. outputdir/file>"; // name of the Azure Storage linked service, blob dataset, and the pipeline string storageLinkedServiceName = "AzureStorageLinkedService"; string blobDatasetName = "BlobDataset"; string pipelineName = "Adfv2QuickStartPipeline";
Добавьте следующий код, создающий фабрику данных, в метод Main.
ArmClient armClient = new ArmClient( new ClientSecretCredential(tenantID, applicationId, authenticationKey, new TokenCredentialOptions { AuthorityHost = AzureAuthorityHosts.AzurePublicCloud }), subscriptionId, new ArmClientOptions { Environment = ArmEnvironment.AzurePublicCloud } ); ResourceIdentifier resourceIdentifier = SubscriptionResource.CreateResourceIdentifier(subscriptionId); SubscriptionResource subscriptionResource = armClient.GetSubscriptionResource(resourceIdentifier); Console.WriteLine("Get an existing resource group " + resourceGroupName + "..."); var resourceGroupOperation = subscriptionResource.GetResourceGroups().Get(resourceGroupName); ResourceGroupResource resourceGroupResource = resourceGroupOperation.Value; Console.WriteLine("Create a data factory " + dataFactoryName + "..."); DataFactoryData dataFactoryData = new DataFactoryData(AzureLocation.EastUS2); var dataFactoryOperation = resourceGroupResource.GetDataFactories().CreateOrUpdate(WaitUntil.Completed, dataFactoryName, dataFactoryData); Console.WriteLine(dataFactoryOperation.WaitForCompletionResponse().Content); // Get the data factory resource DataFactoryResource dataFactoryResource = dataFactoryOperation.Value;
Создание связанной службы
Добавьте следующий код, создающий связанную службу хранилища Azure, в метод Main.
Связанная служба в фабрике данных связывает хранилища данных и службы вычислений с фабрикой данных. В этом кратком руководстве необходимо создать только одну Хранилище BLOB-объектов Azure связанную службу как для источника копирования, так и для хранилища приемников. В примере она называется AzureBlobStorageLinkedService.
// Create an Azure Storage linked service
Console.WriteLine("Create a linked service " + storageLinkedServiceName + "...");
AzureBlobStorageLinkedService azureBlobStorage = new AzureBlobStorageLinkedService()
{
ConnectionString = azureBlobStorageConnectionString
};
DataFactoryLinkedServiceData linkedServiceData = new DataFactoryLinkedServiceData(azureBlobStorage);
var linkedServiceOperation = dataFactoryResource.GetDataFactoryLinkedServices().CreateOrUpdate(WaitUntil.Completed, storageLinkedServiceName, linkedServiceData);
Console.WriteLine(linkedServiceOperation.WaitForCompletionResponse().Content);
Создание набора данных
Добавьте следующий код в метод Main , который создает набор данных с разделителями.
Вы можете определить набор данных, который будет представлять данные для копирования из источника в приемник. В этом примере этот набор данных с разделителями ссылается на Хранилище BLOB-объектов Azure связанную службу, созданную на предыдущем шаге. Набор данных принимает два параметра, значение которого задано в действии, использующее набор данных. Параметры используются для создания контейнера и папки FolderPath, указывающей на место хранения или хранения данных.
// Create an Azure Blob dataset
DataFactoryLinkedServiceReference linkedServiceReference = new DataFactoryLinkedServiceReference(DataFactoryLinkedServiceReferenceType.LinkedServiceReference, storageLinkedServiceName);
DelimitedTextDataset delimitedTextDataset = new DelimitedTextDataset(linkedServiceReference)
{
DataLocation = new AzureBlobStorageLocation
{
Container = DataFactoryElement<string>.FromExpression("@dataset().container"),
FileName = DataFactoryElement<string>.FromExpression("@dataset().path")
},
Parameters =
{
new KeyValuePair<string, EntityParameterSpecification>("container",new EntityParameterSpecification(EntityParameterType.String)),
new KeyValuePair<string, EntityParameterSpecification>("path",new EntityParameterSpecification(EntityParameterType.String))
},
FirstRowAsHeader = false,
QuoteChar = "\"",
EscapeChar = "\\",
ColumnDelimiter = ","
};
DataFactoryDatasetData datasetData = new DataFactoryDatasetData(delimitedTextDataset);
var datasetOperation = dataFactoryResource.GetDataFactoryDatasets().CreateOrUpdate(WaitUntil.Completed, blobDatasetName, datasetData);
Console.WriteLine(datasetOperation.WaitForCompletionResponse().Content);
Создание конвейера
Добавьте в метод Main следующий код, создающий конвейер с действием копирования.
В этом примере этот конвейер содержит одно действие и принимает четыре параметра: входной контейнер BLOB-объектов и путь, а также выходной контейнер и путь к большому двоичному объекту. Значения для этих параметров устанавливаются при активации или выполнении конвейера. Действие копирования ссылается на тот же набор данных большого двоичного объекта, который был создан на предыдущем шаге, в качестве входного и выходного. Если набор данных используется в качестве входного набора данных, указывается входной контейнер и путь. При использовании набора данных в качестве выходного набора данных указывается выходной контейнер и путь.
// Create a pipeline with a copy activity
Console.WriteLine("Creating pipeline " + pipelineName + "...");
DataFactoryPipelineData pipelineData = new DataFactoryPipelineData()
{
Parameters =
{
new KeyValuePair<string, EntityParameterSpecification>("inputContainer",new EntityParameterSpecification(EntityParameterType.String)),
new KeyValuePair<string, EntityParameterSpecification>("inputPath",new EntityParameterSpecification(EntityParameterType.String)),
new KeyValuePair<string, EntityParameterSpecification>("outputContainer",new EntityParameterSpecification(EntityParameterType.String)),
new KeyValuePair<string, EntityParameterSpecification>("outputPath",new EntityParameterSpecification(EntityParameterType.String))
},
Activities =
{
new CopyActivity("CopyFromBlobToBlob",new DataFactoryBlobSource(),new DataFactoryBlobSink())
{
Inputs =
{
new DatasetReference(DatasetReferenceType.DatasetReference,blobDatasetName)
{
Parameters =
{
new KeyValuePair<string, BinaryData>("container", BinaryData.FromString("\"@pipeline().parameters.inputContainer\"")),
new KeyValuePair<string, BinaryData>("path", BinaryData.FromString("\"@pipeline().parameters.inputPath\""))
}
}
},
Outputs =
{
new DatasetReference(DatasetReferenceType.DatasetReference,blobDatasetName)
{
Parameters =
{
new KeyValuePair<string, BinaryData>("container", BinaryData.FromString("\"@pipeline().parameters.outputContainer\"")),
new KeyValuePair<string, BinaryData>("path", BinaryData.FromString("\"@pipeline().parameters.outputPath\""))
}
}
}
}
}
};
var pipelineOperation = dataFactoryResource.GetDataFactoryPipelines().CreateOrUpdate(WaitUntil.Completed, pipelineName, pipelineData);
Console.WriteLine(pipelineOperation.WaitForCompletionResponse().Content);
Создание конвейера
Добавьте в метод Main следующий код, активирующий выполнение конвейера.
Этот код также задает значения параметров inputContainer, inputPath, outputContainer и outputPath, указанных в конвейере, с фактическими значениями пути к исходному и приемнику BLOB-объектов.
// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, BinaryData> parameters = new Dictionary<string, BinaryData>()
{
{ "inputContainer",BinaryData.FromObjectAsJson(inputBlobContainer) },
{ "inputPath",BinaryData.FromObjectAsJson(inputBlobPath) },
{ "outputContainer",BinaryData.FromObjectAsJson(outputBlobContainer) },
{ "outputPath",BinaryData.FromObjectAsJson(outputBlobPath) }
};
var pipelineResource = dataFactoryResource.GetDataFactoryPipeline(pipelineName);
var runResponse = pipelineResource.Value.CreateRun(parameters);
Console.WriteLine("Pipeline run ID: " + runResponse.Value.RunId);
Мониторинг выполнения конвейера
Добавьте следующий код в метод Main, чтобы постоянно проверять состояние до завершения копирования данных.
// Monitor the pipeline run Console.WriteLine("Checking pipeline run status..."); DataFactoryPipelineRunInfo pipelineRun; while (true) { pipelineRun = dataFactoryResource.GetPipelineRun(runResponse.Value.RunId.ToString()); Console.WriteLine("Status: " + pipelineRun.Status); if (pipelineRun.Status == "InProgress" || pipelineRun.Status == "Queued") System.Threading.Thread.Sleep(15000); else break; }
Добавьте в метод Main следующий код, извлекающий сведения о выполнении действия копирования, например размер записанных и прочитанных данных.
// Check the copy activity run details Console.WriteLine("Checking copy activity run details..."); var queryResponse = dataFactoryResource.GetActivityRun(pipelineRun.RunId.ToString(), new RunFilterContent(DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10))); var enumerator = queryResponse.GetEnumerator(); enumerator.MoveNext(); if (pipelineRun.Status == "Succeeded") Console.WriteLine(enumerator.Current.Output); else Console.WriteLine(enumerator.Current.Error); Console.WriteLine("\nPress any key to exit..."); Console.ReadKey();
Выполнение кода
Создайте и запустите приложение, а затем проверьте выполнение конвейера.
Консоль выведет ход выполнения создания фабрики данных, связанной службы, наборов данных, конвейера и выполнения конвейера. Затем она проверяет состояние выполнения конвейера. Дождитесь появления сведений о действии копирования с размером данных для чтения или записи. Затем воспользуйтесь такими средствами, как Обозреватель службы хранилища Azure, чтобы проверить, скопированы ли большие двоичные объекты в outputBlobPath из inputBlobPath, как указано в переменных.
Пример полученных результатов
Create a data factory quickstart-adf...
{
"name": "quickstart-adf",
"type": "Microsoft.DataFactory/factories",
"properties": {
"provisioningState": "Succeeded",
"version": "2018-06-01"
},
"location": "eastus2"
}
Create a linked service AzureBlobStorage...
{
"name": "AzureBlobStorage",
"type": "Microsoft.DataFactory/factories/linkedservices",
"properties": {
"type": "AzureBlobStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;",
"encryptedCredential": "<encryptedCredential>"
}
}
}
Creating dataset BlobDelimitedDataset...
{
"name": "BlobDelimitedDataset",
"type": "Microsoft.DataFactory/factories/datasets",
"properties": {
"type": "DelimitedText",
"linkedServiceName": {
"type": "LinkedServiceReference",
"referenceName": "AzureBlobStorage"
},
"parameters": {
"container": {
"type": "String"
},
"path": {
"type": "String"
}
},
"typeProperties": {
"location": {
"container": {
"type": "Expression",
"value": "@dataset().container"
},
"type": "AzureBlobStorageLocation",
"fileName": {
"type": "Expression",
"value": "@dataset().path"
}
},
"columnDelimiter": ",",
"quoteChar": "\"",
"escapeChar": "\\",
"firstRowAsHeader": false
}
}
}
Creating pipeline Adfv2QuickStartPipeline...
{
"properties": {
"activities": [
{
"inputs": [
{
"type": "DatasetReference",
"referenceName": "BlobDelimitedDataset",
"parameters": {
"container": "@pipeline().parameters.inputContainer",
"path": "@pipeline().parameters.inputPath"
}
}
],
"outputs": [
{
"type": "DatasetReference",
"referenceName": "BlobDelimitedDataset",
"parameters": {
"container": "@pipeline().parameters.outputContainer",
"path": "@pipeline().parameters.outputPath"
}
}
],
"name": "CopyFromBlobToBlob",
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource"
},
"sink": {
"type": "BlobSink"
}
}
}
],
"parameters": {
"inputContainer": {
"type": "String"
},
"inputPath": {
"type": "String"
},
"outputContainer": {
"type": "String"
},
"outputPath": {
"type": "String"
}
}
}
}
Creating pipeline run...
Pipeline run ID: 3aa26ffc-5bee-4db9-8bac-ccbc2d7b51c1
Checking pipeline run status...
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
"dataRead": 1048,
"dataWritten": 1048,
"filesRead": 1,
"filesWritten": 1,
"sourcePeakConnections": 1,
"sinkPeakConnections": 1,
"copyDuration": 8,
"throughput": 1.048,
"errors": [],
"effectiveIntegrationRuntime": "AutoResolveIntegrationRuntime (East US 2)",
"usedDataIntegrationUnits": 4,
"billingReference": {
"activityType": "DataMovement",
"billableDuration": [
{
"meterType": "AzureIR",
"duration": 0.06666666666666667,
"unit": "DIUHours"
}
],
"totalBillableDuration": [
{
"meterType": "AzureIR",
"duration": 0.06666666666666667,
"unit": "DIUHours"
}
]
},
"usedParallelCopies": 1,
"executionDetails": [
{
"source": {
"type": "AzureBlobStorage"
},
"sink": {
"type": "AzureBlobStorage"
},
"status": "Succeeded",
"start": "2023-12-15T10:25:33.9991558Z",
"duration": 8,
"usedDataIntegrationUnits": 4,
"usedParallelCopies": 1,
"profile": {
"queue": {
"status": "Completed",
"duration": 5
},
"transfer": {
"status": "Completed",
"duration": 1,
"details": {
"listingSource": {
"type": "AzureBlobStorage",
"workingDuration": 0
},
"readingFromSource": {
"type": "AzureBlobStorage",
"workingDuration": 0
},
"writingToSink": {
"type": "AzureBlobStorage",
"workingDuration": 0
}
}
}
},
"detailedDurations": {
"queuingDuration": 5,
"transferDuration": 1
}
}
],
"dataConsistencyVerification": {
"VerificationResult": "NotVerified"
}
}
Press any key to exit...
Проверка выходных данных
Конвейер автоматически создает выходную папку в контейнере больших двоичных объектов adftutorial. Затем он копирует файл emp.txt из входной папки в выходную.
- На портале Azure на странице контейнера adftutorial в разделе Добавление входной папки и файла для контейнера больших двоичных объектов, описанном выше, выберите Обновить, чтобы просмотреть выходную папку.
- В списке папок щелкните output в списке папок.
- Убедитесь, что файл emp.txt скопирован в папку output.
Очистка ресурсов
Чтобы программно удалить фабрики данных, добавьте следующие строки кода в программу:
Console.WriteLine("Deleting the data factory");
dataFactoryResource.Delete(WaitUntil.Completed);
Следующие шаги
В этом примере конвейер копирует данные из одного расположения в другое в хранилище BLOB-объектов Azure. Перейдите к руководствам, чтобы узнать об использовании фабрики данных в различных сценариях.