Распространенные шаблоны загрузки данных
Автозагрузчик упрощает ряд распространенных задач приема данных. В этом кратком справочнике приведены примеры нескольких популярных шаблонов.
Фильтрация каталогов или файлов с помощью стандартных масок
Стандартные маски можно использовать для фильтрации каталогов и файлов, если они указаны в пути.
Расписание | Description |
---|---|
? |
Соответствует любому одиночному знаку |
* |
Соответствует нескольким символам или их отсутствию |
[abc] |
Соответствует одиночному символу из кодировки {a, b, c}. |
[a-z] |
Соответствует одиночному символу из диапазона символов {a…z}. |
[^a] |
Соответствует одиночному символу, который не относится к кодировке или диапазону символов {a}. Обратите внимание, что символ ^ должен стоять непосредственно справа от открывающей скобки. |
{ab,cd} |
Соответствует строке из набора строк {ab, cd}. |
{ab,c{de, fh}} |
Соответствует строке из набора строк {ab, cde, cfh}. |
Используйте path
для предоставления шаблонов префиксов, например:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base-path>/*/files")
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", <format>)
.schema(schema)
.load("<base-path>/*/files")
Внимание
Для явного предоставления шаблонов суффиксов необходимо использовать параметр pathGlobFilter
. path
предоставляет только фильтр префиксов.
Например, если вы хотите проанализировать только файлы png
в каталоге, содержащем файлы с разными суффиксами, можно выполнить указанные ниже команды.
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base-path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobfilter", "*.png")
.load(<base-path>)
Примечание.
Поведение автозагрузчика по умолчанию отличается от поведения по умолчанию других источников файлов Spark. Добавьте .option("cloudFiles.useStrictGlobber", "true")
в чтение, чтобы использовать глоббинг, соответствующий по умолчанию поведению Spark в источниках файлов. Дополнительные сведения о глоббинге см. в следующей таблице:
Расписание | Путь к файлу | Globber по умолчанию | Строгий глоббер |
---|---|---|---|
/a/b | /a/b/c/file.txt | Да | Да |
/a/b | /a/b_dir/c/file.txt | Нет | Нет |
/a/b | /a/b.txt | Нет | Нет |
/a/b/ | /a/b.txt | Нет | Нет |
/a/*/c/ | /a/b/c/file.txt | Да | Да |
/a/*/c/ | /a/b/c/d/file.txt | Да | Да |
/a/*/c/ | /a/b/x/y/c/file.txt | Да | Нет |
/a/*/c | /a/b/c_file.txt | Да | Нет |
/a/*/c/ | /a/b/c_file.txt | Да | Нет |
/a/*/c/ | /a/*/cookie/file.txt | Да | Нет |
/a/b* | /a/b.txt | Да | Да |
/a/b* | /a/b/file.txt | Да | Да |
/a/{0.txt,1.txt} | /a/0.txt | Да | Да |
/a/*/{0.txt,1.txt} | /a/0.txt | Нет | Нет |
/a/b/[cde-h]/i/ | /a/b/c/i/file.txt | Да | Да |
Включение простого извлечения, преобразования и загрузки
Простой способ передать данные в Delta Lake без потери данных — использовать приведенный шаблон и включить вывод схемы с Автозагрузчиком. Databricks рекомендует запускать следующий код в задании Azure Databricks для автоматического перезапуска потока при изменении схемы исходных данных. По умолчанию схема выводится как строковые типы, любые ошибки синтаксического анализа (их не должно быть, если все данные останутся в виде строк) отправляются в _rescued_data
, а любые новые столбцы приведут к сбою потока и развитию схемы.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
.load("<path-to-source-data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Предотвращение потери данных в хорошо структурированных данных
Если вы знакомы со схемой, но хотите знать, когда будете получать непредвиденные данные, Databricks рекомендует использовать rescuedDataColumn
.
Python
spark.readStream.format("cloudFiles") \
.schema(expected_schema) \
.option("cloudFiles.format", "json") \
# will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.schema(expected_schema)
.option("cloudFiles.format", "json")
// will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Если вы хотите, чтобы поток прекратил обработку при добавлении нового поля, которое не соответствует схеме, можно добавить:
.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
Обеспечение гибких конвейеров с частично структурированными данными
При получении данных от поставщика, который вводит новые столбцы в предоставляемые им сведения, вы можете не знать точно, когда он это делают, или у вас может не быть достаточной пропускной способности для обновления конвейера данных. Теперь вы можете использовать развитие схемы для перезапуска потока и автоматического обновления выводимой схемы Автозагрузчиком. Можно также использовать schemaHints
для некоторых полей "без схемы", которые может предоставить поставщик.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT") \
.load("/api/requests") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT")
.load("/api/requests")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Преобразование вложенных данных JSON
Так как автозагрузчик выводит столбцы JSON верхнего уровня как строки, у вас могут остаться вложенные объекты JSON, требующие дальнейших преобразований. Вы можете использовать API доступа к частично структурированным данным для дальнейшего преобразования сложного содержимого JSON.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.load("<source-data-with-nested-json>") \
.selectExpr(
"*",
"tags:page.name", # extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" # extracts {"tags":{"eventType":...}}
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<source-data-with-nested-json>")
.selectExpr(
"*",
"tags:page.name", // extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", // extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" // extracts {"tags":{"eventType":...}}
)
Вывод вложенных данных JSON
При наличии вложенных данных можно использовать параметр cloudFiles.inferColumnTypes
для вывода вложенной структуры данных и других типов столбцов.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("<source-data-with-nested-json>")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.option("cloudFiles.inferColumnTypes", "true")
.load("<source-data-with-nested-json>")
Загрузка CSV-файлов без заголовков
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Применение схемы в CSV-файлах с заголовками
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Прием изображений или двоичных данных в Delta Lake для ML
После сохранения данных в Delta Lake можно выполнять распределенный вывод данных. Дополнительные сведения см. в разделе Выполнение распределенного вывода с помощью UDF pandas.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Синтаксис автозагрузчика для DLT
Разностные динамические таблицы предоставляют немного измененный синтаксис Python для автозагрузчика, который добавляет поддержку SQL для автозагрузчика.
В следующих примерах для создания наборов данных из CSV- и JSON-файлов используется Автозагрузчик.
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")
С Автозагрузчиком можно использовать поддерживаемые параметры формата. map()
С помощью функции можно передать параметры методуcloud_files()
. Параметры — это пары "ключ-значение", где ключи и значения являются строками. Ниже описан синтаксис для работы с Автозагрузчиком в SQL:
CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
FROM cloud_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
В следующем примере показано, как из CSV-файлов с разделителями табуляции считываются данные с заголовком.
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))
Вы можете использовать schema
для указания формата вручную; вы должны указать schema
для форматов, которые не поддерживают вывод схемы.
Python
@dlt.table
def wiki_raw():
return (
spark.readStream.format("cloudFiles")
.schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
.option("cloudFiles.format", "parquet")
.load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
)
SQL
CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM cloud_files(
"/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
"parquet",
map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
)
Примечание.
Разностные динамические таблицы автоматически настраивают каталоги схем и контрольных точек, а также управляют ими при использовании Автозагрузчика для чтения файлов. Однако если вы вручную настроите любой из этих каталогов, выполнение полного обновления не повлияет на содержимое настроенных каталогов. Для избежания непредвиденных побочных эффектов во время обработки, в Databricks рекомендуется использовать автоматически настроенные каталоги.