Azure Synapse Analytics 向けの Spark Common Data Model コネクタ

Spark Common Data Model コネクタ (Spark CDM コネクタ) は、Azure Synapse Analytics のフォーマット リーダー/ライターです。 これにより、Spark プログラムで、Spark データフレームを介して Common Data Model フォルダー内の Common Data Model エンティティの読み取りと書き込みを行うことができます。

Common Data Model 1.2 を使用して Common Data Model ドキュメントを定義する方法については、Common Data Model の概要とその使用方法に関するこちらの記事を参照してください。

機能

高レベルでは、コネクタでは次のことがサポートされています。

  • 3.1、3.2、3.3.
  • Common Data Model フォルダー内のエンティティから Spark データフレームへのデータの読み取り。
  • Common Data Model エンティティの定義に基づく、Spark データフレームから Common Data Model フォルダー内のエンティティへの書き込み。
  • データフレーム スキーマに基づく、Spark データフレームから Common Data Model フォルダー内のエンティティへの書き込み。

コネクタでは、次のこともサポートされています。

  • 階層型名前空間 (HNS) が有効になっている Azure Data Lake Storage の Common Data Model フォルダーの読み取りと書き込み。
  • マニフェストまたは model.json ファイルによって記述されている Common Data Model フォルダーからの読み取り。
  • マニフェスト ファイルによって記述されている Common Data Model フォルダーへの書き込み。
  • 列ヘッダーの有無にかかわらず、ユーザーが選択可能な区切り記号文字を使用する、CSV 形式のデータ。
  • 入れ子になった Parquet など、Apache Parquet 形式のデータ。
  • 読み取りでのサブマニフェストと、書き込みでのエンティティ スコープのサブマニフェストのオプションでの使用。
  • ユーザーが変更可能なパーティション パターンを介したデータの書き込み。
  • Azure Synapse Analytics でのマネージド ID と資格情報の使用。
  • config.json ファイルに記述されている Common Data Model アダプター定義を介したインポートで使用される Common Data Model エイリアスの場所の解決。

制限事項

コネクタでは、次の機能とシナリオはサポートされていません。

  • 並列書き込み。 これはお勧めできません。 ストレージ レイヤーにはロック メカニズムがありません。
  • エンティティを読み取った後の、エンティティ メタデータへのプログラムによるアクセス。
  • エンティティを書き込むときの、メタデータを設定またはオーバーライドするためのプログラムによるアクセス。
  • スキーマ ドリフト (書き込まれるデータフレームのデータに、エンティティ定義に含まれない追加の属性が含まれている場合)。
  • スキーマの進化 (エンティティのパーティションで異なるバージョンのエンティティ定義が参照されている場合)。 バージョンは com.microsoft.cdm.BuildInfo.version を実行することで確認できます。
  • model.json の書き込みサポート。
  • Parquet への Time データの書き込み。 現時点では、コネクタでは、CSV ファイルのみのための DateTime 値ではなく、Common Data Model の Time 値として解釈されるタイム スタンプ列のオーバーライドがサポートされています。
  • Parquet の Map 型、プリミティブ型の配列、および配列型の配列。 これらは Common Data Model では現在サポートされていないため、Spark CDM コネクタでもサポートされていません。

サンプル

コネクタの使用を開始するには、サンプル コードと Common Data Model ファイルを確認してください。

データの読み取り

コネクタでデータを読み取るとき、コネクタでは、マニフェストで参照されているように、指定されたエンティティに対して解決されたエンティティ定義に基づいて、Common Data Model フォルダーのメタデータを使ってデータフレームが作成されます。 コネクタでは、エンティティ属性名がデータフレーム列名として使用されます。 属性データ型が列データ型にマップされます。 データフレームは、読み込まれるときに、マニフェストで識別されたエンティティ パーティションから設定されます。

コネクタは、指定されたマニフェストとすべての第 1 レベルのサブマニフェストで、指定されたエンティティを検索します。 必要なエンティティが第 2 レベル以下のサブマニフェストにある場合、または異なるサブマニフェストに同じ名前の複数のエンティティがある場合、ユーザーはルート マニフェストではなく、必要なエンティティが含まれるサブマニフェストを指定する必要があります。

