Airflow タスク ログを使用したマニフェスト インジェストの問題のトラブルシューティング

この記事は、Airflow タスク ログを使用して、Azure Data Manager for Energy でのマニフェスト インジェストに関するワークフローの問題のトラブルシューティングに役立ちます。

マニフェスト インジェスト DAG ワークフローの種類

マニフェスト インジェストには、単一マニフェストとバッチ アップロードの 2 種類の有向非巡回グラフ (DAG) ワークフローがあります。

単一マニフェスト

1 つのマニフェスト ファイルを使用して、マニフェスト インジェスト ワークフローをトリガーします。

DagTaskName 値 説明
update_status_running_task ワークフロー サービスを呼び出し、データベースで DAG の状態を running とマークします。
check_payload_type インジェストの種類がバッチ マニフェストか単一マニフェストかを検証します。
validate_manifest_schema_task マニフェストに記載されているすべてのスキーマ型が存在し、参照スキーマの整合性があることを確認します。 無効な値はすべてマニフェストから削除されます。
provide_manifest_intergrity_task OSDU® R3 マニフェスト内の参照を検証し、無効なエンティティを削除します。 この演算子は、親/子の検証を担当します。 孤立したエンティティはすべてログに記録され、検証されたマニフェストから除外されます。 参照先の外部レコードが検索されます。 何も見つからない場合、マニフェスト エンティティは削除されます。 すべての代理キー参照も解決されます。
process_single_manifest_file_task 前の手順から取得した最終的なマニフェスト エンティティのインジェストを実行します。 データ レコードは、ストレージ サービスを介して取り込まれます。
update_status_finished_task ワークフロー サービスを呼び出し、データベースで DAG の状態を finished または failed とマークします。

バッチ アップロード

複数のマニフェスト ファイルが同じワークフロー サービス要求の一部です。 要求ペイロードのマニフェスト セクションは、項目の辞書ではなくリストです。

DagTaskName 値 説明
update_status_running_task ワークフロー サービスを呼び出し、データベースで DAG の状態を running とマークします。
check_payload_type インジェストの種類がバッチ マニフェストか単一マニフェストかを検証します。
batch_upload マニフェストの一覧を 3 つのバッチに分割し、並列で処理します。 (タスク ログは出力されません)。
process_manifest_task_(1 / 2 / 3) マニフェストの一覧を 3 つのグループに分割し、処理します。 validate_manifest_schema_taskprovide_manifest_intergrity_taskprocess_single_manifest_file_task で実行されるすべてのステップは圧縮され、これらのタスクで順番に実行されます。
update_status_finished_task ワークフロー サービスを呼び出し、データベースで DAG の状態を finished または failed とマークします。

ペイロードの種類 (単一またはバッチ) に基づいて、check_payload_type タスクは適切なブランチを選択し、もう一方のブランチのタスクをスキップします。

前提条件

Airflow タスク ログを Azure Monitor と統合する必要があります。 「Airflow ログと Azure Monitor を統合する方法」を参照してください。

次の列は、問題をデバッグするために、エアフロー タスク ログで公開されています。

パラメーター名 説明
RunID トリガーされた DAG 実行の一意の実行 ID。
CorrelationID DAG 実行の一意の関連付け ID (実行 ID と同じ)。
DagName DAG ワークフロー名。 たとえば、Osdu_ingest はマニフェスト インジェストのワークフロー名です。
DagTaskName DAG ワークフローのタスク名。 たとえば、update_status_running_task はマニフェスト インジェストのタスク名です。
Content タスクの実行中に Airflow が出力するエラー ログ メッセージ (エラーまたは例外)。
LogTimeStamp DAG の実行の期間。
LogLevel エラーのレベル。 値は DEBUGINFOWARNINGERROR です。 ほとんどの例外とエラー メッセージは、ERROR レベルでフィルター処理することで確認できます。

失敗した DAG の実行

ワークフローの実行が Update_status_running_task または Update_status_finished_task で失敗し、データ レコードが取り込まれませんでした。

