Устранение неполадок приема манифеста с помощью журналов задач Airflow

В этой статье показано, как устранить неполадки рабочего процесса с приемом манифеста в Azure Data Manager для энергетики с помощью журналов задач Airflow.

Типы рабочих процессов DAG приема манифестов

Существует два типа рабочих процессов ациклического графа (DAG) для приема манифеста: один манифест и пакетная отправка.

Один манифест

Один файл манифеста используется для активации рабочего процесса приема манифеста.

Значение DagTaskName Description
update_status_running_task Вызывает службу рабочего процесса и помечает состояние DAG как running в базе данных.
check_payload_type Проверяет, является ли тип приема пакетом или одним манифестом.
validate_manifest_schema_task Убедитесь, что все типы схем, упоминание в манифесте, присутствуют и есть целостность ссылочной схемы. Все недопустимые значения вытеснили из манифеста.
provide_manifest_intergrity_task Проверяет ссылки внутри манифеста R3 OSDU® и удаляет недопустимые сущности. Этот оператор отвечает за проверку родительского или дочернего объекта. Все дочерние сущности регистрируются и исключаются из проверенного манифеста. Поиск всех внешних записей, на которые ссылается ссылка. Если нет, сущность манифеста удаляется. Все суррогатные ссылки на ключи также разрешаются.
process_single_manifest_file_task Выполняет прием конечных сущностей манифеста, полученных на предыдущем шаге. Записи данных передаются через службу хранилища.
update_status_finished_task Вызывает службу рабочего процесса и помечает состояние DAG как finished или failed в базе данных.

Пакетная отправка

Несколько файлов манифеста являются частью одного запроса службы рабочего процесса. Раздел манифеста в полезных данных запроса — это список вместо словаря элементов.

Значение DagTaskName Description
update_status_running_task Вызывает службу рабочего процесса и помечает состояние DAG как running в базе данных.
check_payload_type Проверяет, является ли тип приема пакетом или одним манифестом.
batch_upload Делит список манифестов на три пакета, которые будут обрабатываться параллельно. (Журналы задач не создаются.)
process_manifest_task_(1 / 2 / 3) Делит список манифестов на группы из трех и обрабатывает их. Все шаги, выполняемые в validate_manifest_schema_task, provide_manifest_intergrity_taskи process_single_manifest_file_task сжаты и выполняются последовательно в этих задачах.
update_status_finished_task Вызывает службу рабочего процесса и помечает состояние DAG как finished или failed в базе данных.

В зависимости от типа полезных данных (один или пакет), check_payload_type задача выбирает соответствующую ветвь и пропускает задачи в другой ветви.

Необходимые компоненты

Необходимо интегрировать журналы задач Airflow с Azure Monitor. См. статью "Интеграция журналов Airflow с Azure Monitor".

Следующие столбцы предоставляются в журналах задач Airflow для отладки проблемы:

Наименование параметра Description
RunID Уникальный идентификатор запуска триггера DAG.
CorrelationID Уникальный идентификатор корреляции запуска DAG (так же, как и идентификатор выполнения).
DagName Имя рабочего процесса DAG. Например, Osdu_ingest это имя рабочего процесса для приема манифеста.
DagTaskName Имя задачи для рабочего процесса DAG. Например, update_status_running_task это имя задачи для приема манифеста.
Content Сообщения журнала ошибок (ошибки или исключения), которые Airflow выдает во время выполнения задачи.
LogTimeStamp Интервал времени выполнения DAG.
LogLevel Уровень ошибки. Значения: DEBUG, INFOи WARNINGERROR. Вы можете просматривать большинство сообщений об исключениях и ошибках, отфильтровав их на ERROR уровне.

Сбой запуска DAG

Сбой Update_status_running_task выполнения рабочего процесса или Update_status_finished_taskзаписи данных не были приема.

