Устранение неполадок приема манифеста с помощью журналов задач Airflow
В этой статье показано, как устранить неполадки рабочего процесса с приемом манифеста в Azure Data Manager для энергетики с помощью журналов задач Airflow.
Типы рабочих процессов DAG приема манифестов
Существует два типа рабочих процессов ациклического графа (DAG) для приема манифеста: один манифест и пакетная отправка.
Один манифест
Один файл манифеста используется для активации рабочего процесса приема манифеста.
Значение DagTaskName | Description |
---|---|
update_status_running_task |
Вызывает службу рабочего процесса и помечает состояние DAG как running в базе данных. |
check_payload_type |
Проверяет, является ли тип приема пакетом или одним манифестом. |
validate_manifest_schema_task |
Убедитесь, что все типы схем, упоминание в манифесте, присутствуют и есть целостность ссылочной схемы. Все недопустимые значения вытеснили из манифеста. |
provide_manifest_intergrity_task |
Проверяет ссылки внутри манифеста R3 OSDU® и удаляет недопустимые сущности. Этот оператор отвечает за проверку родительского или дочернего объекта. Все дочерние сущности регистрируются и исключаются из проверенного манифеста. Поиск всех внешних записей, на которые ссылается ссылка. Если нет, сущность манифеста удаляется. Все суррогатные ссылки на ключи также разрешаются. |
process_single_manifest_file_task |
Выполняет прием конечных сущностей манифеста, полученных на предыдущем шаге. Записи данных передаются через службу хранилища. |
update_status_finished_task |
Вызывает службу рабочего процесса и помечает состояние DAG как finished или failed в базе данных. |
Пакетная отправка
Несколько файлов манифеста являются частью одного запроса службы рабочего процесса. Раздел манифеста в полезных данных запроса — это список вместо словаря элементов.
Значение DagTaskName | Description |
---|---|
update_status_running_task |
Вызывает службу рабочего процесса и помечает состояние DAG как running в базе данных. |
check_payload_type |
Проверяет, является ли тип приема пакетом или одним манифестом. |
batch_upload |
Делит список манифестов на три пакета, которые будут обрабатываться параллельно. (Журналы задач не создаются.) |
process_manifest_task_(1 / 2 / 3) |
Делит список манифестов на группы из трех и обрабатывает их. Все шаги, выполняемые в validate_manifest_schema_task , provide_manifest_intergrity_task и process_single_manifest_file_task сжаты и выполняются последовательно в этих задачах. |
update_status_finished_task |
Вызывает службу рабочего процесса и помечает состояние DAG как finished или failed в базе данных. |
В зависимости от типа полезных данных (один или пакет), check_payload_type
задача выбирает соответствующую ветвь и пропускает задачи в другой ветви.
Необходимые компоненты
Необходимо интегрировать журналы задач Airflow с Azure Monitor. См. статью "Интеграция журналов Airflow с Azure Monitor".
Следующие столбцы предоставляются в журналах задач Airflow для отладки проблемы:
Наименование параметра | Description |
---|---|
RunID |
Уникальный идентификатор запуска триггера DAG. |
CorrelationID |
Уникальный идентификатор корреляции запуска DAG (так же, как и идентификатор выполнения). |
DagName |
Имя рабочего процесса DAG. Например, Osdu_ingest это имя рабочего процесса для приема манифеста. |
DagTaskName |
Имя задачи для рабочего процесса DAG. Например, update_status_running_task это имя задачи для приема манифеста. |
Content |
Сообщения журнала ошибок (ошибки или исключения), которые Airflow выдает во время выполнения задачи. |
LogTimeStamp |
Интервал времени выполнения DAG. |
LogLevel |
Уровень ошибки. Значения: DEBUG , INFO и WARNING ERROR . Вы можете просматривать большинство сообщений об исключениях и ошибках, отфильтровав их на ERROR уровне. |
Сбой запуска DAG
Сбой Update_status_running_task
выполнения рабочего процесса или Update_status_finished_task
записи данных не были приема.
Возможные причины
- Вызов API секционирования не прошел проверку подлинности, так как идентификатор секции данных является неверным.
- Неверное имя ключа в контексте выполнения текста запроса.
- Служба рабочего процесса не выполняется или вызывает ошибки 5xx.
Состояние workflow-процесса
Состояние рабочего процесса помечается как failed
.
Решение
Проверьте журналы задач Airflow для update_status_running_task
или update_status_finished_task
. Исправьте полезные данные, передав правильный идентификатор секции данных или имя ключа.
Пример запроса Kusto:
OEPAirFlowTask
| where DagName == "Osdu_ingest"
| where DagTaskName == "update_status_running_task"
| where LogLevel == "ERROR" // ERROR/DEBUG/INFO/WARNING
| where RunID == '<run_id>'
Пример выходных данных трассировки:
[2023-02-05, 12:21:54 IST] {taskinstance.py:1703} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/osdu_ingestion/libs/context.py", line 50, in populate
data_partition_id = ctx_payload['data-partition-id']
KeyError: 'data-partition-id'
requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://contoso.energy.azure.com/api/workflow/v1/workflow/Osdu_ingest/workflowRun/e9a815f2-84f5-4513-9825-4d37ab291264
Не удалось проверить схему
Записи не были приема, так как проверка схемы завершилась ошибкой.
Возможные причины
- Служба схемы вызывает ошибки "Схема не найдена".
- Текст манифеста не соответствует типу схемы.
- Ссылки на схему неверны.
- Служба схемы выдает ошибки 5xx.
Состояние workflow-процесса
Состояние рабочего процесса помечается как finished
. Не наблюдается сбой в состоянии рабочего процесса, так как недопустимые сущности пропускаются и прием продолжается.
Решение
Проверьте журналы задач Airflow для validate_manifest_schema_task
или process_manifest_task
. Исправьте полезные данные, передав правильный идентификатор секции данных или имя ключа.
Пример запроса Kusto:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "validate_manifest_schema_task" or DagTaskName has "process_manifest_task"
| where LogLevel == "ERROR"
| where RunID == "<run_id>"
| order by ['time'] asc
Пример выходных данных трассировки:
Error traces to look out for
[2023-02-05, 14:55:37 IST] {connectionpool.py:452} DEBUG - https://contoso.energy.azure.com:443 "GET /api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0 HTTP/1.1" 404 None
[2023-02-05, 14:55:37 IST] {authorization.py:137} ERROR - {"error":{"code":404,"message":"Schema is not present","errors":[{"domain":"global","reason":"notFound","message":"Schema is not present"}]}}
[2023-02-05, 14:55:37 IST] {validate_schema.py:170} ERROR - Error on getting schema of kind 'osdu:wks:work-product-component--WellLog:2.2.0'
[2023-02-05, 14:55:37 IST] {validate_schema.py:171} ERROR - 404 Client Error: Not Found for url: https://contoso.energy.azure.com/api/schema-service/v1/schema/osdu:wks:work-product-component--WellLog:2.2.0
[2023-02-05, 14:55:37 IST] {validate_schema.py:314} WARNING - osdu:wks:work-product-component--WellLog:2.2.0 is not present in Schema service.
[2023-02-05, 15:01:23 IST] {validate_schema.py:322} ERROR - Schema validation error. Data field.
[2023-02-05, 15:01:23 IST] {validate_schema.py:323} ERROR - Manifest kind: osdu:wks:work-product-component--WellLog:1.1.0
[2023-02-05, 15:01:23 IST] {validate_schema.py:324} ERROR - Error: 'string-value' is not of type 'number'
Failed validating 'type' in schema['properties']['data']['allOf'][3]['properties']['SamplingStop']:
{'description': 'The stop value/last value of the ReferenceCurveID, '
'typically the end depth of the logging.',
'example': 7500,
'title': 'Sampling Stop',
'type': 'number',
'x-osdu-frame-of-reference': 'UOM'}
On instance['data']['SamplingStop']:
'string-value'
Не удалось проверка ссылок
Записи не были приема, так как сбой ссылок проверка.
Возможные причины
- Ссылки на записи не найдены.
- Родительские записи не найдены.
- Служба поиска выдает ошибки 5xx.
Состояние workflow-процесса
Состояние рабочего процесса помечается как finished
. Не наблюдается сбой в состоянии рабочего процесса, так как недопустимые сущности пропускаются и прием продолжается.
Решение
Проверьте журналы задач Airflow для provide_manifest_integrity_task
или process_manifest_task
.
Пример запроса Kusto:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "provide_manifest_integrity_task" or DagTaskName has "process_manifest_task"
| where Content has 'Search query "'or Content has 'response ids: ['
| where RunID has "<run_id>"
Так как для задач целостности ссылок нет журналов ошибок, проверка инструкции журнала отладки, чтобы узнать, были ли все внешние записи извлекаются через службу поиска.
Например, в следующем примере выходных данных трассировки отображается запись, запрашиваемая через службу поиска для целостности ссылок:
[2023-02-05, 19:14:40 IST] {search_record_ids.py:75} DEBUG - Search query "contoso-dp1:work-product-component--WellLog:5ab388ae0e140838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559" OR "contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a"
В выходных данных показаны записи, которые были получены и находились в системе. Связанный объект манифеста, на который ссылается запись, будет удален и больше не будет приемлен, если вы заметили, что некоторые записи не присутствуют.
[2023-02-05, 19:14:40 IST] {search_record_ids.py:141} DEBUG - response ids: ['contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a:1675590506723615', 'contoso-dp1:work-product-component--WellLog:5ab388ae0e1b40838c297f0e6559758a ']
Недопустимые юридические теги или списки управления доступом в манифесте
Записи не были приема, так как манифест содержит недопустимые юридические теги или списки управления доступом (ACL).
Возможные причины
- Списки управления доступом неверны.
- Неправильные юридические теги.
- Служба хранилища выдает ошибки 5xx.
Состояние workflow-процесса
Состояние рабочего процесса помечается как finished
. В состоянии рабочего процесса не наблюдается сбой.
Решение
Проверьте журналы задач Airflow для process_single_manifest_file_task
или process_manifest_task
.
Пример запроса Kusto:
OEPAirFlowTask
| where DagName has "Osdu_ingest"
| where DagTaskName == "process_single_manifest_file_task" or DagTaskName has "process_manifest_task"
| where LogLevel == "ERROR"
| where RunID has "<run_id>"
| order by ['time'] asc
Пример выходных данных трассировки:
"PUT /api/storage/v2/records HTTP/1.1" 400 None
[2023-02-05, 16:57:05 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Invalid legal tags","message":"Invalid legal tags: contoso-dp1-R3FullManifest-Legal-Tag-Test779759112"}
Выходные данные указывают на записи, полученные. Записи сущности манифеста, соответствующие отсутствующим записям поиска, удаляются и не обрабатываются.
"PUT /api/storage/v2/records HTTP/1.1" 400 None
[2023-02-05, 16:58:46 IST] {authorization.py:137} ERROR - {"code":400,"reason":"Validation error.","message":"createOrUpdateRecords.records[0].acl: Invalid group name 'data1.default.viewers@contoso-dp1.dataservices.energy'"}
[2023-02-05, 16:58:46 IST] {single_manifest_processor.py:83} WARNING - Can't process entity SRN: surrogate-key:0ef20853-f26a-456f-b874-3f2f5f35b6fb
Известные проблемы
- Так как для задач целостности ссылок нет определенных журналов ошибок, необходимо вручную искать инструкции журнала отладки, чтобы узнать, были ли получены все внешние записи через службу поиска.
Следующие шаги
Перейдите к следующему руководству и узнайте, как выполнить прием файлов на основе манифеста: