Преобразование данных с помощью потоков данных для сопоставления

ОБЛАСТЬ ПРИМЕНЕНИЯ: Фабрика данных Azure Azure Synapse Analytics

Совет

Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !

Если вы еще не работали с фабрикой данных Azure, ознакомьтесь со статьей Введение в фабрику данных Azure.

В этом руководстве вы будете использовать пользовательский интерфейс Фабрики данных Azure для создания конвейера, который будет копировать и преобразовывать данные из источника Azure Data Lake Storage (ADLS) 2-го поколения в приемник ADLS 2-го поколения с использованием функции сопоставления потоков данных. Шаблон конфигурации, приведенный в этом кратком руководстве, можно расширить при преобразовании данных с использованием функции сопоставления потоков данных

Примечание.

Данное руководство предназначено для использования сопоставления потоков данных в целом. Потоки данных доступны как в Фабрике данных Azure, так и в Synapse Pipelines. Если вы не знакомы с потоками данных в конвейерах Azure Synapse, следуйте указаниям в разделе Потоки данных с использованием Azure Synapse Pipelines.

Вот какие шаги выполняются в этом руководстве:

  • Создали фабрику данных.
  • Создайте конвейер с использованием действия Потока данных.
  • создание потока данных для сопоставления с четырьмя преобразованиями;
  • тестовый запуск конвейера;
  • Мониторинг выполнения Потока данных

Необходимые компоненты

  • Подписка Azure. Если у вас еще нет подписки Azure, создайте бесплатную учетную запись Azure, прежде чем начинать работу.
  • Учетная запись хранения Azure. Хранилище ADLS используется в качестве хранилища данных источника и приемника. Если у вас нет учетной записи хранения, создайте ее, следуя действиям в этом разделе.

Файл, который мы будем преобразовывать в этом руководстве (moviesDB.csv) можно найти здесь. Чтобы извлечь файл из GitHub, скопируйте его содержимое в любой текстовый редактор, а затем сохраните его на локальном компьютере в виде CSV-файла. Сведения о передаче файла в учетную запись хранения см. в статье Отправка BLOB-объектов с помощью портала Azure. В примерах будет использоваться контейнер под названием Sample-Data.

Создание фабрики данных

На этом этапе вы создадите фабрику данных и откроете пользовательский интерфейс службы "Фабрика данных" для создания конвейера в фабрике данных.

  1. Откройте Microsoft Edge или Google Chrome. Сейчас пользовательский интерфейс Фабрики данных поддерживают только браузеры Microsoft Edge и Google Chrome.

  2. В меню слева выберите Создать ресурс>Интеграция>Фабрика данных:

    Выбор фабрики данных в

  3. На странице Новая фабрика данных в поле Имя введите ADFTutorialDataFactory.

    Имя фабрики данных Azure должно быть глобально уникальным. Если вы увидите следующую ошибку касательно значения имени, введите другое имя фабрики данных. (Например, используйте yournameADFTutorialDataFactory.) Дополнительные сведения о правилах именования артефактов фабрики данных см. в статье Фабрика данных Azure — правила именования.

    Новое сообщение об ошибке фабрики данных со сведениями о том, что имя дублируется.

  4. Выберите подписку Azure, в рамках которой вы хотите создать фабрику данных.

  5. Для группы ресурсов выполните одно из следующих действий:

    a. Выберите Использовать существующуюи укажите существующую группу ресурсов в раскрывающемся списке.

    b. Выберите Создать новуюи укажите имя группы ресурсов.

    Сведения о группах ресурсов см. в статье Общие сведения об Azure Resource Manager.

  6. В качестве версии выберите V2.

  7. В поле Расположение выберите расположение фабрики данных. В раскрывающемся списке отображаются только поддерживаемые расположения. Хранилища данных (например, служба хранилища Azure и База данных SQL) и вычислительные ресурсы (например, Azure HDInsight), используемые фабрикой данных, могут располагаться в других регионах.

  8. Нажмите кнопку создания.

  9. После завершения создания вы увидите уведомление в центре уведомлений. Нажмите кнопку Перейти к ресурсу, чтобы открыть страницу фабрики данных.

  10. Выберите Создание и мониторинг, чтобы запустить на отдельной вкладке пользовательский интерфейс фабрики данных.

Создание конвейера с помощью действия "Поток данных"