Возможные причины

  • Вызов API секционирования не прошел проверку подлинности, так как идентификатор секции данных является неверным.
  • Неверное имя ключа в контексте выполнения текста запроса.
  • Служба рабочего процесса не выполняется или вызывает ошибки 5xx.

Состояние workflow-процесса

Состояние рабочего процесса помечается как failed.

Решение

Проверьте журналы задач Airflow для update_status_running_task или update_status_finished_task. Исправьте полезные данные, передав правильный идентификатор секции данных или имя ключа.

Пример запроса Kusto:

    OEPAirFlowTask
        | where DagName == "Osdu_ingest"
        | where DagTaskName == "update_status_running_task"
        | where LogLevel == "ERROR" // ERROR/DEBUG/INFO/WARNING
        | where RunID == '<run_id>'

Пример выходных данных трассировки:

    [2023-02-05, 12:21:54 IST] {taskinstance.py:1703} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.8/site-packages/osdu_ingestion/libs/context.py", line 50, in populate
        data_partition_id = ctx_payload['data-partition-id']
    KeyError: 'data-partition-id'
    
    requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://contoso.energy.azure.com/api/workflow/v1/workflow/Osdu_ingest/workflowRun/e9a815f2-84f5-4513-9825-4d37ab291264

Не удалось проверить схему

Записи не были приема, так как проверка схемы завершилась ошибкой.

Возможные причины

  • Служба схемы вызывает ошибки "Схема не найдена".
  • Текст манифеста не соответствует типу схемы.
  • Ссылки на схему неверны.
  • Служба схемы выдает ошибки 5xx.

Состояние workflow-процесса

Состояние рабочего процесса помечается как finished. Не наблюдается сбой в состоянии рабочего процесса, так как недопустимые сущности пропускаются и прием продолжается.

Решение

Проверьте журналы задач Airflow для validate_manifest_schema_task или process_manifest_task. Исправьте полезные данные, передав правильный идентификатор секции данных или имя ключа.

Пример запроса Kusto:

    OEPAirFlowTask
    | where DagName has "Osdu_ingest"
    | where DagTaskName == "validate_manifest_schema_task" or DagTaskName has "process_manifest_task"
    | where LogLevel == "ERROR"
    | where RunID == "<run_id>"
    | order by ['time'] asc  

Пример выходных данных трассировки:

    Error traces to look out for
    [2023-02-05, 14:55:37 IST] {connectionpool.py:452} DEBUG - https://contoso.energy.azure.com:443 "GET /api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0 HTTP/1.1" 404 None
    [2023-02-05, 14:55:37 IST] {authorization.py:137} ERROR - {"error":{"code":404,"message":"Schema is not present","errors":[{"domain":"global","reason":"notFound","message":"Schema is not present"}]}}
    [2023-02-05, 14:55:37 IST] {validate_schema.py:170} ERROR - Error on getting schema of kind 'osdu:wks:work-product-component--WellLog:2.2.0'
    [2023-02-05, 14:55:37 IST] {validate_schema.py:171} ERROR - 404 Client Error: Not Found for url: https://contoso.energy.azure.com/api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0
    [2023-02-05, 14:55:37 IST] {validate_schema.py:314} WARNING - osdu:wks:work-product-component--WellLog:2.2.0 is not present in Schema service.
    [2023-02-05, 15:01:23 IST] {validate_schema.py:322} ERROR - Schema validation error. Data field.
    [2023-02-05, 15:01:23 IST] {validate_schema.py:323} ERROR - Manifest kind: osdu:wks:work-product-component--WellLog:1.1.0
    [2023-02-05, 15:01:23 IST] {validate_schema.py:324} ERROR - Error: 'string-value' is not of type 'number'
    
    Failed validating 'type' in schema['properties']['data']['allOf'][3]['properties']['SamplingStop']:
        {'description': 'The stop value/last value of the ReferenceCurveID, '
                        'typically the end depth of the logging.',
         'example': 7500,
         'title': 'Sampling Stop',
         'type': 'number',
         'x-osdu-frame-of-reference': 'UOM'}
    
    On instance['data']['SamplingStop']:
        'string-value'