エンティティ パーティションは、形式 (CSV や Parquet など) を混在させることができます。 マニフェストで識別されたすべてのエンティティ データ ファイルが、形式に関係なく 1 つのデータセットに結合されて、データフレームに読み込まれます。

コネクタでは、CSV データを読み取るとき、Spark の failfast オプションが既定で使用されます。 列の数がエンティティ内の属性の数と等しくない場合は、コネクタでエラーが返されます。

あるいは、0.19 以降では、コネクタで許容モードがサポートされます (CSV ファイルの場合のみ)。 許容モードでは、CSV の行の列数がエンティティ スキーマより少ないとき、コネクタでは不足している列に null 値が割り当てられます。 CSV の行にエンティティ スキーマより多くの列があるときは、エンティティ スキーマの列数を超える列は、スキーマの列数まで切り捨てられます。 使用方法は次のとおりです。

.option("mode", "permissive") or .option("mode", "failfast")

データの書き込み

コネクタによって Common Data Model フォルダーに書き込まれるとき、エンティティがまだそのフォルダーに存在しない場合、コネクタによって新しいエンティティと定義が作成されます。 エンティティと定義が Common Data Model フォルダーに追加され、マニフェスト内で参照されます。

コネクタでは、次の 2 つの書き込みモードがサポートされています。

  • 明示的な書き込み: 物理エンティティの定義は、ユーザーが指定する論理 Common Data Model エンティティの定義に基づきます。

    コネクタによって、指定した論理エンティティの定義が読み取られて解決され、Common Data Model フォルダーで使われる物理エンティティの定義が作成されます。 直接的または間接的に参照されている Common Data Model 定義ファイルのインポート ステートメントに別名が含まれている場合は、これらの別名を Common Data Model アダプターおよびストレージの場所にマップする config.json ファイルを指定する必要があります。

    • データフレーム スキーマが参照されているエンティティ定義と一致しない場合、コネクタによってエラーが返されます。 データフレームの列のデータ型が、エンティティの属性のデータ型と一致していることを確認してください。これには、Common Data Model の特性による 10 進数データ、有効桁数、小数点以下桁数が含まれます。
    • データフレームがエンティティ定義と矛盾している場合は、コネクタによってエラーが返されます。
    • データフレームが矛盾していない場合は、次のようになります。
      • エンティティがマニフェストに既に存在する場合は、コネクタによって、指定されたエンティティ定義が解決され、Common Data Model フォルダー内の定義に対して検証されます。 定義が一致しない場合、コネクタによってエラーが返されます。 それ以外の場合、コネクタによってデータが書き込まれ、マニフェスト内のパーティション情報が更新されます。
      • エンティティが Common Data Model フォルダー内に存在しない場合、コネクタによって、エンティティ定義の解決されたコピーが Common Data Model フォルダー内のマニフェストに書き込まれます。 コネクタによってデータが書き込まれ、マニフェスト内のパーティション情報が更新されます。
  • 暗黙的な書き込み: エンティティ定義は、データフレームの構造から導出されます。

    • エンティティが Common Data Model フォルダーに存在しない場合、コネクタでは暗黙的な定義を使って、解決されたエンティティ定義がターゲットの Common Data Model フォルダーに作成されます。

    • Common Data Model フォルダーにエンティティが存在する場合、コネクタでは既存のエンティティ定義に対して暗黙的な定義が検証されます。 定義が一致しない場合、コネクタによってエラーが返されます。 それ以外の場合、コネクタによってデータが書き込まれ、導出された論理エンティティ定義がエンティティ フォルダーのサブフォルダーに書き込まれます。

      コネクタによって、エンティティ サブフォルダー内のデータ フォルダーにデータが書き込まれます。 保存モードにより、新しいデータが上書きされるか、既存のデータに追加されるか、またはデータが存在する場合はエラーが返されかが決まります。 既定では、データが既に存在する場合はエラーが返されます。

Common Data Model の別名の統合

Common Data Model 定義ファイルでは、インポート ステートメントで別名を使ってインポート ステートメントが簡略化され、インポートされた内容の場所を実行時に遅延バインドできます。 別名の使用:

  • Common Data Model ファイルを簡単に整理して、関連する Common Data Model 定義を異なる場所にグループ化できるようにします。
  • 実行時に、さまざまなデプロイ場所から Common Data Model の内容にアクセスできます。

