Delta Live Tables パイプラインを監視する

この記事では、Delta Live Tables パイプラインに組み込みの監視機能と監視機能を使用する方法について説明します。 これらの機能では、次のようなタスクがサポートされます。

パイプライン イベントの通知を追加する

次の場合に通知を受信するように 1 つ以上のメール アドレスを構成できます:

  • パイプラインの更新が正常に完了しました。
  • パイプラインの更新は、再試行可能または再試行不可能なエラーで失敗します。 すべてのパイプラインの失敗について通知を受け取るには、このオプションを選択します。
  • パイプラインの更新が失敗し、再試行できない (致命的な) エラーが発生します。 再試行不可能なエラーが発生した場合にのみ通知を受け取るには、このオプションを選択します。
  • 1 つのデータ フローが失敗します。

パイプラインを 作成 または編集するときに電子メール通知を構成するには:

  1. [通知の追加] をクリックします。
  2. 通知を受信する 1 つ以上のメール アドレスを入力します。
  3. 各通知の種類のチェック ボックスをオンにして、構成済みのメール アドレスに送信します。
  4. [通知の追加] をクリックします。

UI で使用できるパイプラインの詳細は何ですか?

パイプライン グラフは、パイプラインの更新が正常に開始されるとすぐに表示されます。 矢印は、パイプライン内のデータセット間の依存関係を表します。 既定では、パイプラインの詳細ページにテーブルの最新の更新が表示されますが、ドロップダウン メニューからは古い更新を選択できます。

詳細には、パイプライン ID、ソース コード、コンピューティング コスト、製品エディション、パイプライン用に構成されたチャネルが含まれます。

データセットの表形式ビューを表示するには、[リスト] タブをクリックします。リスト ビューを使用すると、パイプライン内のすべてのデータセットをテーブルの行として表示できます。これは、パイプライン DAG が大きすぎて、グラフ ビューに視覚化できない場合に便利です。 データセット名、型、状態などの複数のフィルターを使用して、テーブルに表示されるデータセットを制御できます。 もう一度 DAG の視覚化に切り替えるには、[グラフ] をクリックします。

[実行するアカウント名] ユーザーはパイプラインの所有者であり、パイプラインの更新はこのユーザーのアクセス許可で実行されます。 run as ユーザーを変更するには、[アクセス許可] をクリックしてパイプラインの所有者を変更します。

データセットの詳細を表示するにはどうすればよいですか?

パイプラインのグラフまたはデータセットの一覧でデータセットをクリックすると、そのデータセットに関する詳細が表示されます。 詳細には、データセット スキーマ、データ品質メトリック、データセットを定義するソース コードへのリンクが含まれます。

更新履歴を表示する

パイプラインの更新の履歴と状態を表示するには、上部バーにある [更新履歴] ドロップダウン メニューをクリックします。

ドロップダウン メニューで更新プログラムを選択して、更新プログラムのグラフ、詳細、イベントを表示します。 最新の更新に戻るには、[Show the latest update] (最新の更新を表示) をクリックします。

Delta Live Tables イベント ログのクエリ実行

デルタ ライブ テーブル イベント ログには、監査ログ、データ品質チェック、パイプラインの進行状況、データ系統など、パイプラインに関連するすべての情報が含まれます。 イベント ログを使用して、データ パイプラインの状態を追跡、把握、および監視することができます。

デルタ ライブ テーブル ユーザー インターフェイス、デルタ ライブ テーブル API で、またはイベント ログに直接クエリを実行することによって、イベント ログ エントリを表示できます。 このセクションでは、イベント ログに直接クエリを実行することに重点を置いています。

また、イベント フックを使用してイベント (例: アラートの送信) がログに記録されると、実行するカスタム アクションを定義することもできます。

イベント ログのスキーマ

次の表で、イベント ログのスキーマについて説明します。 フィールドの一部に、details フィールドなど、いくつかのクエリを実行するために解析を必要とする JSON データが含まれています。 Azure Databricks では、JSON フィールドを解析するための : 演算子がサポートされています。 : (コロン記号) 演算子を参照してください。

フィールド 説明
id イベント ログ レコードの一意識別子。
sequence イベントを識別および順序付けるためのメタデータを含む JSON ドキュメント。
origin パイプラインがどこに作成されたか (DBSQL または WORKSPACE) を示す、クラウド プロバイダー、クラウド プロバイダー リージョン、user_idpipeline_idpipeline_type など、イベントの発生元のメタデータを含む JSON ドキュメント。
timestamp イベントが記録された時刻。
message 人が判読できる、イベントを説明するメッセージ。
level イベントの種類 (INFOWARNERRORMETRICS など)。
error エラーが発生した場合の、エラーの詳細説明。
details イベントの構造化された詳細を含む JSON ドキュメント。 これは、イベントの分析に使用される主なフィールドです。
event_type イベントの種類。
maturity_level イベント スキーマの安定性。 指定できる値は次のとおりです。

