Modelli comuni di caricamento dei dati
Il caricatore automatico semplifica una serie di attività comuni di inserimento dati. Questo riferimento rapido fornisce esempi per diversi modelli comuni.
Filtro di directory o file usando modelli GLOB
I modelli Glob possono essere usati per filtrare directory e file quando specificati nel percorso.
Modello | Descrizione |
---|---|
? |
Corrisponde a qualsiasi carattere singolo |
* |
Corrisponde a zero o più caratteri |
[abc] |
Trova la corrispondenza di un singolo carattere del set di caratteri {a,b,c}. |
[a-z] |
Trova la corrispondenza di un singolo carattere dall'intervallo di caratteri {a... z}. |
[^a] |
Trova la corrispondenza di un singolo carattere non incluso nel set di caratteri o nell'intervallo {a}. Si noti che il ^ carattere deve essere immediatamente a destra della parentesi aperta. |
{ab,cd} |
Trova la corrispondenza di una stringa dal set di stringhe {ab, cd}. |
{ab,c{de, fh}} |
Trova la corrispondenza di una stringa dal set di stringhe {ab, cde, cfh}. |
path
Usare per fornire modelli di prefisso, ad esempio:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base-path>/*/files")
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", <format>)
.schema(schema)
.load("<base-path>/*/files")
Importante
È necessario usare l'opzione pathGlobFilter
per fornire in modo esplicito modelli di suffisso. Fornisce path
solo un filtro di prefisso.
Ad esempio, se si desidera analizzare solo png
i file in una directory contenente file con suffissi diversi, è possibile eseguire le operazioni seguenti:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base-path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobfilter", "*.png")
.load(<base-path>)
Nota
Il comportamento predefinito del caricatore automatico è diverso dal comportamento predefinito di altre origini file Spark. Aggiungere .option("cloudFiles.useStrictGlobber", "true")
alla lettura per usare il globbing che corrisponde al comportamento predefinito di Spark rispetto alle origini file. Per altre informazioni sul globbing, vedere la tabella seguente:
Modello | Percorso file | Globber predefinito | Globber rigoroso |
---|---|---|---|
/a/b | /a/b/c/file.txt | Sì | Sì |
/a/b | /a/b_dir/c/file.txt | No | No |
/a/b | /a/b.txt | No | No |
/a/b/ | /a/b.txt | No | No |
/a/*/c/ | /a/b/c/file.txt | Sì | Sì |
/a/*/c/ | /a/b/c/d/file.txt | Sì | Sì |
/a/*/c/ | /a/b/x/y/c/file.txt | Sì | No |
/a/*/c | /a/b/c_file.txt | Sì | No |
/a/*/c/ | /a/b/c_file.txt | Sì | No |
/a/*/c/ | /a/*/cookie/file.txt | Sì | No |
/a/b* | /a/b.txt | Sì | Sì |
/a/b* | /a/b/file.txt | Sì | Sì |
/a/{0.txt,1.txt} | /a/0.txt | Sì | Sì |
/a/*/{0.txt,1.txt} | /a/0.txt | No | No |
/a/b/[cde-h]/i/ | /a/b/c/i/file.txt | Sì | Sì |
Abilitare easy ETL
Un modo semplice per ottenere i dati in Delta Lake senza perdere dati consiste nell'usare il modello seguente e abilitare l'inferenza dello schema con il caricatore automatico. Databricks consiglia di eseguire il codice seguente in un processo di Azure Databricks per riavviare automaticamente il flusso quando lo schema dei dati di origine cambia. Per impostazione predefinita, lo schema viene dedotto come tipi stringa, tutti gli errori di analisi (non dovrebbero verificarsi se tutto rimane come stringa) andranno a _rescued_data
e qualsiasi nuova colonna non riuscirà il flusso ed evolverà lo schema.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
.load("<path-to-source-data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Evitare la perdita di dati in dati ben strutturati
Quando si conosce lo schema, ma si vuole sapere ogni volta che si ricevono dati imprevisti, Databricks consiglia di usare .rescuedDataColumn
Python
spark.readStream.format("cloudFiles") \
.schema(expected_schema) \
.option("cloudFiles.format", "json") \
# will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.schema(expected_schema)
.option("cloudFiles.format", "json")
// will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Se si vuole che il flusso interrompa l'elaborazione se viene introdotto un nuovo campo che non corrisponde allo schema, è possibile aggiungere:
.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
Abilitare pipeline di dati semistrutturate flessibili
Quando si ricevono dati da un fornitore che introduce nuove colonne alle informazioni fornite, è possibile che non si sappia esattamente quando lo fanno o che non si dispone della larghezza di banda per aggiornare la pipeline di dati. È ora possibile sfruttare l'evoluzione dello schema per riavviare il flusso e consentire al caricatore automatico di aggiornare automaticamente lo schema dedotto. È anche possibile sfruttare schemaHints
per alcuni dei campi "senza schema" che il fornitore potrebbe fornire.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT") \
.load("/api/requests") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT")
.load("/api/requests")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Trasformare i dati JSON annidati
Poiché il caricatore automatico deduce le colonne JSON di primo livello come stringhe, è possibile rimanere con oggetti JSON annidati che richiedono ulteriori trasformazioni. È possibile usare le API di accesso ai dati semistrutturate per trasformare ulteriormente il contenuto JSON complesso.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.load("<source-data-with-nested-json>") \
.selectExpr(
"*",
"tags:page.name", # extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" # extracts {"tags":{"eventType":...}}
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<source-data-with-nested-json>")
.selectExpr(
"*",
"tags:page.name", // extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", // extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" // extracts {"tags":{"eventType":...}}
)
Dedurre i dati JSON annidati
Dopo aver annidato i dati, è possibile usare l'opzione cloudFiles.inferColumnTypes
per dedurre la struttura annidata dei dati e di altri tipi di colonna.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("<source-data-with-nested-json>")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.option("cloudFiles.inferColumnTypes", "true")
.load("<source-data-with-nested-json>")
Caricare file CSV senza intestazioni
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Applicare uno schema ai file CSV con intestazioni
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Inserire dati immagine o binari in Delta Lake per ML
Dopo aver archiviato i dati in Delta Lake, è possibile eseguire l'inferenza distribuita sui dati. Vedere Eseguire l'inferenza distribuita usando la funzione definita dall'utente pandas.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Sintassi del caricatore automatico per DLT
Le tabelle live delta forniscono una sintassi Python leggermente modificata per Il caricatore automatico aggiunge il supporto SQL per il caricatore automatico.
Gli esempi seguenti usano Il caricatore automatico per creare set di dati da file CSV e JSON:
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")
È possibile usare le opzioni di formato supportate con Il caricatore automatico. Usando la map()
funzione , è possibile passare le opzioni al cloud_files()
metodo . Le opzioni sono coppie chiave-valore, in cui le chiavi e i valori sono stringhe. Di seguito viene descritta la sintassi per l'uso del caricatore automatico in SQL:
CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
FROM cloud_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
L'esempio seguente legge i dati dai file CSV delimitati da tabulazioni con un'intestazione:
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))
È possibile utilizzare schema
per specificare manualmente il formato. È necessario specificare per i formati che non supportano l'inferenza schema
dello schema:
Python
@dlt.table
def wiki_raw():
return (
spark.readStream.format("cloudFiles")
.schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
.option("cloudFiles.format", "parquet")
.load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
)
SQL
CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM cloud_files(
"/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
"parquet",
map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
)
Nota
Le tabelle live delta configurano e gestiscono automaticamente le directory dello schema e del checkpoint quando si usa il caricatore automatico per leggere i file. Tuttavia, se si configura manualmente una di queste directory, l'esecuzione di un aggiornamento completo non influisce sul contenuto delle directory configurate. Databricks consiglia di usare le directory configurate automaticamente per evitare effetti collaterali imprevisti durante l'elaborazione.