次のスニペットは、Common Data Model 定義ファイルのインポート ステートメントでの別名の使用を示したものです。

"imports": [  
{     
  "corpusPath": "cdm:/foundations.cdm.json"
},  
{       
  "corpusPath": "core:/TrackedEntity.cdm.json"  
},  
{      
  "corpusPath": "Customer.cdm.json"  
} 
]

前の例では、Common Data Model 基礎ファイルの場所の別名として cdm が使われています。 TrackedEntity 定義ファイルの場所の別名として core が使用されています。

別名は、Common Data Model の config.json ファイル内のアダプター エントリの名前空間の値に一致するテキスト ラベルです。 アダプター エントリでは、アダプターの種類 (たとえば、adlsCDNGitHublocal など) と場所を定義する URL が指定されます。 接続タイムアウトなどの他の構成オプションがサポートされているアダプターもあります。 別名は任意のテキスト ラベルですが、cdm という別名は特別な方法で処理されます。

Spark CDM コネクタは、エンティティ定義のモデルのルートの場所で、読み込む config.json ファイルを検索します。 config.json ファイルが別の場所にある場合、またはユーザーがモデルのルートにある config.json ファイルをオーバーライドしたい場合、ユーザーは configPath オプションを使って config.json ファイルの場所を指定できます。 config.json ファイルには、解決される Common Data Model コードで使われているすべての別名のアダプター エントリが含まれている必要があります。そうでない場合、コネクタでエラーが報告されます。

config.json ファイルをオーバーライドできることは、実行時にアクセスできる場所を Common Data Model 定義に対して指定できることを意味します。 実行時に参照される内容が、Common Data Model が最初に作成されるときに使われた定義と一致するようにしてください。

慣例により、cdm という別名は、foundations.cdm.json ファイルを含むルート レベルの標準の Common Data Model 定義の場所を参照します。 このファイルには、Common Data Model プリミティブ データ型と、ほとんどの Common Data Model エンティティ定義に必要な特性定義のコア セットが含まれます。

cdm という別名は、config.json ファイルのアダプター エントリを使って、他の別名と同様に解決できます。 アダプターを指定しない場合、または null エントリを指定した場合、cdm という別名は既定で https://cdm-schema.microsoft.com/logical/ の Common Data Model パブリック コンテンツ配信ネットワーク (CDN) に解決されます。

また、cdmSource オプションを使用して、cdm 別名の解決方法をオーバーライドすることもできます。 cdmSource オプションを使うと、config.json ファイルを作成または参照しなくて済むため、別名 cdm が Common Data Model 定義で使われる唯一の別名である場合に便利です。

パラメーター、オプション、保存モード

読み取りと書き込みの両方に対して、Spark CDM コネクタのライブラリ名をパラメーターとして指定します。 オプションのセットを使って、コネクタの動作をパラメーター化します。 書き込み中は、コネクタで保存モードもサポートされます。

コネクタ ライブラリの名前、オプション、保存モードは、次のような形式になっています。

  • dataframe.read.format("com.microsoft.cdm") [.option("option", "value")]*
  • dataframe.write.format("com.microsoft.cdm") [.option("option", "value")]* .mode(savemode.\<saveMode\>)

読み取りにコネクタを使用する際のオプションの一部を示す例を次に示します。

val readDf = spark.read.format("com.microsoft.cdm")
  .option("storage", "mystorageaccount.dfs.core.windows.net")
  .option("manifestPath", "customerleads/default.manifest.cdm.json")
  .option("entity", "Customer")
  .load()

読み取りと書き込みに共通のオプション

次のオプションは、読み取りまたは書き込み対象の Common Data Model フォルダー内のエンティティを指定します。