- STABLE: このスキーマは安定しており、変更されません。
- NULL: このスキーマは安定しており、変更されません。 maturity_level フィールドが追加される前にレコードが作成された場合、この値は NULL になる可能性があります (リリース 2022.37)。
- EVOLVING: このスキーマは安定していないため、変更される可能性があります。
- DEPRECATED: このスキーマは非推奨であり、Delta Live Tables ランタイムはいつでもこのイベントの生成を停止することができます。

イベント ログへのクエリ実行

イベント ログの場所とイベント ログのクエリを実行するインターフェイスは、パイプラインで Hive メタストアと Unity カタログのどちらを使用するように構成されているかによって異なります。

Hive metastore

パイプラインが Hive メタストアにテーブルを発行する場合、イベント ログは storage の場所の下の /system/events に格納されます。 たとえば、パイプライン storage の設定を /Users/username/data と構成した場合、イベン トログは DBFS の /Users/username/data/system/events パスに格納されます。

storage 設定を構成していない場合、既定のイベント ログの場所は DBFS の /pipelines/<pipeline-id>/system/events になります。 たとえば、パイプラインの ID が 91de5e48-35ed-11ec-8d3d-0242ac130003 の場合、格納場所は /pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/events となります。

ビューを作成して、イベント ログのクエリを簡素化できます。 次の例では、event_log_raw という一時ビューを作成します。 このビューは、以下の記事に含まれるイベント ログ クエリの例で使用されます。

CREATE OR REPLACE TEMP VIEW event_log_raw AS SELECT * FROM delta.`<event-log-path>`;

<event-log-path> をイベント ログの場所に置き換えます。

パイプライン実行の各インスタンスは、"更新" と呼ばれます。 多くの場合、最新の更新プログラムの情報を抽出する必要があります。 次のクエリを実行して最新の更新プログラムの識別子を検索し、latest_update_id 一時ビューに保存します。 このビューは、以下の記事に含まれるイベント ログ クエリの例で使用されます。

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

Azure Databricks ノートブックまたは SQL エディターでイベント ログにクエリを実行できます。 ノートブックまたは SQL エディターを使用して、イベント ログ クエリの例を実行します。

Unity Catalog

パイプラインでテーブルを Unity Catalog に公開する場合は、event_log テーブル値関数 (TVF) を使用してパイプラインのイベント ログをフェッチする必要があります。 パイプラインのイベント ログを取得するには、パイプライン ID またはテーブル名を TVF に渡します。 たとえば、ID 04c78631-3dd7-4856-b2a6-7d84e9b2638b を持つパイプライン向けのイベント ログ レコードを取得するには、次のようにします。

SELECT * FROM event_log("04c78631-3dd7-4856-b2a6-7d84e9b2638b")

テーブル my_catalog.my_schema.table1 を作成または所有するパイプラインのイベント ログ レコードを取得するには、次のようにします。

SELECT * FROM event_log(TABLE(my_catalog.my_schema.table1))

TVF を呼び出すには、共有クラスターまたは SQL ウェアハウスを使用する必要があります。 たとえば、共有クラスターにアタッチされたノートブックを使用したり、SQL ウェアハウスに接続されている SQL エディター を使用したりすることができます。

パイプラインに対するイベントのクエリを簡略化するために、パイプラインの所有者は event_log TVF 経由でビューを作成できます。 次の例では、パイプラインのイベント ログ経由でビューを作成します。 このビューは、以下の記事に含まれるイベント ログ クエリの例で使用されます。

Note

event_log TVF はパイプライン所有者のみが呼び出すことができ、event_log TVF 経由で作成されたビューは、パイプライン所有者のみがクエリを実行できます。 ビューを他のユーザーと共有することはできません。

CREATE VIEW event_log_raw AS SELECT * FROM event_log("<pipeline-ID>");

<pipeline-ID> を Delta Live Tables パイプラインの一意識別子に置き換えます。 ID は、Delta Live Tables UI の [パイプラインの詳細] パネルで確認できます。

パイプライン実行の各インスタンスは、"更新" と呼ばれます。 多くの場合、最新の更新プログラムの情報を抽出する必要があります。 次のクエリを実行して最新の更新プログラムの識別子を検索し、latest_update_id 一時ビューに保存します。 このビューは、以下の記事に含まれるイベント ログ クエリの例で使用されます。

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

イベント ログに系列情報のクエリを実行する

データ系列に関する情報を含むイベントには、イベントの種類 flow_definition があります。 details:flow_definition オブジェクトには、グラフ内の各リレーションシップを定義する output_datasetinput_datasets が含まれます。

次のクエリを使用して、入力データセットと出力データセットを抽出して系列情報を表示できます。

