チュートリアル: Spark SQL での COPY INTO

Databricks では、数千のファイルを含むデータ ソースの増分データ読み込みと一括データ読み込みに COPY INTO コマンドを使用することをお勧めします。 Databricks では、高度なユース ケースに自動ローダーを使用することをお勧めします。

このチュートリアルでは、COPY INTO コマンドを使用して、クラウド オブジェクト ストレージから Azure Databricks ワークスペースのテーブルにデータをロードします。

必要条件

  1. Azure サブスクリプション、そのサブスクリプション内の Azure Databricks ワークスペース、およびそのワークスペース内のクラスター。 これらを作成するには、「クイック スタート: Azure portal を使用して Azure Databricks ワークスペースで Spark ジョブを実行する」を参照してください。 このクイック スタートに従う場合は、「Spark SQL ジョブを実行する」のセクションの手順に従う必要はありません。
  2. Databricks Runtime 11.3 LTS 以降が実行されているワークスペース内の汎用クラスター。 汎用クラスターを作成するには、「コンピューティング構成リファレンス」を参照してください。
  3. Azure Databricks ワークスペースのユーザー インターフェイスに関する理解。 ワークスペースの移動に関する記事を参照してください。
  4. Databricks ノートブックの操作に関する知識。
  5. データを書き込むことができる場所。このデモでは DBFS ルートを例として使用しますが、Databricks では、Unity Catalog で構成された外部ストレージの場所をお勧めしています。

手順 1. 環境を構成し、データ ジェネレーターを作成する

このチュートリアルは、Azure Databricks と既定のワークスペース構成に関する基本的な知識を前提としています。 指定されたコードを実行できない場合は、ワークスペース管理者に問い合わせて、コンピューティング リソースとデータを書き込むことができる場所にアクセスできることを確認してください。

指定されたコードは、source パラメーターを使用して COPY INTO データ ソースとして構成する場所を指定します。 記述されているように、このコードは DBFS ルート上の場所を指します。 外部オブジェクト ストレージの場所に対する書き込みアクセス許可がある場合は、ソース文字列の dbfs:/ の部分をオブジェクト ストレージへのパスに置き換えます。 このコード ブロックではこのデモをリセットするための再帰的な削除も行われるため、運用環境のデータではこれを指さないようにし、既存のデータを上書きまたは削除しないように /user/{username}/copy-into-demo の入れ子になったディレクトリを保持してください。

  1. 新しい SQL ノートブックを作成し、Databricks Runtime 11.3 LTS 以降を実行しているクラスターにアタッチします。

  2. このチュートリアルで使用するストレージの場所とデータベースをリセットするには、次のコードをコピーして実行します。

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. 次のコードをコピーして実行し、データをランダムに生成するために使用されるテーブルと関数をいくつか構成します。

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

手順 2: クラウド ストレージにサンプル データを書き込む

Azure Databricks では、Delta Lake 以外のデータ形式への書き込みはまれです。 ここで示すコードは JSON に書き込み、別のシステムからオブジェクト ストレージに結果をダンプする可能性のある外部システムをシミュレートします。

  1. 次のコードをコピーして実行し、生の JSON データのバッチを書き込みます。

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

手順 3: COPY INTO を使用して JSON データをべき等に読み込む

COPY INTO を使用する前に、ターゲットの Delta Lake テーブルを作成する必要があります。 Databricks Runtime 11.3 LTS 以降では、CREATE TABLE ステートメントにテーブル名以外の情報を指定する必要はありません。 以前のバージョンの Databricks Runtime では、空のテーブルを作成するときにスキーマを指定する必要があります。

  1. 次のコードをコピーして実行して、ターゲットの Delta テーブルを作成し、ソースからデータを読み込みます。

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

このアクションはべき等であるため、複数回実行できますが、データは 1 回だけ読み込まれます。

手順 4: テーブルの内容をプレビューする

単純な SQL クエリを実行して、このテーブルの内容を手動で確認できます。

  1. 次のコードをコピーして実行し、テーブルをプレビューします。

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

手順 5: より多くのデータを読み込み、結果をプレビューする

手順 2 ~ 4 を何度も実行して、ランダムな生 JSON データの新しいバッチをソースに配置し、そのバッチを COPY INTO で Delta Lake にべき等に読み込み、結果をプレビューできます。 新しいデータが到着せずに COPY INTO が複数回書き込まれたり実行されたりする生データの複数のバッチをシミュレートするには、次の手順を順番に実行するか、複数回実行してみてください。

手順 6: チュートリアルをクリーンアップする

このチュートリアルを完了すると、保持する必要がなくなった関連付けられているリソースをクリーンアップできます。

  1. 次のコードをコピーして実行し、データベース、テーブルを削除し、すべてのデータを削除します。

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. コンピューティング リソースを停止するには、[クラスター] タブに移動し、クラスターを [終了] します。

その他の技術情報