オプション 説明 パターンと使用例
storage Common Data Model フォルダーを含む、HNS が有効になっている Azure Data Lake Storage アカウントのエンドポイント URL。
URL は dfs.core.windows.net を使用します。
<accountName>.dfs.core.windows.net "myAccount.dfs.core.windows.net"
manifestPath ストレージ アカウント内のマニフェストまたは model.json ファイルへの相対パス。 読み取りの場合は、ルート マニフェスト、サブマニフェスト、または model.json ファイルを指定できます。 書き込みの場合は、ルート マニフェストを指定する必要があります。 <container>/{<folderPath>}<manifestFileName>,
"mycontainer/default.manifest.cdm.json" "models/hr/employees.manifest.cdm.json"
"models/hr/employees/model.json" (読み取り専用)
entity マニフェスト内のソースまたはターゲット エンティティの名前。 フォルダーにエンティティを初めて書き込むときは、コネクタによって、解決されたエンティティ定義にこの名前が設定されます。 エンティティ名は大文字と小文字が区別されます。 <entityName>
"customer"
maxCDMThreads コネクタによってエンティティ定義を解決する間の同時読み取りの最大数。 任意の有効な整数 (5 など)

注意

読み取り時に Common Data Model フォルダーの物理エンティティ定義に加えて、論理エンティティ定義を指定する必要はなくなりました。

明示的な書き込みオプション

次のオプションは、書き込まれるエンティティの論理エンティティ定義を示します。 論理エンティティ定義は、エンティティの書き込み方法を定義する物理定義に解決されます。

オプション 説明 パターンまたは使用例
entityDefinitionStorage エンティティ定義を含む Azure Data Lake Storage アカウント。 Common Data Model フォルダーをホストするストレージ アカウントと異なる場合は必須。 <accountName>.dfs.core.windows.net
"myAccount.dfs.core.windows.net"
entityDefinitionModelRoot アカウント内のモデル ルートまたはコーパスの場所。 <container>/<folderPath>
"crm/core"
entityDefinitionPath エンティティの場所。 モデル ルートを基準とする Common Data Model 定義ファイルへのファイル パス。そのファイル内のエンティティの名前を含みます。 <folderPath>/<entityName>.cdm.json/<entityName>
"sales/customer.cdm.json/customer"
configPath エンティティ定義ファイルと直接的または間接的に参照される Common Data Model ファイルに含まれるすべての別名のアダプター構成を含む config.json ファイルへのコンテナーとフォルダー パス。

config.json がモデル ルート フォルダーにある場合、このオプションは必要ありません。
<container><folderPath>
useCdmStandardModelRoot モデルのルートが https://cdm-schema.microsoft.com/CDM/logical/ にあることを示します。 Common Data Model CDN で定義されているエンティティ型を参照するために使用されます。 entityDefinitionStorageentityDefinitionModelRoot (指定されている場合) をオーバーライドします。
"useCdmStandardModelRoot"
cdmSource 別名 cdm の解決方法を定義します (Common Data Model 定義ファイルに存在する場合)。 このオプションを使うと、config.json ファイルで指定されているすべての cdm アダプターがオーバーライドされます。 値は builtin または referenced です。 既定値は referenced です。

このオプションを referenced に設定すると、コネクタでは、https://cdm-schema.microsoft.com/logical/ で公開された最新の標準 Common Data Model 定義が使用されます。 このオプションを builtin に設定すると、コネクタでは、コネクタで使用されている Common Data Model オブジェクト モデルに組み込まれている Common Data Model 基本定義が使用されます。

注:
* Spark CDM コネクタでは最新の Common Data Model SDK が使用されていない可能性があるため、公開されている最新の標準定義が含まれていない可能性があります。
* 組み込みの定義には、foundations.cdm.jsonprimitives.cdm.json などの最上位の Common Data Model コンテンツのみが含まれます。 下位レベルの標準の Common Data Model を使用する場合は、referenced を使用するか、config.jsoncdm アダプターを含めます。
"builtin"|"referenced"

上の例で、顧客エンティティ定義オブジェクトへの完全なパスは https://myAccount.dfs.core.windows.net/models/crm/core/sales/customer.cdm.json/customer です。 そのパスで、models は Azure Data Lake Storage 内のコンテナーです。

暗黙的な書き込みオプション

書き込み時に論理エンティティ定義を指定しないと、エンティティはデータフレームのスキーマに基づいて暗黙的に書き込まれます。

暗黙的に書き込む場合、タイム スタンプ列は通常、Common Data Model の DateTime データ型として解釈されます。 この解釈をオーバーライドして、データ型を指定する列に関連付けられているメタデータ オブジェクトを指定することで、Common Data Model の Time データ型の属性を作成できます。 詳細については、この記事で後述する「Common Data Model の時刻データの処理」を参照してください。

