データを変換する

この記事では、Azure Databricks を使ったデータ変換について紹介し、概要を説明します。 データの変換またはデータの準備は、すべての Data Engineering、分析、ML ワークロードにおける重要なステップです。

この記事のパターン例と推奨事項では、Delta Lake によってサポートされるレイクハウス テーブルの操作に焦点を当てています。 Delta Lake は Databricks レイクハウスの ACID 保証を提供するため、他の形式またはデータ システムのデータを操作する場合には、異なる動作が見られる可能性があります。

Databricks では、データを未加工または未加工に近い状態でレイクハウスに取り込み、別の処理ステップとして変換とエンリッチメントを適用することをお勧めしています。 このパターンはメダリオン アーキテクチャと呼ばれます。 「メダリオン レイクハウス アーキテクチャとは」を参照してください。

変換する必要があるデータがまだレイクハウスに読み込まれていないことがわかっている場合は、「Databricks レイクハウスにデータを取り込む」を参照してください。 変換を書き込むためのレイクハウス データを見つける場合は、「データの検出」を参照してください。

すべての変換は、データ ソースに対してバッチ クエリまたはストリーミング クエリを記述することから始まります。 データのクエリに慣れていない場合は、「データのクエリ」を参照してください。

変換されたデータを Delta テーブルに保存したら、そのテーブルを ML の特徴テーブルとして使用できます。 「特徴エンジニアリングとサービス提供」を参照してください。

Note

ここの記事では、Azure Databricks の変換について説明します。 Azure Databricks は、多くの一般的なデータ準備プラットフォームへの接続もサポートしています。 「Partner Connect を使ってデータ準備パートナーに接続する」を参照してください。

Spark 変換とレイクハウス変換

この記事では、ETL または ELT の T に関連する "変換 (transformation)" の定義に焦点を当てます。 Apache Spark 処理モデルでも、関連する意味で "変換" という単語が使われます。 簡単に言うと、Apache Spark では、すべての操作は変換またはアクションとして定義されます。

  • 変換: プランに処理ロジックを追加します。 たとえば、データの読み取り、結合、集計、型キャストなどがあります。
  • アクション: 結果を評価して出力する処理ロジックをトリガーします。 たとえば、書き込み、結果の表示またはプレビュー、手動キャッシュ、行数の取得などがあります。

Apache Spark は、"遅延実行" モデルを使います。これは、アクションがトリガーされるまで、操作のコレクションによって定義されたロジックはいずれも評価されないことを意味します。 このモデルには、データ処理パイプラインを定義するときに重要な影響があります。アクションは結果をターゲット テーブルに保存するためだけに使います。

アクションはロジックの最適化における処理のボトルネックとなるため、Azure Databricks では、ロジックの最適な実行を保証するために、Apache Spark に既に存在する最適化に加えて多数の最適化が追加されました。 これらの最適化では、特定のアクションによってトリガーされるすべての変換が一度に考慮され、データの物理レイアウトに基づいて最適なプランを見つけます。 運用パイプラインでデータを手動でキャッシュしたり、プレビュー結果を返したりすると、これらの最適化が中断され、コストと待機時間の大幅な増加につながる可能性があります。

そのため、"レイクハウス変換" を、新しいレイクハウス テーブルになる 1 つ以上のレイクハウス テーブルに適用される操作のコレクションとして定義できます。 結合や集計などの変換については個別に説明しますが、これらのパターンの多くを 1 つの処理ステップで組み合わせて、Azure Databricks のオプティマイザーに任せて最も効率的なプランを見つけることができることに注意してください。

ストリーミング処理とバッチ処理の違いは何ですか?

ストリーミング処理とバッチ処理は Azure Databricks で同じ構文の多くが使われますが、それぞれに固有のセマンティクスがあります。

バッチ処理を使うと、固定量の静的で変化しないデータを 1 回の操作として処理するための明示的な命令を定義できます。

ストリーム処理を使うと、無制限で継続的に増加するデータセットに対してクエリを定義し、小さな増分バッチでデータを処理できます。

Azure Databricks でのバッチ操作では Spark SQL または DataFrames が利用され、ストリーム処理では構造化ストリーミングが利用されます。

次の表に示すように、読み取りおよび書き込み操作を確認することで、バッチ Apache Spark コマンドと構造化ストリーミングを区別できます。

Apache Spark 構造化ストリーミング
読み取り spark.read.load() spark.readStream.load()
書き込み spark.write.save() spark.writeStream.start()

具体化されたビューは、一般にバッチ処理の保証に準拠していますが、可能な場合は Delta Live Tables を使って段階的に結果を計算します。 具体化されたビューによって返される結果は常にロジックのバッチ評価と同等ですが、Azure Databricks は可能な場合はこれらの結果を段階的に処理しようとします。

ストリーミング テーブルでは、常に結果が段階的に計算されます。 多くのストリーミング データ ソースはレコードを数時間または数日間しか保持しないため、ストリーミング テーブルで使われる処理モデルは、データ ソースからのレコードの各バッチが 1 回だけ処理されることを前提としています。

Azure Databricks は、次のユース ケースで SQL を使ったストリーミング クエリの作成をサポートしています。

  • Databricks SQL を使って Unity Catalog でストリーミング テーブルを定義する。
  • Delta Live Tables パイプラインのソース コードを定義する。

Note

Python 構造化ストリーミング コードを使って、Delta Live Tables でストリーミング テーブルを宣言することもできます。

バッチ変換

バッチ変換は、特定の時点で明確に定義されたデータ資産のセットに対して動作します。 バッチ変換は 1 回限りの操作である場合もありますが、多くの場合、運用システムを最新の状態に保つために定期的に実行される、スケジュールされたジョブまたはパイプラインの一部です。

増分変換

増分パターンでは、通常、データ ソースが追加専用であり、安定したスキーマがあることを前提としています。 次の記事では、更新、削除、またはスキーマ変更が発生するテーブルの増分変換の微妙な違いについて詳しく説明します。

リアルタイム変換

Delta Lake は、レイクハウスのクエリを実行するすべてのユーザーとアプリケーションに対し大量のデータへの準リアルタイムのアクセスを実現することに優れていますが、ファイルとメタデータをクラウド オブジェクト ストレージに書き込むためのオーバーヘッドのため、Delta Lake シンクに書き込む多くのワークロードでは真のリアルタイム待機時間に達することができません。

非常に低待機時間のストリーミング アプリケーションの場合、Databricks では、Kafka などのリアルタイム ワークロード向けに設計されたソース システムとシンク システムを選ぶことをお勧めします。 Azure Databricks を使うと、集計、ストリーム間の結合、レイクハウスに格納されているゆっくりと変化するディメンション データとストリーミング データの結合など、データをエンリッチすることができます。