На этом этапе вы создадите конвейер, содержащий действие "Поток данных".

  1. На домашней странице пользовательского интерфейса Фабрики данных выберите элемент Оркестрация.

    Снимок экрана: домашняя страница ADF

  2. На вкладке Общие для конвейера введите TransformMovies в поле Имя для конвейера.

  3. В области Действия разверните меню "гармошка" Move and Transform (Перемещение и преобразование). Перетащите действие Поток данных из области на холст конвейера.

    Снимок экрана: холст конвейера, на который можно перетащить действие Потока данных.

  4. Во всплывающем окне Добавление Потока данных выберите Создать Поток данных и присвойте ему имя TransformMovies. По завершении нажмите кнопку Готово .

    Снимок экрана: присваивание имени создаваемому потоку данных.

  5. На верхней панели холста конвейера продвиньте ползунок Отладка потока данных. Режим отладки позволяет в интерактивном режиме тестировать логику преобразования в динамическом кластере Spark. Подготовка кластеров Потоков данных занимает 5–7 минут, поэтому пользователям рекомендуем сначала включить отладку, если планируется разработка Потока данных. Дополнительные сведения см. в статье Режим отладки.

    Действие

Встраивание логики преобразования в холст потока данных

После создания Потока данных он будет автоматически отправлен на холст потока данных. Если вы не перенаправляетесь на холст потока данных, на панели под холстом перейдите в раздел "Параметры " и выберите "Открыть", расположенный рядом с полем потока данных. Откроется холст потока данных.

Снимок экрана: открытие редактора потока данных из редактора конвейера.

На этом шаге будет создан поток данных, который извлекает файл moviesDB.csv из хранилища ADLS и агрегирует среднюю оценку комедий с 1910 до 2000 года. Затем этот файл будет записан обратно в хранилище ADLS.

  1. На холсте потока данных добавьте источник, нажав кнопку Добавить источник.

    Снимок экрана с кнопкой

  2. Присвойте источнику имя MoviesDB. Щелкните Создать, чтобы создать новый набор данных источника.

    Снимок экрана, на котором показано, где нажата кнопка

  3. Выберите Azure Data Lake Storage 2-го поколения. Нажмите кнопку Продолжить.

    Снимок экрана: элемент

  4. Выберите DelimitedText. Нажмите кнопку Продолжить.

    Снимок экрана, где показан элемент DelimitedText.

  5. Присвойте набору данных имя MoviesDB. В раскрывающемся списке "Связанная служба" выберите Создать.

    Снимок экрана, на котором показан раскрывающийся список связанных служб.

  6. На экране создания связанной службы присвойте имя связанной службе ADLS 2-го поколения ADLSGen2 и укажите метод проверки подлинности. Затем введите учетные данные подключения. В этом руководстве для подключения к нашей учетной записи хранения используется ключ учетной записи. Можно нажать кнопку Проверить подключение, чтобы подтвердить правильность ввода учетных данных. После завершения нажмите Создать.

    Связанные службы

  7. Вернитесь на экран создания набора данных и в поле Путь к файлу введите расположение файла. В этом кратком руководстве файл moviesDB.csv находится в контейнере sample-data. Так как файл содержит заголовки, установите флажок Первая строка в качестве заголовка. Выберите Из подключения/хранилища, чтобы импортировать схему заголовка непосредственно из файла в хранилище. Когда закончите, нажмите кнопку ОК.

    Наборы данных

  8. Если кластер отладки запущен, перейдите на вкладку Предварительный просмотр данных преобразования источника и нажмите кнопку Обновить, чтобы получить моментальный снимок данных. Предварительный просмотр данных дает возможность убедиться, что преобразование настроено правильно.

    Снимок экрана, на котором показано, где можно просмотреть данные, чтобы убедиться, что преобразование настроено правильно.

  9. Щелкните значок "плюс" рядом с узлом источника на холсте потока данных, чтобы добавить новое преобразование. Первое добавляемое преобразование — Фильтр.

    Холст Потока данных

  10. Назовите преобразование фильтра FilterYears. Щелкните поле "Выражение" рядом с полем Фильтр, чтобы открыть построитель выражений. Здесь нужно указать условие фильтрации.

    Снимок экрана, на котором отображается поле

  11. Построитель выражений потока данных позволяет интерактивно создавать выражения для использования в различных преобразованиях. Выражения могут включать встроенные функции, столбцы из входной схемы и задаваемые пользователем параметры. Дополнительные сведения о построении выражений см. в статье Построитель выражений Потока данных.

    В этом кратком руководстве будут отфильтрованы фильмы в жанре комедия, которые вышли между 1910 и 2000 годами. В связи с тем, что в настоящее время год является строкой, ее необходимо преобразовать в целое число с помощью функции toInteger(). Используйте операторы "больше или равно" (>=) и "меньше или равно" (<=) для сравнения значений года с литералами 1910 и 2000. Объедините эти выражения с помощью оператора AND (&&). Выражение будет выглядеть следующим образом:

    toInteger(year) >= 1910 && toInteger(year) <= 2000

    Чтобы узнать, какие фильмы являются комедиями, можно использовать функцию rlike(), позволяющую найти слово "комедия" в жанрах столбца. Объедините выражение rlike с выражением сравнения года и получите следующее выражение:

    toInteger(year) >= 1910 && toInteger(year) <= 2000 && rlike(genres, 'Comedy')

    Если кластер отладки активен, можно проверить логику, нажав кнопку Обновить, чтобы увидеть результат выражения в сравнении с входными данными. Существует несколько правильных ответов на вопрос, как можно реализовать эту логику с помощью языка выражений потока данных.

    Фильтр

    После завершения работы с выражением нажмите кнопку Сохранить и завершить.

  12. Нажмите Предварительный просмотр данных, чтобы убедиться, что фильтр работает правильно.

    Снимок экрана, на котором показан

  13. Следующее преобразование, которое необходимо добавить — это преобразование Агрегат в разделе Модификатор схемы.

    Снимок экрана, на котором показан модификатор схемы

  14. Назовите агрегатное преобразование AggregateComedyRatings. На вкладке Группировка выберите год из раскрывающегося списка, чтобы сгруппировать агрегаты по году, в котором вышел фильм.

    Снимок экрана, показывающий параметр year на вкладке

  15. Перейдите на вкладку Статистическая обработка. В левом текстовом поле присвойте столбцу имя AverageComedyRating. Щелкните правой кнопкой мыши поле выражения, чтобы ввести статистическое выражение с помощью построителя выражений.

    Снимок экрана, показывающий параметр year на вкладке

  16. Чтобы получить среднее значение столбца Оценка, используйте агрегатную функцию avg(). Так как оценка является строковым значением, а avg() принимает числовые входные данные, необходимо преобразовать значение в число с помощью функции toInteger(). Это выражение выглядит следующим образом:

    avg(toInteger(Rating))

    После завершения нажмите Сохранить и завершить.

    Снимок экрана, на котором показано сохраненное выражение.

  17. Откройте вкладку Предварительный просмотр данных, чтобы просмотреть выходные данные преобразования. Обратите внимание, что здесь есть только два столбца: year и AverageComedyRating.

    Агрегированное

  18. Затем необходимо добавить преобразование Приемник в качестве назначения.

    Снимок экрана, где добавляется преобразование

  19. Назовите приемник Sink. Нажмите Создать, чтобы создать набор данных приемника.

    Снимок экрана, на котором показано, как присвоить имя приемнику и создать новый набор данных приемника.

  20. Выберите Azure Data Lake Storage 2-го поколения. Нажмите кнопку Продолжить.

    Снимок экрана, показывающий элемент

  21. Выберите DelimitedText. Нажмите кнопку Продолжить.

    Набор данных

  22. Назовите набор данных приемника MoviesSink. В качестве связанной службы выберите связанную службу ADLS 2-го поколения, созданную на шаге 6. Введите выходную папку для записи данных. В этом кратком руководстве мы записываем данные в папку output в контейнере sample-data. Папка не обязательно должна существовать заранее и может быть создана динамически. Задайте для параметра Использовать первую строку в качестве заголовка значение "истина" и выберите значение Нет для параметра Импорт схемы. Нажмите кнопку «Готово».

    Приемник