時刻データの書き込みは、CSV ファイルに対してのみサポートされています。 そのサポートは、現在 Parquet に拡張されていません。

フォルダー構造とデータ形式のオプション

次のオプションを使用して、フォルダーの編成とファイル形式を変更できます。

オプション 説明 パターンまたは使用例
useSubManifest true の場合は、ターゲット エンティティはサブマニフェストを通じてルート マニフェストに組み込まれます。 サブマニフェストとエンティティ定義は、ルートの下のエンティティ フォルダーに書き込まれます。 既定値は false です。 "true"|"false"
format ファイルの形式を定義します。 現在サポートされているファイルの形式は、CSV と Parquet です。 既定値は csv です。 "csv"|"parquet"
delimiter CSV のみ。 使用する区切り記号を定義します。 既定値はコンマです。 "|"
columnHeaders CSV のみ。 true の場合、列ヘッダーを含む最初の行がデータ ファイルに追加されます。 既定値は true です。 "true"|"false"
compression 書き込みのみ。 Parquet のみ。 使用する圧縮形式を定義します。 既定値は snappy です。 "uncompressed" | "snappy" | "gzip" | "lzo"
dataFolderFormat エンティティ フォルダー内のユーザー定義可能なデータ フォルダー構造を許可します。 DateTimeFormatter の書式設定を使用して、日付と時刻の値をフォルダー名に置き換えます。 フォーマッタではない内容は、単一引用符で囲む必要があります。 既定の形式は "yyyy"-"MM"-"dd" です。これにより、2020-07-30 のようなフォルダー名が生成されます。 year "yyyy" / month "MM"
"Data"

保存モード

保存モードでは、データフレームを書き込むときに、Common Data Model フォルダー内の既存のエンティティ データをコネクタで処理する方法を指定します。 データが既に存在する場合のオプションは、上書き、追加、またはエラーを返すことです。 既定の保存モードは ErrorIfExists です。

モード 説明
SaveMode.Overwrite 既存のエンティティ定義が変更されている場合は上書きし、既存のデータ パーティションを書き込まれているデータ パーティションに置き換えます。
SaveMode.Append 既存のパーティションとは別の新しいパーティションに、書き込まれたデータを追加します。

このモードでは、スキーマの変更はサポートされていません。 書き込まれるデータのスキーマが、既存のエンティティ定義と互換性がないものである場合、コネクタによってエラーがスローされます。
SaveMode.ErrorIfExists パーティションが既に存在する場合は、エラーを返します。

書き込み時にデータ ファイルの名前を付けて編成する方法の詳細については、この記事で後述する「フォルダーとファイルの名前付けと編成」セクションを参照してください。

認証

Common Data Model のメタデータとデータ パーティションの読み取りと書き込みに関して Spark CDM コネクタで使用できる認証モードは、資格情報パススルー、アクセス共有シグネチャ (SAS) トークン、アプリ登録の 3 つです。

資格情報のパススルー

Azure Synapse Analytics の Spark CDM コネクタでは、Common Data Model フォルダーを含む Azure Data Lake Storage アカウントへのアクセスを仲介するため、Azure リソース用マネージド ID の使用がサポートされています。 マネージド ID は、Azure Synapse Analytics ワークスペースごとに自動的に作成されます。 コネクタは、コネクタが呼び出されたノートブックを含むワークスペースのマネージド ID を使って、ストレージ アカウントへの認証を行います。

選択された ID に適切なストレージ アカウントへのアクセス権があることを確認する必要があります。

  • ライブラリで Common Data Model フォルダーに書き込めるように、ストレージ BLOB データ共同作成者のアクセス許可を付与します。
  • 読み取りアクセスのみを許可するように、ストレージ BLOB データ閲覧者のアクセス許可を付与します。

どちらの場合も、それ以外のコネクタ オプションは必要ありません。

SAS トークンベースのアクセス制御のオプション

SAS トークン資格情報は、ストレージ アカウントへの認証のための追加オプションです。 SAS トークン認証では、SAS トークンをコンテナーまたはフォルダーのレベルで使用できます。 適切なアクセス許可が必要です。

  • マニフェストまたはパーティションの読み取りアクセス許可には、読み取りレベルのサポートのみが必要です。
  • 書き込みアクセス許可には、読み取りと書き込みの両方のサポートが必要です。
