Скрипт потока данных (DFS)
ОБЛАСТЬ ПРИМЕНЕНИЯ: Фабрика данных Azure Azure Synapse Analytics
Совет
Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !
Потоки данных доступны в конвейерах как Фабрики данных Azure, так и Azure Synapse. Эта статья относится к потокам данных для сопоставления. Если вы не знакомы с преобразованиями, см. вводную статью Преобразование данных с помощью потока данных для сопоставления.
Скрипт потока данных (DFS) представляет собой базовые метаданные, аналогичные языку программирования. Он используется для выполнения преобразований, включенных в поток данных для сопоставления. Каждое преобразование представлено рядом свойств, которые предоставляют необходимые сведения для правильного выполнения задания. Чтобы увидеть скрипт в Фабрике данных Azure и получить к нему доступ для редактирования, нажмите кнопку "Скрипт" на верхней ленте пользовательского интерфейса браузера.
Например, allowSchemaDrift: true,
в преобразовании источника указывает службе включать все столбцы из исходного набора данных в поток данных, даже если они не включены в проекцию схемы.
Случаи использования
DFS автоматически создается интерфейсом пользователя. Кнопка “Скрипт” позволяет просмотреть и настроить скрипт. Кроме того, можно создавать скрипты за пределами пользовательского интерфейса ADF, а затем передавать их в командлет PowerShell. При отладке сложных потоков данных может оказаться проще проверять код программной части скрипта, а не представление графа пользовательского интерфейса для потоков.
Ниже приведено несколько примеров использования.
- Программное создание множества похожих потоков данных (“штамповка” потоков данных).
- Сложные выражения, которыми трудно управлять в пользовательском интерфейсе или которые могут приводить к проблемам проверки.
- Отладка и более эффективное понимание различных ошибок, возвращаемых во время выполнения.
При создании скрипта потока данных для использования с PowerShell или API необходимо свернуть форматированный текст в одну строку. Символы табуляции и новой строки можно сохранить в виде escape-символов. Но текст должен быть отформатирован в соответствии со свойством JSON. В нижней части пользовательского интерфейса редактора скриптов есть кнопка, которая будет форматировать скрипт как одну строку.
Добавление преобразований
Чтобы добавить преобразования, нужно выполнить три основных шага: добавить основные данные преобразования, перенаправить входной поток и перенаправить выходной поток. Проще всего это увидеть на примере. Начнем с простого источника для потока данных приемника, как в следующем примере:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
source1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Если мы решим добавить производное преобразование, сначала необходимо создать основной текст преобразования с простым выражением, чтобы добавить новый столбец в верхнем регистре под названием upperCaseTitle
:
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
Затем мы берем существующий DFS и добавляем преобразование:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Теперь перенаправляем входящий поток, определив преобразование, за которым должно следовать новое преобразование (в данном случае source1
), и скопировав имя потока в новое преобразование:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Наконец, мы определяем преобразование, которое должно следовать за новым преобразованием, и заменяем его входной поток (в данном случае sink1
) именем выходного потока нашего нового преобразования:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Основы DFS
DFS состоит из ряда связанных преобразований, включая источники, приемники и различные другие элементы, которые могут добавлять новые столбцы, фильтровать и объединять данные и многое другое. Обычно скрипт начинается с одного или нескольких источников, за которым следует много преобразований, и заканчивается одним или несколькими приемниками.
Все источники имеют одинаковую базовую конструкцию:
source(
source properties
) ~> source_name
Например, простой источник с тремя столбцами (movieId, title, genres) будет выглядеть следующим образом:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
Все преобразования, отличные от источников, имеют одну и ту же базовую конструкцию:
name_of_incoming_stream transformation_type(
properties
) ~> new_stream_name
Например, простое производное преобразование, которое принимает столбец (title) и перезаписывает его версией в верхнем регистре, будет выглядеть следующим образом:
source1 derive(
title = upper(title)
) ~> derive1
А приемник без схемы будет выглядеть так:
derive1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Фрагменты скриптов
Фрагменты скриптов — это доступный для общего использования код скрипта потока данных, который можно предоставлять в потоках данных. В приведенном ниже видео рассказывается о том, как использовать фрагменты скриптов и как копировать и вставлять части скрипта, определяющие графы потока данных, с помощью скрипта потока данных.
Агрегатная сводная статистика
Добавьте преобразование “Статистическая обработка” в поток данных с именем SummaryStats, а затем вставьте в скрипт приведенный ниже код для агрегатной функции, заменив существующий код SummaryStats. Это обеспечит общий шаблон для сводной статистики профиля данных.
aggregate(each(match(true()), $$+'_NotNull' = countIf(!isNull($$)), $$ + '_Null' = countIf(isNull($$))),
each(match(type=='double'||type=='integer'||type=='short'||type=='decimal'), $$+'_stddev' = round(stddev($$),2), $$ + '_min' = min ($$), $$ + '_max' = max($$), $$ + '_average' = round(avg($$),2), $$ + '_variance' = round(variance($$),2)),
each(match(type=='string'), $$+'_maxLength' = max(length($$)))) ~> SummaryStats
Приведенный ниже пример также можно использовать для подсчета числа уникальных и различающихся строк в данных. Приведенный ниже пример можно вставить в поток данных с преобразованием “Статистическая обработка” с именем ValueDistAgg. В этом примере используется столбец с именем title. Обязательно замените title строковым столбцом в данных, которые вы хотите использовать для получения счетчиков значений.
aggregate(groupBy(title),
countunique = count()) ~> ValueDistAgg
ValueDistAgg aggregate(numofunique = countIf(countunique==1),
numofdistinct = countDistinct(title)) ~> UniqDist
Включение всех столбцов в статистическое выражение
Этот универсальный шаблон статистического выражения показывает, как сохранить оставшиеся столбцы в выходных метаданных при построении статистических выражений. В этом случае мы используем функцию first()
, чтобы выбрать первое значение в каждом столбце с именем, отличным от movie. Чтобы использовать эту функцию, создайте преобразование “Статистическая обработка” с именем DistinctRows и вставьте его в скрипт поверх существующего сценария статистического выражения DistinctRows.
aggregate(groupBy(movie),
each(match(name!='movie'), $$ = first($$))) ~> DistinctRows
Создание отпечатка хэша строки
Используйте этот код в скрипте потока данных, чтобы создать новый производный столбец с именем DWhash
, который создает хэш sha1
трех столбцов.
derive(DWhash = sha1(Name,ProductNumber,Color)) ~> DWHash
Этот скрипт также позволяет создать хэш строки, используя все столбцы, имеющиеся в потоке, при этом не требуется присваивать имя каждому столбцу.
derive(DWhash = sha1(columns())) ~> DWHash
Эквивалент String_agg
Этот код будет действовать как функция string_agg()
T-SQL и будет объединять строковые значения в массив. Затем этот массив можно привести к строке для использования с назначениями SQL.
source1 aggregate(groupBy(year),
string_agg = collect(title)) ~> Aggregate1
Aggregate1 derive(string_agg = toString(string_agg)) ~> StringAgg
Число обновлений, операций upsert, вставок, удалений
При использовании преобразования “Изменение строк” может потребоваться подсчитать количество обновлений, операций upsert, вставок и удалений, выполненных в результате применения политик изменения строк. Добавьте преобразование “Статистическая обработка” после преобразования “Изменение строк” и вставьте этот скрипт потока данных в определение статистического выражения для этих счетчиков.
aggregate(updates = countIf(isUpdate(), 1),
inserts = countIf(isInsert(), 1),
upserts = countIf(isUpsert(), 1),
deletes = countIf(isDelete(),1)) ~> RowCount
Отличающаяся строка с учетом всех столбцов
Этот фрагмент кода добавит в поток данных новое преобразование "Статистическая обработка", которое будет принимать все входящие столбцы, формировать хэш, используемый для группировки, чтобы исключить дубликаты, а затем предоставлять первое вхождение каждого дубликата в качестве выходных данных. Столбцам не нужно явно присваивать имена, они будут автоматически создаваться на основе входящего потока данных.
aggregate(groupBy(mycols = sha2(256,columns())),
each(match(true()), $$ = first($$))) ~> DistinctRows
Проверка наличия значений NULL во всех столбцах
Этот фрагмент кода можно вставить в поток данных для общей проверки всех столбцов на наличие значений NULL. Этот метод использует смещение схемы, чтобы просмотреть все столбцы во всех строках и с помощью Условного разбиения отделить строки со значениями NULL от остальных строк.
split(contains(array(toString(columns())),isNull(#item)),
disjoint: false) ~> LookForNULLs@(hasNULLs, noNULLs)
Смещение схемы автоматического сопоставления с помощью инструкции select
Если необходимо загрузить существующую схему базы данных из неизвестного или динамического набора входящих столбцов, необходимо сопоставить крайние правые столбцы в преобразовании приемника. Это необходимо только при загрузке существующей таблицы. Добавьте этот фрагмент кода перед приемником, чтобы создать инструкцию Select, которая будет автоматически сопоставлять столбцы. Оставьте сопоставление приемника для автоматического сопоставления.
select(mapColumn(
each(match(true()))
),
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> automap
Сохранение типов данных столбцов
Добавьте этот скрипт в определение производного столбца, чтобы сохранить имена столбцов и типы данных из потока данных в постоянном хранилище с помощью приемника.
derive(each(match(type=='string'), $$ = 'string'),
each(match(type=='integer'), $$ = 'integer'),
each(match(type=='short'), $$ = 'short'),
each(match(type=='complex'), $$ = 'complex'),
each(match(type=='array'), $$ = 'array'),
each(match(type=='float'), $$ = 'float'),
each(match(type=='date'), $$ = 'date'),
each(match(type=='timestamp'), $$ = 'timestamp'),
each(match(type=='boolean'), $$ = 'boolean'),
each(match(type=='long'), $$ = 'long'),
each(match(type=='double'), $$ = 'double')) ~> DerivedColumn1
Заполнение по направлению вниз
Здесь показано, как реализовать распространенную задачу "заполнение вниз" с наборами данных, если требуется заменить значения NULL в последовательности предыдущим значением, отличным от NULL. Обратите внимание, что эта операция может отрицательно сказаться на производительности, поскольку необходимо создать искусственное окно во всем наборе данных с фиктивным значением категории. Кроме того, необходимо выполнить сортировку по значению, чтобы создать правильную последовательность данных для поиска предыдущего значения, отличного от NULL. Следующий фрагмент кода создает искусственную категорию как фиктивную и сортирует ее по суррогатному ключу. Вы можете удалить суррогатный ключ и использовать собственный ключ сортировки, относящийся к данным. В этом фрагменте кода предполагается, что вы уже добавили преобразование источника с именем source1
source1 derive(dummy = 1) ~> DerivedColumn
DerivedColumn keyGenerate(output(sk as long),
startAt: 1L) ~> SurrogateKey
SurrogateKey window(over(dummy),
asc(sk, true),
Rating2 = coalesce(Rating, last(Rating, true()))) ~> Window1
Скользящее среднее
Скользящее среднее можно легко реализовать в потоках данных с помощью преобразования окон. В следующем примере создается 15-дневное скользящее среднее для цен на акции для Майкрософт.
window(over(stocksymbol),
asc(Date, true),
startRowOffset: -7L,
endRowOffset: 7L,
FifteenDayMovingAvg = round(avg(Close),2)) ~> Window1
Число различных значений столбцов
Этот скрипт можно использовать для определения ключевых столбцов и просмотра количества элементов для всех столбцов в потоке с помощью одного фрагмента сценария. Добавьте этот скрипт в поток данных как преобразование статистической обработки, и он будет автоматически выдавать количество разных значений для всех столбцов.
aggregate(each(match(true()), $$ = countDistinct($$))) ~> KeyPattern
Сравнение значений предыдущей или следующей строки
Этот фрагмент кода демонстрирует, как преобразование "Окно" можно использовать для сравнения значений столбцов из текущего контекста строки со значениями столбцов из строк, расположенных до и после текущей строки. В этом примере производный столбец используется для создания фиктивного значения, чтобы охватить секцию окна из всего набора данных. Преобразование "Суррогатный ключ" используется для назначения уникального значения ключа каждой строке. При применении этого шаблона к преобразованиям данных можно удалить суррогатный ключ, если нужно использовать упорядочение по имеющемуся столбцу, а также удалить производный столбец, если у вас есть столбцы, по которым можно выполнить секционирование данных.
source1 keyGenerate(output(sk as long),
startAt: 1L) ~> SurrogateKey1
SurrogateKey1 derive(dummy = 1) ~> DerivedColumn1
DerivedColumn1 window(over(dummy),
asc(sk, true),
prevAndCurr = lag(title,1)+'-'+last(title),
nextAndCurr = lead(title,1)+'-'+last(title)) ~> leadAndLag
Сколько столбцов в моих данных?
size(array(columns()))
Связанный контент
Проанализируйте потоки данных. Для начала ознакомьтесь со статьей Общие сведения о потоках данных