From 00c14c0b7e48ff552bc3798d2dcaa724fe7952b8 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Tue, 12 Jul 2022 06:23:54 +0200 Subject: [PATCH] Fix #5336 - Add startTime, endTime and logLink to taskStatus (#5969) Fix #5336 - Add startTime, endTime and logLink to taskStatus (#5969) --- .../json/schema/entity/data/pipeline.json | 16 +++++++++++++++- .../lineage/utils.py | 5 ++++- .../ingestion/source/pipeline/airbyte.py | 17 +++++++++++++++-- .../ingestion/source/pipeline/airflow.py | 4 ++++ 4 files changed, 38 insertions(+), 4 deletions(-) diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/data/pipeline.json b/catalog-rest-service/src/main/resources/json/schema/entity/data/pipeline.json index 8b73eb5aa13..a836fcd7dfe 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/data/pipeline.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/data/pipeline.json @@ -36,9 +36,23 @@ "executionStatus": { "description": "Status at a specific execution date.", "$ref": "#/definitions/statusType" + }, + "startTime": { + "description": "Task start time", + "$ref": "../../type/basic.json#/definitions/timestamp" + }, + "endTime": { + "description": "Task end time", + "$ref": "../../type/basic.json#/definitions/timestamp" + }, + "logLink": { + "description": "Task log link", + "type": "string", + "format": "uri" } }, - "additionalProperties": false + "additionalProperties": false, + "required": ["name", "executionStatus"] }, "task": { "type": "object", diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/utils.py b/ingestion/src/airflow_provider_openmetadata/lineage/utils.py index b968ec70e83..748d602b6dc 100644 --- a/ingestion/src/airflow_provider_openmetadata/lineage/utils.py +++ b/ingestion/src/airflow_provider_openmetadata/lineage/utils.py @@ -292,7 +292,10 @@ def add_status( updated_task_status = [ TaskStatus( name=task_instance.task_id, - executionStatus=_STATUS_MAP.get(context["task_instance"].state), + 
executionStatus=_STATUS_MAP.get(task_instance.state), + startTime=datetime_to_ts(task_instance.start_date), + endTime=datetime_to_ts(task_instance.end_date), + logLink=task_instance.log_url, ), *task_status, ] diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py b/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py index 44b7da85a41..f4c5d47aa6d 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py @@ -150,21 +150,32 @@ class AirbyteSource(Source[CreatePipelineRequest]): ) def fetch_pipeline_status( - self, connection: dict, pipeline_fqn: str + self, workspace: dict, connection: dict, pipeline_fqn: str ) -> OMetaPipelineStatus: """ Method to get task & pipeline status """ + + # Airbyte does not offer specific attempt link, just at pipeline level + log_link = ( + f"{self.service_connection.hostPort}/workspaces/{workspace.get('workspaceId')}/connections/" + f"{connection.get('connectionId')}/status" + ) + for job in self.client.list_jobs(connection.get("connectionId")): if not job or not job.get("attempts"): continue for attempt in job["attempts"]: + task_status = [ TaskStatus( name=str(connection.get("connectionId")), executionStatus=STATUS_MAP.get( attempt["status"].lower(), StatusType.Pending ).value, + startTime=attempt.get("createdAt"), + endTime=attempt.get("endedAt"), + logLink=log_link, ) ] pipeline_status = PipelineStatus( @@ -259,7 +270,9 @@ class AirbyteSource(Source[CreatePipelineRequest]): service_name=self.service.name.__root__, pipeline_name=connection.get("connectionId"), ) - yield from self.fetch_pipeline_status(connection, pipeline_fqn) + yield from self.fetch_pipeline_status( + workspace, connection, pipeline_fqn + ) if self.source_config.includeLineage: pipeline_entity: Pipeline = self.metadata.get_by_name( entity=Pipeline, diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow.py 
b/ingestion/src/metadata/ingestion/source/pipeline/airflow.py index 74da3c71d2c..764fa01abc9 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow.py @@ -60,6 +60,7 @@ from metadata.utils.connections import ( test_connection, ) from metadata.utils.filters import filter_by_pipeline +from metadata.utils.helpers import datetime_to_ts from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -165,6 +166,9 @@ class AirflowSource(Source[CreatePipelineRequest]): executionStatus=STATUS_MAP.get( task.state, StatusType.Pending.value ), + startTime=datetime_to_ts(task.start_date), + endTime=datetime_to_ts(task.end_date), + logLink=task.log_url, ) for task in tasks ]