オプション 説明 パターンと使用例
sasToken 適切なアクセス許可で相対ストレージ アカウントにアクセスするための SAS トークン <token>

資格情報ベースのアクセス制御のオプション

マネージド ID またはユーザー ID を使う代わりに、明示的な資格情報を提供して、Spark CDM コネクタがデータにアクセスできるようにすることができます。 Microsoft Entra ID でアプリ登録を作成します。 次に、次のいずれかのロールを使用して、このアプリの登録アクセス権をストレージ アカウントに付与します。

  • ライブラリで Common Data Model フォルダーに書き込めるようにする、ストレージ BLOB データ共同作成者
  • 読み取りアクセス許可のみを許可するストレージ BLOB データ閲覧者

アクセス許可が作成されたら、次のオプションを使って、コネクタの呼び出しごとにアプリ ID、アプリケーション キー、テナント ID をそれに渡すことができます。 これらの値がノートブック ファイルにクリア テキストで格納されないように、Azure Key Vault を使って格納することをお勧めします。

オプション 説明 パターンと使用例
appId ストレージ アカウントへの認証用のアプリ登録 ID <guid>
appKey 登録されているアプリ キーまたはシークレット <encrypted secret>
tenantId アプリが登録されている Microsoft Entra のテナント ID <guid>

次のすべての例では、appIdappKey、および tenantId の各変数が使用されています。 これらの変数は、Azure のアプリ登録 (ストレージの書き込みのためのストレージ BLOB データ共同作成者アクセス許可と、読み取りのためのストレージ BLOB データ閲覧者アクセス許可) に基づいて、それより前のコードで初期化されています。

Read

このコードは、mystorage.dfs.core.windows.net/cdmdata/contacts/root.manifest.cdm.json のマニフェストを使って、Common Data Model フォルダーから Person エンティティを読み取ります。

val df = spark.read.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/contacts/root.manifest.cdm.json")
 .option("entity", "Person")
 .load()

データフレーム スキーマのみを使用した暗黙の書き込み

このコードでは、mystorage.dfs.core.windows.net/cdmdata/Contacts/default.manifest.cdm.json のマニフェストと Event エンティティを使って Common Data Model フォルダーにデータフレーム df を書き込みます。

このコードでは、イベント データを Parquet ファイルとして書き込み、gzip で圧縮し、フォルダーに追加します。 (コードでは、既存のファイルを削除せずに新しいファイルを追加します)。


df.write.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/Contacts/default.manifest.cdm.json")
 .option("entity", "Event")
 .option("format", "parquet")
 .option("compression", "gzip")
 .mode(SaveMode.Append)
 .save()

Data Lake Storage に格納されているエンティティ定義を使用した明示的な書き込み

このコードでは、https://_mystorage_.dfs.core.windows.net/cdmdata/Contacts/root.manifest.cdm.json のマニフェストと Person エンティティを使って Common Data Model フォルダーにデータフレーム df を書き込みます。 このコードでは、Person データが新しい CSV ファイルとして書き込まれ (既定)、フォルダー内の既存のファイルを上書きします。

このコードでは、https://_mystorage_.dfs.core.windows.net/models/cdmmodels/core/Contacts/Person.cdm.json から Person エンティティ定義を取得します。

df.write.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/contacts/root.manifest.cdm.json")
 .option("entity", "Person")
 .option("entityDefinitionModelRoot", "cdmmodels/core")
 .option("entityDefinitionPath", "/Contacts/Person.cdm.json/Person")
 .mode(SaveMode.Overwrite)
 .save()

Common Data Model GitHub リポジトリで定義されているエンティティを使用した明示的な書き込み

このコードでは、次のものを使って Common Data Model フォルダーにデータフレーム df を書き込みます。

  • https://_mystorage_.dfs.core.windows.net/cdmdata/Teams/root.manifest.cdm.json のマニフェスト。
  • TeamMembership サブディレクトリに作成された TeamMembership エンティティを含むサブマニフェスト。

