Erste Schritte mit COPY INTO zum Laden von Daten

Mit dem SQL-Befehl COPY INTO können Sie Daten aus einem Dateispeicherort in eine Delta-Tabelle laden. Hierbei handelt es sich um einen wiederholbaren und idempotenten Vorgang; Dateien am Quellspeicherort, die bereits geladen wurden, werden übersprungen.

COPY INTO bietet die folgenden Funktionen:

  • Einfache konfigurierbare Datei- oder Verzeichnisfilter aus dem Cloudspeicher, einschließlich S3, ADLS Gen2, ABFS, GCS und Unity Catalog-Volumes.
  • Unterstützung für mehrere Quelldateiformate: CSV, JSON, XML, Avro, ORC, Parquet, Text und Binärdateien
  • Standardmäßige Dateiverarbeitung genau einmal (idempotent)
  • Zieltabellenschemarückschluss, Zuordnung, Zusammenführung und Entwicklung

Hinweis

Für eine skalierbarere und robustere Dateierfassung empfiehlt Databricks SQL-Benutzer*innen die Verwendung von Streamingtabellen. Weitere Informationen finden Sie unter Laden von Daten mithilfe von Streamingtabellen in Databricks SQL.

Warnung

COPY INTO berücksichtigt die Arbeitsbereichseinstellung für Löschvektoren. Wenn diese Option aktiviert ist, werden Löschvektoren in der Zieltabelle aktiviert, wenn COPY INTO in einem SQL-Warehouse oder einer Compute-Umgebung mit Databricks Runtime 14.0 oder höher ausgeführt wird. Nach der Aktivierung blockieren Löschvektoren Abfragen für eine Tabelle in Databricks Runtime 11.3 LTS und niedriger. Weitere Informationen finden Sie unter Was sind Löschvektoren? und Automatische Aktivierung von Löschvektoren.

Anforderungen

Ein Kontoadministrator muss die Schritte unter Konfigurieren des Datenzugriffs für die Erfassung ausführen, um den Zugriff auf Daten im Cloudobjektspeicher zu konfigurieren, bevor Benutzer Daten mit COPY INTO laden können.

Beispiel: Laden von Daten in eine schemalose Delta Lake-Tabelle

Hinweis

Dieses Feature ist in Databricks Runtime 11.3 LTS und höher verfügbar.

Sie können leere Delta-Platzhaltertabellen erstellen, damit das Schema später während eines COPY INTO-Befehls abgeleitet wird, indem Sie mergeSchema in true auf COPY_OPTIONS festlegen:

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

Die oben genannte SQL-Anweisung ist idempotent und kann so geplant werden, dass Daten genau einmal in eine Delta-Tabelle aufgenommen werden.

Hinweis

Die leere Delta-Tabelle kann außerhalb von COPY INTO nicht verwendet werden. INSERT INTO und MERGE INTO werden nicht unterstützt, Daten in schemalose Delta-Tabellen zu schreiben. Nachdem Daten mit COPY INTO in die Tabelle eingefügt wurden, wird die Tabelle abfragbar.

Weitere Informationen finden Sie unter Erstellen von Zieltabellen für COPY INTO.

Beispiel: Festlegen des Schemas und Laden von Daten in eine Delta Lake-Tabelle

Das folgende Beispiel zeigt, wie Sie eine Delta-Tabelle erstellen und dann den SQL-Befehl COPY INTO verwenden, um Beispieldaten aus Databricks-Datasets in die Tabelle zu laden. Sie können den Python-, R-, Scala- oder SQL-Beispielcode in einem Notebook ausführen, das an einen Azure Databricks-Cluster angefügt ist. Sie können den SQL-Code auch in einer Abfrage ausführen, die einem SQL-Warehouse in Databricks SQL zugeordnet ist.

SQL

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...

Python

table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''

R

library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...

Scala

val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

Führen Sie zum Bereinigen den folgenden Code aus, der die Tabelle löscht:

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Scala

spark.sql("DROP TABLE " + table_name)

SQL

DROP TABLE default.loan_risks_upload

Bereinigen von Metadatendateien

Sie können VACUUM ausführen, um nicht referenzierte Metadatendateien zu bereinigen, die von COPY INTO in Databricks Runtime 15.2 und höher erstellt wurden.

Verweis

  • Databricks Runtime 7.x und höher: COPY INTO

Zusätzliche Ressourcen