考えられる原因

  • データ パーティション ID が正しくなかったため、パーティション API の呼び出しが認証されなかった。
  • 要求本文の実行コンテキストのキー名が正しくない。
  • ワークフロー サービスが実行されていないか、5xx エラーがスローされている。

ワークフローの状態

ワークフローの状態は、failed とマークされます。

解決策

update_status_running_task または update_status_finished_task がないか Airflow タスク ログを確認します。 正しいデータ パーティション ID またはキー名を渡してペイロードを修正します。

サンプル Kusto クエリ:

    OEPAirFlowTask
        | where DagName == "Osdu_ingest"
        | where DagTaskName == "update_status_running_task"
        | where LogLevel == "ERROR" // ERROR/DEBUG/INFO/WARNING
        | where RunID == '<run_id>'

サンプル トレース出力:

    [2023-02-05, 12:21:54 IST] {taskinstance.py:1703} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.8/site-packages/osdu_ingestion/libs/context.py", line 50, in populate
        data_partition_id = ctx_payload['data-partition-id']
    KeyError: 'data-partition-id'
    
    requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://contoso.energy.azure.com/api/workflow/v1/workflow/Osdu_ingest/workflowRun/e9a815f2-84f5-4513-9825-4d37ab291264

スキーマの検証に失敗

スキーマの検証に失敗したため、レコードが取り込まれませんでした。

考えられる原因

  • スキーマ サービスが "スキーマが見つかりません" というエラーをスローしている。
  • マニフェスト本文がスキーマ型に準拠していない。
  • スキーマ参照が正しくない。
  • スキーマ サービスが 5xx エラーをスローしている。

ワークフローの状態

ワークフローの状態は、finished とマークされます。 無効なエンティティがスキップされ、インジェストが続行されるため、ワークフローの状態でエラーが発生することはありません。

解決策

validate_manifest_schema_task または process_manifest_task がないか Airflow タスク ログを確認します。 正しいデータ パーティション ID またはキー名を渡してペイロードを修正します。

サンプル Kusto クエリ:

    OEPAirFlowTask
    | where DagName has "Osdu_ingest"
    | where DagTaskName == "validate_manifest_schema_task" or DagTaskName has "process_manifest_task"
    | where LogLevel == "ERROR"
    | where RunID == "<run_id>"
    | order by ['time'] asc  

サンプル トレース出力:

    Error traces to look out for
    [2023-02-05, 14:55:37 IST] {connectionpool.py:452} DEBUG - https://contoso.energy.azure.com:443 "GET /api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0 HTTP/1.1" 404 None
    [2023-02-05, 14:55:37 IST] {authorization.py:137} ERROR - {"error":{"code":404,"message":"Schema is not present","errors":[{"domain":"global","reason":"notFound","message":"Schema is not present"}]}}
    [2023-02-05, 14:55:37 IST] {validate_schema.py:170} ERROR - Error on getting schema of kind 'osdu:wks:work-product-component--WellLog:2.2.0'
    [2023-02-05, 14:55:37 IST] {validate_schema.py:171} ERROR - 404 Client Error: Not Found for url: https://contoso.energy.azure.com/api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0
    [2023-02-05, 14:55:37 IST] {validate_schema.py:314} WARNING - osdu:wks:work-product-component--WellLog:2.2.0 is not present in Schema service.
    [2023-02-05, 15:01:23 IST] {validate_schema.py:322} ERROR - Schema validation error. Data field.
    [2023-02-05, 15:01:23 IST] {validate_schema.py:323} ERROR - Manifest kind: osdu:wks:work-product-component--WellLog:1.1.0
    [2023-02-05, 15:01:23 IST] {validate_schema.py:324} ERROR - Error: 'string-value' is not of type 'number'
    
    Failed validating 'type' in schema['properties']['data']['allOf'][3]['properties']['SamplingStop']:
        {'description': 'The stop value/last value of the ReferenceCurveID, '
                        'typically the end depth of the logging.',
         'example': 7500,
         'title': 'Sampling Stop',
         'type': 'number',
         'x-osdu-frame-of-reference': 'UOM'}
    
    On instance['data']['SamplingStop']:
        'string-value'