TeamMembership データは CSV ファイルに書き込まれ (既定)、既存のデータ ファイルを上書きします。 このコードでは、applicationCommon 内のチーム メンバーシップの Common Data Model CDN から TeamMembership エンティティ定義が取得されます。

df.write.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/Teams/root.manifest.cdm.json")
 .option("entity", "TeamMembership")
 .option("useCdmStandardModelRoot", true)
 .option("entityDefinitionPath", "core/applicationCommon/TeamMembership.cdm.json/TeamMembership")
 .option("useSubManifest", true)
 .mode(SaveMode.Overwrite)
 .save()

その他の考慮事項

Spark から Common Data Model へのデータ型のマッピング

コネクタでは、Common Data Model を Spark と変換するときに、次のデータ型マッピングを適用します。

Spark Common Data Model
ShortType SmallInteger
IntegerType Integer
LongType BigInteger
DateType Date
Timestamp DateTime (オプションで Time)
StringType String
DoubleType Double
DecimalType(x,y) 18,4 (既定の有効桁数と小数点以下桁数は Decimal (x,y))
FloatType Float
BooleanType Boolean
ByteType Byte

コネクタでは、Common Data Model の Binary データ型はサポートされていません。

Common Data Model の Date、DateTime、および DateTimeOffset データの処理

Spark CDM コネクタでは、Spark と Parquet については Common Data Model の DateDateTime のデータ型が通常どおりに処理されます。 CSV の場合、コネクタではこれらのデータ型が ISO 8601 形式で読み書きされます。

コネクタでは、Common Data Model の DateTime データ型の値が UTC として解釈されます。 CSV の場合、コネクタではこれらの値が ISO 8601 形式で書き込まれます。 たとえば 2020-03-13 09:49:00Z です。

ローカル時刻の記録を目的とした Common Data Model の DateTimeOffset の値は、Spark と Parquet では CSV と異なったやり方で処理されます。 CSV やその他の形式では、2020-03-13 09:49:00-08:00 などの datetime で構成される構造としてローカル時刻を表すことができます。 Parquet と Spark では、このような構造はサポートされていません。 代わりに、UTC (または指定されていないタイム ゾーン) で時刻を記録できる TIMESTAMP データ型が使われます。

Spark CDM コネクタでは CSV の DateTimeOffset 値が UTC タイム スタンプに変換されます。 この値はタイム スタンプとして Parquet で保存されます。 この値が後で CSV に保存される場合、+00:00 のオフセットで DateTimeOffset 値としてシリアル化されます。 時間的な正確性は失われません。 シリアル化された値は元の値と同じ時刻を表しますが、オフセットは失われます。

Spark システムは、システム時刻をベースラインとして使い、通常はそのローカル時刻を使って時刻を表します。 UTC 時刻は常に、ローカル システム オフセットの適用によって計算できます。 すべてのリージョンの Azure システムでは、システム時刻は常に UTC であるため、すべてのタイム スタンプ値はふつう UTC になります。 Common Data Model 定義がデータフレームから導出される暗黙的な書き込みを使用している場合、タイム スタンプ列は Common Data Model の DateTime データ型の属性に変換されます。これは UTC 時刻を意味します。

ローカル時刻を保持し、データを Spark で処理したり Parquet で保存したりすることが重要な場合は、DateTime 属性を使用し、オフセットを別の属性で保持することをお勧めします。 たとえば、分を表す符号付き整数値としてオフセットを保持できます。 Common Data Model では、DateTime 値は UTC であるため、オフセットを適用してローカル時刻を計算する必要があります。

ほとんどの場合、ローカル時刻の保持は重要ではありません。 多くの場合、ローカル時刻は、ユーザーの便宜のために UI でのみ必要であり、ユーザーのタイム ゾーンに基づくため、多くの場合、UTC 時刻を格納しない方が、より適切なソリューションになります。

Common Data Model の時刻データの処理

Spark では、明示的な Time データ型はサポートされません。 Common Data Model の Time データ型の属性は、Spark データフレームでは、Timestamp データ型を持つ列として表されます。 Spark CDM コネクタで時刻値が読み取られるとき、データフレーム内のタイム スタンプは、Spark エポック日付 01/01/1970 と、ソースから読み取られた時刻値を使って初期化されます。

