COPY INTO を使用してデータを読み込む

COPY INTO SQL コマンドを使用すると、ファイルの場所から Delta テーブルにデータを読み込むことができます。 これは再試行可能なべき等操作であり、ソースの場所にある、既に読み込み済みのファイルはスキップされます。

COPY INTO には次の機能があります。

  • S3、ADLS Gen2、ABFS、GCS、Unity Catalog ボリュームなど、クラウド ストレージから簡単に構成できるファイルまたはディレクトリ フィルター。
  • CSV、JSON、XML、AvroORCParquet、テキスト、バイナリ ファイルなど、複数のソース ファイル形式のサポート
  • 既定で 1 回だけ (べき等) のファイル処理
  • ターゲット テーブル スキーマの推論、マッピング、マージ、および展開

Note

よりスケーラブルで堅牢なファイル インジェスト エクスペリエンスを実現するために、Databricks においては SQL ユーザーによるストリーミング テーブルの活用をお勧めします。 「Databricks SQL でストリーミング テーブルを使用してデータを読み込む」を参照してください。

警告

COPY INTO は、削除ベクトルのワークスペース設定を考慮します。 有効にした場合、Databricks Runtime 14.0 以降を実行しているコンピューティングまたは SQL ウェアハウスで COPY INTO が実行されている場合、ターゲット テーブルで削除ベクトルが有効になります。 有効にすると、削除ベクトルは Databricks Runtime 11.3 LTS 以下のテーブルに対するクエリをブロックします。 「削除ベクトルとは」と「削除ベクトルの自動有効化」を参照してください。

要件

アカウント管理者は、ユーザーが COPY INTO を使用してデータを読み込む前に、「インジェスト用データ アクセスを構成する」の手順に従って、クラウド オブジェクト ストレージ内のデータへのアクセスを構成する必要があります。

例: スキーマレス Delta Lake テーブルにデータを読み込む

Note

この機能は、Databricks Runtime 11.3 LTS 以降で使用できます。

COPY_OPTIONSmergeSchematrue に設定することで、後で COPY INTO コマンド中にスキーマが推論されるように、空のプレースホルダー Delta テーブルを作成できます。

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

上記の SQL ステートメントはべき等であり、Delta テーブルに 1 回だけデータを取り込むための実行をスケジュールできます。

注意

空の Delta テーブルは、COPY INTO の外部では使用できません。 INSERT INTO および MERGE INTO では、スキーマレス Delta テーブルへのデータの書き込みがサポートされていません。 データが COPY INTO でテーブルに挿入されると、テーブルはクエリを実行することが可能になります。

COPY INTO のターゲット テーブルの作成」を参照してください。

例: スキーマを設定し、Delta Lake テーブルにデータを読み込む

次の例では、Delta テーブルを作成し、COPY INTO SQL コマンドを使用して Databricks データセットのサンプル データをテーブルに読み込む方法を示しています。 Azure Databricks クラスターにアタッチされているノートブックから、Python、R、Scala、または SQL のサンプル コードを実行できます。 また、Databricks SQLSQL ウェアハウスに関連付けられているクエリから SQL コードを実行することもできます。

SQL

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...

Python

table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''

R

library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...

Scala

val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

クリーンアップするには、次のコードを実行します。これで、テーブルが削除されます。

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Scala

spark.sql("DROP TABLE " + table_name)

SQL

DROP TABLE default.loan_risks_upload

メタデータ ファイルをクリーンアップする

Databricks Runtime 15.2 以降で VACUUM を実行すると、COPY INTO によって作成された参照されていないメタデータ ファイルをクリーンアップできます。

リファレンス

その他の技術情報