Čtení a zápis souborů XML
Důležité
Tato funkce je ve verzi Public Preview.
Tento článek popisuje, jak číst a zapisovat soubory XML.
Jazyk XML (Extensible Markup Language) je jazyk značek pro formátování, ukládání a sdílení dat v textovém formátu. Definuje sadu pravidel pro serializaci dat od dokumentů po libovolné datové struktury.
Nativní podpora formátu souboru XML umožňuje příjem dat, dotazování a analýzu dat XML pro dávkové zpracování nebo streamování. Může automaticky odvodit a vyvíjet schémata a datové typy, podporuje výrazy SQL jako from_xml
a může generovat dokumenty XML. Nevyžaduje externí soubory JAR a bezproblémově funguje s automatickým zavaděčem read_files
a COPY INTO
. Volitelně můžete ověřit každý záznam XML na úrovni řádků proti definici schématu XML (XSD).
Požadavky
Databricks Runtime 14.3 a novější
Analýza záznamů XML
Specifikace XML vyžaduje dobře formátovanou strukturu. Tato specifikace se ale okamžitě nemapuje na tabulkový formát. Je nutné zadat rowTag
možnost označit XML element, který mapuje na DataFrame
Row
. Prvek rowTag
se stane nejvyšší úrovní struct
. Podřízené prvky rowTag
se stanou poli nejvyšší úrovně struct
.
Můžete zadat schéma pro tento záznam nebo ho nechat automaticky odvodit. Vzhledem k tomu, že analyzátor zkoumá rowTag
pouze prvky, odfiltrují se DTD a externí entity.
Následující příklady ilustrují odvozování schématu a parsování souboru XML pomocí různých rowTag
možností:
Python
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
Přečtěte si soubor XML s rowTag
možností "books":
Python
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
Výstup:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
Soubor XML rowTag
si můžete přečíst jako "knihu":
Python
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
Scala
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
Výstup:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
Možnosti zdroje dat
Možnosti zdroje dat pro XML lze zadat následujícími způsoby:
- Metody
.option/.options
následujících:- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- Následující předdefinované funkce:
- Klauzule
OPTIONS
CREATE TABLE USING DATA_SOURCE
Seznam možností najdete v tématu Možnosti automatického zavaděče.
Podpora XSD
Volitelně můžete ověřit každý záznam XML na úrovni řádků definicí schématu XML (XSD). V možnosti je zadaný rowValidationXSDPath
soubor XSD. XSD jinak nemá vliv na zadané schéma nebo odvozené. Záznam, který selže, je označený jako poškozený a zpracován na základě možnosti režimu zpracování poškozených záznamů popsaných v části možnosti.
Můžete použít XSDToSchema
k extrakci schématu datového rámce Spark ze souboru XSD. Podporuje pouze jednoduché, složité a sekvenční typy a podporuje pouze základní funkce XSD.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
Následující tabulka ukazuje převod datových typů XSD na datové typy Spark:
Datové typy XSD | Datové typy Sparku |
---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
short , unsignedByte |
ShortType |
integer , negativeInteger , nonNegativeInteger , nonPositiveInteger , , positiveInteger unsignedShort |
IntegerType |
long , unsignedInt |
LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
Parsování vnořeného XML
Data XML ve sloupci s řetězcovou hodnotou v existujícím datovém rámci je možné analyzovat schema_of_xml
a from_xml
která vrací schéma a analyzované výsledky jako nové struct
sloupce. Data XML předaná jako argument schema_of_xml
a from_xml
musí se jednat o jeden správně formátovaný záznam XML.
schema_of_xml
Syntaxe
schema_of_xml(xmlStr [, options] )
Argumenty
xmlStr
: Výraz STRING určující jeden správně formátovaný záznam XML.options
: VolitelnýMAP<STRING,STRING>
literál určující direktivy.
Vrácení
ŘETĚZEC obsahující definici struktury s n poli řetězců, ve kterých jsou názvy sloupců odvozeny od elementu XML a názvů atributů. Hodnoty polí obsahují odvozené formátované typy SQL.
from_xml
Syntaxe
from_xml(xmlStr, schema [, options])
Argumenty
xmlStr
: Výraz STRING určující jeden správně formátovaný záznam XML.schema
: Výraz STRING nebo vyvoláníschema_of_xml
funkce.options
: VolitelnýMAP<STRING,STRING>
literál určující direktivy.
Vrácení
Struktura s názvy polí a typy odpovídající definici schématu. Schéma musí být definováno jako názvy sloupců oddělených čárkami a páry datových typů, které se používají například CREATE TABLE
. Většina možností zobrazených v možnostech zdroje dat platí s následujícími výjimkami:
rowTag
: Protože existuje pouze jeden záznam XML,rowTag
možnost není použitelná.mode
(výchozí:PERMISSIVE
): Umožňuje režim zpracování poškozených záznamů během analýzy.PERMISSIVE
: Když splňuje poškozený záznam, umístí poškozený řetězec do pole nakonfigurovanéhocolumnNameOfCorruptRecord
pomocí a nastaví poškozená pole nanull
. Chcete-li zachovat poškozené záznamy, můžete nastavit pole typu řetězce pojmenovanécolumnNameOfCorruptRecord
v uživatelsky definovaném schématu. Pokud schéma pole neobsahuje, během analýzy zahodí poškozené záznamy. Při odvození schématu implicitně přidácolumnNameOfCorruptRecord
pole ve výstupním schématu.FAILFAST
: Vyvolá výjimku, když splňuje poškozené záznamy.
Převod struktury
Vzhledem k rozdílům ve struktuře mezi datovým rámcem DataFrame a XML existují určitá pravidla převodu z dat XML do DataFrame
a z DataFrame
dat XML. Všimněte si, že zpracování atributů lze zakázat pomocí možnosti excludeAttribute
.
Převod z XML na datový rámec
Atributy: Atributy jsou převedeny jako pole s předponou nadpisu attributePrefix
.
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
vytvoří následující schéma:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
Znaková data v elementu, který obsahuje atributy nebo podřízené elementy: Tyto prvky jsou analyzovány do valueTag
pole. Pokud existuje více výskytů znakových dat, valueTag
pole se převede na array
typ.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
vytvoří následující schéma:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
Převod z datového rámce na XML
Element jako pole v matici: Zápis souboru XML z DataFrame
pole ArrayType
s jeho elementem, stejně jako ArrayType
by měl další vnořené pole pro prvek. K tomu nedojde při čtení a zápisu dat XML, ale při zápisu DataFrame
čtení z jiných zdrojů. Proto zaokrouhlování při čtení a zápisu souborů XML má stejnou strukturu, ale zápis DataFrame
čtení z jiných zdrojů je možné mít jinou strukturu.
Datový rámec s následujícím schématem:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
a s daty níže:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
vytvoří soubor XML níže:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
Název prvku nepojmenovaného pole v poli DataFrame
je určen možností arrayElementName
(Výchozí: item
).
Sloupec zachráněných dat
Sloupec zachráněných dat zajišťuje, že během ETL nikdy nepřijdete o data nebo je nezmeškáte. Můžete povolit, aby zachytávalo všechna data, která nebyla analyzována, protože jedno nebo více polí v záznamu má jeden z následujících problémů:
- Chybí ze zadaného schématu.
- Neodpovídá datovému typu zadaného schématu.
- Neshoda velkých a velkých písmen s názvy polí v zadaném schématu
Uložený datový sloupec se vrátí jako dokument JSON obsahující sloupce, které byly uloženy, a cestu ke zdrojovému souboru záznamu. Chcete-li odebrat cestu ke zdrojovému souboru ze sloupce zachráněných dat, můžete nastavit následující konfiguraci SQL:
Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
Záchranný datový sloupec můžete povolit nastavením možnosti rescuedDataColumn
na název sloupce při čtení dat, například _rescued_data
pomocí spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
.
Analyzátor XML podporuje při analýze záznamů tři režimy: PERMISSIVE
, DROPMALFORMED
a FAILFAST
. Při použití společně s datovým rescuedDataColumn
typem neshody nezpůsobí vyřazení záznamů v DROPMALFORMED
režimu nebo vyvolání chyby v FAILFAST
režimu. Zahodí se pouze poškozené záznamy (neúplné nebo poškozené XML) nebo vyvolá chyby.
Odvození schématu a vývoj v automatickém zavaděče
Podrobnou diskuzi o tomto tématu a příslušných možnostech najdete v tématu Konfigurace odvozování schématu a vývoje v auto loaderu. Automatický zavaděč můžete nakonfigurovat tak, aby automaticky rozpoznal schéma načtených dat XML, což umožňuje inicializovat tabulky bez explicitního deklarování schématu dat a vyvíjet schéma tabulky při zavádění nových sloupců. To eliminuje potřebu ručního sledování a použití změn schématu v průběhu času.
Ve výchozím nastavení se při odvozování schématu automatického zavaděče snaží vyhnout problémům s vývojem schématu kvůli neshodám typů. U formátů, které nekódují datové typy (JSON, CSV a XML), auto loader odvodí všechny sloupce jako řetězce, včetně vnořených polí v souborech XML. Apache Spark DataFrameReader
používá jiné chování pro odvozování schématu a výběr datových typů pro sloupce ve zdrojích XML na základě ukázkových dat. Chcete-li toto chování povolit pomocí automatického zavaděče, nastavte možnost cloudFiles.inferColumnTypes
na true
hodnotu .
Auto Loader zjistí přidání nových sloupců při zpracování dat. Když Auto Loader zjistí nový sloupec, datový proud se zastaví pomocí .UnknownFieldException
Než datový proud vyvolá tuto chybu, auto loader provede odvozování schématu v nejnovější mikrodávce dat a aktualizuje umístění schématu s nejnovějším schématem sloučením nových sloupců na konec schématu. Datové typy existujících sloupců zůstávají beze změny. Auto Loader podporuje různé režimy pro vývoj schématu, který jste nastavili v možnosti cloudFiles.schemaEvolutionMode
.
Pomocí nápovědy schématu můžete vynutit informace o schématu, které znáte a očekáváte u odvozeného schématu. Pokud víte, že sloupec je konkrétní datový typ, nebo pokud chcete zvolit obecnější datový typ (například dvojité místo celého čísla), můžete zadat libovolný počet tipů pro datové typy sloupců jako řetězec pomocí syntaxe specifikace schématu SQL. Pokud je povolený sloupec s daty o záchraně, načtou se do _rescued_data
sloupce pole pojmenovaná v jiném případě, než je schéma. Toto chování můžete změnit tak, že nastavíte možnost readerCaseSensitive
false
na možnost , v takovém případě Auto Loader čte data bez rozlišování malých a malých písmen.
Příklady
Příklady v této části používají soubor XML dostupný ke stažení v úložišti Apache Spark GitHub.
Čtení a zápis XML
Python
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
Scala
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
Při čtení dat můžete schéma zadat ručně:
Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
Rozhraní API SQL
Zdroj dat XML může odvodit datové typy:
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
V DDL můžete také zadat názvy a typy sloupců. V tomto případě není schéma odvozeno automaticky.
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
Načtení XML pomocí FUNKCE COPY INTO
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
Čtení XML s ověřením řádků
Python
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
Scala
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
Analýza vnořeného XML (from_xml a schema_of_xml)
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
from_xml a schema_of_xml s využitím rozhraní SQL API
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
Načtení XML pomocí automatického zavaděče
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)