明示的な書き込みを使用する場合は、タイム スタンプ列を DateTime または Time 属性にマップできます。 タイム スタンプを Time 属性にマップすると、タイム スタンプの日付部分は削除されます。

暗黙的な書き込みを使うと、タイム スタンプ列は既定で DateTime 属性にマップされます。 タイム スタンプ列を Time 属性にマップするには、タイム スタンプを Time 値として解釈する必要があることを示すメタデータ オブジェクトを、データフレームの列に追加する必要があります。 次のコードは、Scala でこれを行う方法を示しています。

val md = new MetadataBuilder().putString(“dataType”, “Time”)
val schema = StructType(List(
StructField(“ATimeColumn”, TimeStampType, true, md))

時刻の値の精度

Spark CDM コネクタでは、DateTime または Time のいずれかで時間値がサポートされます。 秒は、読み取られるファイル内のデータの形式 (CSV または Parquet) またはデータフレームで定義されている形式に基づいて、最大で小数点以下 6 桁です。 小数点以下 6 桁を使用すると、1 秒からマイクロ秒までの精度を実現できます。

フォルダーとファイルの名前付けと編成

Common Data Model フォルダーに書き込む場合は、既定のフォルダー編成があります。 既定では、データ ファイルは現在の日付で作成されたフォルダーに書き込まれます (2010-07-31 など)。 dateFolderFormat オプションを使用して、フォルダー構造と名前をカスタマイズできます。

データ ファイルの名前は、<エンティティ>-<ジョブ ID>-*.<ファイル形式> というパターンに基づいています。

sparkContext.parallelize() メソッドを使用して、書き込まれるデータ パーティションの数を制御できます。 パーティションの数は、Spark クラスターでの Executor の数によって決定されるか、明示的に指定されます。 次の Scala の例では、2 つのパーティションを持つデータフレームが作成されます。

val df= spark.createDataFrame(spark.sparkContext.parallelize(data, 2), schema)

参照先エンティティ定義によって定義される明示的な書き込みの例を次に示します。

+-- <CDMFolder>
     |-- default.manifest.cdm.json     << with entity reference and partition info
     +-- <Entity>
          |-- <entity>.cdm.json        << resolved physical entity definition
          |-- <data folder>
          |-- <data folder>
          +-- ...                            

サブマニフェストを使用した明示的な書き込みの例を次に示します。

+-- <CDMFolder>
    |-- default.manifest.cdm.json       << contains reference to submanifest
    +-- <Entity>
         |-- <entity>.cdm.json
         |-- <entity>.manifest.cdm.json << submanifest with partition info
         |-- <data folder>
         |-- <data folder>
         +-- ...

エンティティ定義がデータフレーム スキーマから導出される暗黙的な書き込みの例を次に示します。

+-- <CDMFolder>
    |-- default.manifest.cdm.json
    +-- <Entity>
         |-- <entity>.cdm.json          << resolved physical entity definition
         +-- LogicalDefinition
         |   +-- <entity>.cdm.json      << logical entity definitions
         |-- <data folder>
         |-- <data folder>
         +-- ...

サブマニフェストを使用した暗黙的な書き込みの例を次に示します。

+-- <CDMFolder>
    |-- default.manifest.cdm.json       << contains reference to submanifest
    +-- <Entity>
        |-- <entity>.cdm.json           << resolved physical entity definition
        |-- <entity>.manifest.cdm.json  << submanifest with reference to the entity and partition info
        +-- LogicalDefinition
        |   +-- <entity>.cdm.json       << logical entity definitions
        |-- <data folder>
        |-- <data folder>
        +-- ...

トラブルシューティングと既知の問題

  • データフレームで使用する 10 進データ型フィールドの有効桁数と小数点以下桁数が、Common Data Model エンティティ定義内のデータ型と一致していることを確認してください。 Common Data Model で有効桁数と小数点以下桁数が明示的に定義されていない場合、既定値は Decimal(18,4) です。 model.json ファイルの場合、DecimalDecimal(18,4) と見なされます。
  • オプション manifestPathentityDefinitionModelRootentityDefinitionPathdataFolderFormat のフォルダー名とファイル名には、スペースや等号 (=) などの特殊文字を含めることはできません。

次の手順

他の Apache Spark コネクタを確認できます。