Теперь создание потока данных завершено. Все готово для его запуска в конвейере.

Запуск и отслеживание Потока данных

Перед публикацией можно выполнить отладку конвейера. На этом шаге будет активирован запуск отладки конвейера Потока данных. При предварительном просмотре данных они не записываются, а в режиме отладки данные записываются в место назначения приемника.

  1. Перейдите на холст конвейера. Нажмите кнопку Отладка, чтобы запустить отладку.

    Снимок экрана, на котором показан холст конвейера с выделенным пунктом

  2. При отладке конвейера для действий Потока данных используется активный кластер отладки, но инициализация все равно занимает не менее минуты. Ход выполнения можно отслеживать на вкладке Выходные данные. После успешного выполнения щелкните значок очков, чтобы открыть область мониторинга.

    Pipeline

  3. В области мониторинга можно увидеть количество строк и время, затраченное на каждый шаг преобразования.

    Снимок экрана, показывающий область мониторинга, где можно увидеть количество строк и время, затраченное на каждый шаг преобразования.

  4. Щелкните преобразование, чтобы получить подробные сведения о столбцах и секционировании данных.

    Наблюдение

Если все действия в этом кратком руководстве выполнены правильно, то в папку приемника должны быть записаны 83 строки и 2 столбца. Данные можно проверить, проверив хранилище BLOB-объектов.

Конвейер в этом руководстве выполняет поток данных, который агрегирует средний рейтинг комедий с 1910 до 2000 годы и записывает данные в ADLS. Вы научились выполнять следующие задачи:

  • Создали фабрику данных.
  • Создайте конвейер с использованием действия Потока данных.
  • создание потока данных для сопоставления с четырьмя преобразованиями;
  • тестовый запуск конвейера;
  • Мониторинг выполнения Потока данных

Дополнительные сведения о языке выражений потока данных.