Prise en main de COPY INTO pour charger des données.

La commande SQL COPY INTO vous permet de charger des données d’un emplacement de fichier vers une table Delta. Il s’agit d’une opération répétable et idempotente : les fichiers de l’emplacement source qui ont déjà été chargés sont ignorés.

COPY INTO offre les capacités suivantes :

  • Filtres de fichiers ou de répertoires facilement configurables à partir du stockage en ligne, notamment les volumes S3, ADLS Gen2, ABFS, GCS et Unity Catalog.
  • Prise en charge de plusieurs formats de fichiers sources : CSV, JSON, XML, Avro, ORC, Parquet, texte et fichiers binaires
  • Traitement de fichiers une seule fois (idempotent) par défaut
  • Inférence, mappage, fusion et évolution de schéma de table cible

Remarque

Pour bénéficier d’une expérience d’ingestion de fichiers plus évolutive et plus robuste, Databricks recommande aux utilisateurs SQL de tirer parti des tables de diffusion en continu. Consultez Charger des données à l’aide de tables de streaming dans Databricks SQL.

Avertissement

COPY INTO respecte le paramètre d’espace de travail pour les vecteurs de suppression. Si cette option est activée, les vecteurs de suppression sont activés sur la table cible lorsque COPY INTO s’exécute sur un entrepôt SQL ou que du calcul s’exécute sur Databricks Runtime 14.0 ou une version ultérieure. Une fois activés, les vecteurs de suppression bloquent les requêtes sur une table dans Databricks Runtime 11.3 LTS et les versions inférieures. Voir Quels sont les vecteurs de suppression ? et Activer automatiquement les vecteurs de suppression.

Spécifications

Un administrateur de compte doit suivre les étapes décrites dans Configurer l’accès aux données pour l’ingestion pour configurer l’accès aux données dans le stockage d’objets cloud avant que les utilisateurs puissent charger des données à l’aide de COPY INTO.

Exemple : charger des données dans une table Delta Lake sans schéma

Remarque

Cette fonctionnalité est disponible sur Databricks Runtime 11.3 LTS et versions ultérieures.

Vous pouvez créer des tables Delta d’espace réservé vides afin que le schéma soit déduit ultérieurement lors d’une commande COPY INTO en configurant mergeSchema vers true dans COPY_OPTIONS :

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

L’instruction SQL ci-dessus est idempotente et peut être planifiée pour s’exécuter de manière à ingérer des données exactement une fois dans une table Delta.

Notes

La table Delta vide n’est pas utilisable en dehors de COPY INTO. INSERT INTO et MERGE INTO ne sont pas pris en charge pour écrire des données dans des tables Delta sans schéma. Une fois les données insérées dans la table avec COPY INTO, la table devient interrogeable.

Voir la section Créer des tables cibles pour COPY INTO.

Exemple : définir le schéma et charger des données dans une table Delta Lake

L’exemple suivant montre comment créer une table Delta, puis utiliser la commande SQL COPY INTO pour charger dans la table des exemples de données provenant de jeux de données Databricks. Vous pouvez exécuter l’exemple de code Python, R, Scala ou SQL à partir d’un notebook attaché à un cluster Azure Databricks. Vous pouvez aussi exécuter le code SQL à partir d’une requête associée à un entrepôt SQL dans Databricks SQL.

SQL

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...

Python

table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''

R

library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...

Scala

val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

Pour nettoyer, exécutez le code suivant, qui supprime la table :

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Scala

spark.sql("DROP TABLE " + table_name)

SQL

DROP TABLE default.loan_risks_upload

Nettoyer les fichiers de métadonnées

Vous pouvez exécuter VACUUM pour nettoyer les fichiers de métadonnées non référencés créés par COPY INTO dans Databricks Runtime 15.2 et ultérieur.

Informations de référence

  • Databricks Runtime 7.x et versions ultérieures : COPY INTO

Ressources supplémentaires