Не удалось проверка ссылок

Записи не были приема, так как сбой ссылок проверка.

Возможные причины

  • Ссылки на записи не найдены.
  • Родительские записи не найдены.
  • Служба поиска выдает ошибки 5xx.

Состояние workflow-процесса

Состояние рабочего процесса помечается как finished. Не наблюдается сбой в состоянии рабочего процесса, так как недопустимые сущности пропускаются и прием продолжается.

Решение

Проверьте журналы задач Airflow для provide_manifest_integrity_task или process_manifest_task.

Пример запроса Kusto:

    OEPAirFlowTask
        | where DagName has "Osdu_ingest"
        | where DagTaskName == "provide_manifest_integrity_task" or DagTaskName has "process_manifest_task"
        | where Content has 'Search query "'or Content has 'response ids: ['
        | where RunID has "<run_id>"

Так как для задач целостности ссылок нет журналов ошибок, проверка инструкции журнала отладки, чтобы узнать, были ли все внешние записи извлекаются через службу поиска.

Например, в следующем примере выходных данных трассировки отображается запись, запрашиваемая через службу поиска для целостности ссылок:

    [2023-02-05, 19:14:40 IST] {search_record_ids.py:75} DEBUG - Search query "contoso-dp1:work-product-component--WellLog:5ab388ae0e140838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a"

В выходных данных показаны записи, которые были получены и находились в системе. Связанный объект манифеста, на который ссылается запись, будет удален и больше не будет приемлен, если вы заметили, что некоторые записи не присутствуют.

    [2023-02-05, 19:14:40 IST] {search_record_ids.py:141} DEBUG - response ids: ['contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a:1675590506723615', 'contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a    ']

Записи не были приема, так как манифест содержит недопустимые юридические теги или списки управления доступом (ACL).

Возможные причины

  • Списки управления доступом неверны.
  • Неправильные юридические теги.
  • Служба хранилища выдает ошибки 5xx.

Состояние workflow-процесса

Состояние рабочего процесса помечается как finished. В состоянии рабочего процесса не наблюдается сбой.

Решение

Проверьте журналы задач Airflow для process_single_manifest_file_task или process_manifest_task.

Пример запроса Kusto:

    OEPAirFlowTask
    | where DagName has "Osdu_ingest"
    | where DagTaskName == "process_single_manifest_file_task" or DagTaskName has "process_manifest_task"
    | where LogLevel == "ERROR"
    | where RunID has "<run_id>"
    | order by ['time'] asc 

Пример выходных данных трассировки:

    "PUT /api/storage/v2/records HTTP/1.1" 400 None
    [2023-02-05, 16:57:05 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Invalid legal tags","message":"Invalid legal tags: contoso-dp1-R3FullManifest-Legal-Tag-Test779759112"}
    

Выходные данные указывают на записи, полученные. Записи сущности манифеста, соответствующие отсутствующим записям поиска, удаляются и не обрабатываются.

    "PUT /api/storage/v2/records HTTP/1.1" 400 None
    [2023-02-05, 16:58:46 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Validation error.","message":"createOrUpdateRecords.records[0].acl: Invalid group name 'data1.default.viewers@contoso-dp1.dataservices.energy'"}
    [2023-02-05, 16:58:46 IST] {single_manifest_processor.py:83} WARNING - Can't process entity SRN: surrogate-key:0ef20853-f26a-456f-b874-3f2f5f35b6fb

Известные проблемы

  • Так как для задач целостности ссылок нет определенных журналов ошибок, необходимо вручную искать инструкции журнала отладки, чтобы узнать, были ли получены все внешние записи через службу поиска.

Следующие шаги

Перейдите к следующему руководству и узнайте, как выполнить прием файлов на основе манифеста:

Ссылки