SELECT
  details:flow_definition.output_dataset as output_dataset,
  details:flow_definition.input_datasets as input_dataset
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'flow_definition'
  AND
  origin.update_id = latest_update.id
output_dataset input_datasets
customers null
sales_orders_raw null
sales_orders_cleaned ["customers", "sales_orders_raw"]
sales_order_in_la ["sales_orders_cleaned"]

イベント ログにデータ品質のクエリを実行する

パイプラインのデータセットで期待値を定義すると、データ品質メトリックが details:flow_progress.data_quality.expectations オブジェクトに格納されます。 データ品質に関する情報を含むイベントには、イベントの種類 flow_progress があります。 次の例では、最後に行ったパイプラインの更新のデータ品質メトリックに対してクエリを実行します。

SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details :flow_progress :data_quality :expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name
dataset expectation passing_records failing_records
sales_orders_cleaned valid_order_number 4083 0

ログにクエリを実行してデータ バックログを監視する

Delta Live Tables は、details:flow_progress.metrics.backlog_bytes オブジェクトのバックログに存在するデータの量を追跡します。 バックログ メトリックを含むイベントには、イベントの種類 flow_progress があります。 次の例では、最新のパイプラインの更新のバックログ メトリックに対してクエリを実行します。

SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id

注意

パイプラインのデータ ソースの種類と Databricks Runtime のバージョンによっては、バックログ メトリックを使用できない場合があります。

サーバーレスが有効になっていないパイプラインのイベント ログからの拡張自動スケール イベントを監視する

サーバーレス コンピューティングを使用しない DLT パイプラインの場合、パイプラインで拡張自動スケールが有効になっている場合、イベント ログはクラスターのサイズ変更をキャプチャします。 拡張自動スケールに関する情報を含むイベントには、イベントの種類が autoscale。 クラスターのサイズ変更要求の情報は、details:autoscale オブジェクトに格納されます。 次の例では、最後のパイプライン更新に対する拡張自動スケール クラスターのサイズ変更要求を照会します。

SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

リソース使用率を監視する

cluster_resources イベントでは、クラスター内のタスク スロットの数、それらのタスク スロットの使用率、スケジュールを待機しているタスクの数に関するメトリックが提供されます。

拡張自動スケールが有効になっている場合、 cluster_resources イベントには、 latest_requested_num_executorsoptimal_num_executorsなど、自動スケール アルゴリズムのメトリックも含まれます。 イベントには、アルゴリズムの状態も、CLUSTER_AT_DESIRED_SIZESCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSBLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION などのさまざまな状態として表示されます。 この情報は、自動スケール イベントと組み合わせて表示して、拡張自動スケールの全体像を提供できます。

次の例では、最後のパイプライン更新のタスク キュー サイズ履歴に対してクエリを実行します。

SELECT
  timestamp,
  Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

次の例では、最後のパイプライン更新の使用率の履歴に対してクエリを実行します。

SELECT
  timestamp,
  Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

次の例では、強化された自動スケール パイプラインでのみ使用できるメトリックと共に、最新の要求でアルゴリズムによって要求された Executor の数、最新のメトリックに基づいてアルゴリズムによって推奨される Executor の最適な数、自動スケール アルゴリズムの状態など、Executor カウント履歴を照会します。

SELECT
  timestamp,
  Double(details :cluster_resources.num_executors) as current_executors,
  Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

Delta Live Tables パイプラインの監査を行う

Delta Live Tables イベント ログ レコードやその他の Azure Databricks 監査ログを使用して、Delta Live Tables でどのようにデータが更新されているのかという全体像を把握できます。

デルタ ライブ テーブルは、パイプライン所有者の資格情報を使用して更新を実行します。 パイプライン所有者を更新することで、使用される認証情報を変更できます。 Delta Live Tables では、パイプラインの作成、構成の編集、更新のトリガーなど、パイプラインに対してアクションをするユーザーが記録されます。

Unity Catalog 監査イベントのリファレンスについては、「Unity Catalog イベント」を参照してください。

イベント ログにユーザー アクションのクエリを実行する

イベント ログを使用して、ユーザー アクションなどのイベントを監査できます。 ユーザー アクションに関する情報を含むイベントには、イベントの種類 user_action があります。

アクションに関する情報は、details フィールドの user_action オブジェクトに格納されます。 次のクエリを使用して、ユーザー イベントの監査ログを作成します。 このクエリで使用される event_log_raw ビューを作成するには、「イベント ログのクエリ実行」を参照してください。

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp action user_name
2021-05-20T19:36:03.517+0000 START user@company.com
2021-05-20T19:35:59.913+0000 CREATE user@company.com
2021-05-27T00:35:51.971+0000 START user@company.com

ランタイム情報

パイプライン更新のランタイム情報 (更新の Databricks Runtime バージョンなど) を表示できます。

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version
11.0