参照チェックの失敗

参照チェックが失敗したため、レコードが取り込まれませんでした。

考えられる原因

  • 参照されたレコードが見つからなかった。
  • 親レコードが見つからなかった。
  • 検索サービスが 5xx エラーをスローしている。

ワークフローの状態

ワークフローの状態は、finished とマークされます。 無効なエンティティがスキップされ、インジェストが続行されるため、ワークフローの状態でエラーが発生することはありません。

解決策

provide_manifest_integrity_task または process_manifest_task がないか Airflow タスク ログを確認します。

サンプル Kusto クエリ:

    OEPAirFlowTask
        | where DagName has "Osdu_ingest"
        | where DagTaskName == "provide_manifest_integrity_task" or DagTaskName has "process_manifest_task"
        | where Content has 'Search query "'or Content has 'response ids: ['
        | where RunID has "<run_id>"

参照整合性タスク専用のエラー ログがないため、デバッグ ログ ステートメントを調べて、すべての外部レコードが検索サービスを介してフェッチされたかどうかを確認します。

たとえば、次のサンプル トレース出力は、参照整合性のために検索サービスを介して照会されたレコードを示しています。

    [2023-02-05, 19:14:40 IST] {search_record_ids.py:75} DEBUG - Search query "contoso-dp1:work-product-component--WellLog:5ab388ae0e140838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a"

出力には、システムで取得され、存在していたレコードが表示されます。 レコードの一部が存在しない場合は、レコードを参照していた関連するマニフェスト オブジェクトは削除され、取り込まれなくなります。

    [2023-02-05, 19:14:40 IST] {search_record_ids.py:141} DEBUG - response ids: ['contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a:1675590506723615', 'contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a    ']

マニフェストに無効な法的タグまたはアクセス制御リスト (ACL) が含まれているため、レコードは取り込まれませんでした。

考えられる原因

  • ACL が正しくない。
  • 法的タグが正しくない。
  • ストレージ サービスが 5xx エラーをスローしている。

ワークフローの状態

ワークフローの状態は、finished とマークされます。 ワークフローの状態にエラーがありません。

解決策

process_single_manifest_file_task または process_manifest_task がないか Airflow タスク ログを確認します。

サンプル Kusto クエリ:

    OEPAirFlowTask
    | where DagName has "Osdu_ingest"
    | where DagTaskName == "process_single_manifest_file_task" or DagTaskName has "process_manifest_task"
    | where LogLevel == "ERROR"
    | where RunID has "<run_id>"
    | order by ['time'] asc 

サンプル トレース出力:

    "PUT /api/storage/v2/records HTTP/1.1" 400 None
    [2023-02-05, 16:57:05 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Invalid legal tags","message":"Invalid legal tags: contoso-dp1-R3FullManifest-Legal-Tag-Test779759112"}
    

出力は、取得されたレコードを示します。 不足している検索レコードに対応するマニフェスト エンティティ レコードは削除され、取り込まれません。

    "PUT /api/storage/v2/records HTTP/1.1" 400 None
    [2023-02-05, 16:58:46 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Validation error.","message":"createOrUpdateRecords.records[0].acl: Invalid group name 'data1.default.viewers@contoso-dp1.dataservices.energy'"}
    [2023-02-05, 16:58:46 IST] {single_manifest_processor.py:83} WARNING - Can't process entity SRN: surrogate-key:0ef20853-f26a-456f-b874-3f2f5f35b6fb

既知の問題

  • 参照整合性タスクの特定のエラー ログがないため、デバッグ ログ ステートメントを手動で検索して、すべての外部レコードが検索サービスを介して取得されたかどうかを確認する必要があります。

次のステップ

次のチュートリアルに進み、マニフェスト ベースのファイル インジェストを実行する方法を学習します。

関連情報