Fix #5336 - Add startTime, endTime and logLink to taskStatus (#5969)

Pere Miquel Brull 2022-07-12 06:23:54 +02:00 committed by GitHub
parent 9da62be6f4
commit 00c14c0b7e
4 changed files with 38 additions and 4 deletions


@@ -36,9 +36,23 @@
"executionStatus": {
"description": "Status at a specific execution date.",
"$ref": "#/definitions/statusType"
},
"startTime": {
"description": "Task start time",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"endTime": {
"description": "Task end time",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"logLink": {
"description": "Task end time",
"type": "string",
"format": "uri"
}
},
"additionalProperties": false
"additionalProperties": false,
"required": ["name", "executionStatus"]
},
"task": {
"type": "object",


@@ -292,7 +292,10 @@ def add_status(
updated_task_status = [
TaskStatus(
name=task_instance.task_id,
executionStatus=_STATUS_MAP.get(context["task_instance"].state),
executionStatus=_STATUS_MAP.get(task_instance.state),
startTime=datetime_to_ts(task_instance.start_date),
endTime=datetime_to_ts(task_instance.end_date),
logLink=task_instance.log_url,
),
*task_status,
]
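The new fields rely on datetime_to_ts from metadata.utils.helpers. A minimal sketch of such a helper, assuming it turns Airflow's optional start_date/end_date into the epoch-millisecond timestamps the schema expects:

from datetime import datetime
from typing import Optional


def datetime_to_ts(date: Optional[datetime]) -> Optional[int]:
    # Convert a datetime to epoch milliseconds; tolerate tasks that never started or finished.
    return int(date.timestamp() * 1000) if date else None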


@@ -150,21 +150,32 @@ class AirbyteSource(Source[CreatePipelineRequest]):
)
def fetch_pipeline_status(
self, connection: dict, pipeline_fqn: str
self, workspace: dict, connection: dict, pipeline_fqn: str
) -> OMetaPipelineStatus:
"""
Method to get task & pipeline status
"""
# Airbyte does not offer a specific attempt link, just one at the pipeline level
log_link = (
f"{self.service_connection.hostPort}/workspaces/{workspace.get('workspaceId')}/connections/"
f"{connection.get('connectionId')}/status"
)
for job in self.client.list_jobs(connection.get("connectionId")):
if not job or not job.get("attempts"):
continue
for attempt in job["attempts"]:
task_status = [
TaskStatus(
name=str(connection.get("connectionId")),
executionStatus=STATUS_MAP.get(
attempt["status"].lower(), StatusType.Pending
).value,
startTime=attempt.get("createdAt"),
endTime=attempt.get("endedAt"),
logLink=log_link,
)
]
pipeline_status = PipelineStatus(
@@ -259,7 +270,9 @@ class AirbyteSource(Source[CreatePipelineRequest]):
service_name=self.service.name.__root__,
pipeline_name=connection.get("connectionId"),
)
yield from self.fetch_pipeline_status(connection, pipeline_fqn)
yield from self.fetch_pipeline_status(
workspace, connection, pipeline_fqn
)
if self.source_config.includeLineage:
pipeline_entity: Pipeline = self.metadata.get_by_name(
entity=Pipeline,
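Because Airbyte only exposes logs at the connection level, every attempt of a connection ends up with the same logLink. An illustrative composition of that URL (the hostPort, workspaceId and connectionId values are made up):

host_port = "http://localhost:8000"        # stands in for self.service_connection.hostPort
workspace = {"workspaceId": "b3a4f4c2"}    # illustrative workspace payload
connection = {"connectionId": "19ad46fa"}  # illustrative connection payload

log_link = (
    f"{host_port}/workspaces/{workspace.get('workspaceId')}/connections/"
    f"{connection.get('connectionId')}/status"
)
print(log_link)  # http://localhost:8000/workspaces/b3a4f4c2/connections/19ad46fa/status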


@@ -60,6 +60,7 @@ from metadata.utils.connections import (
test_connection,
)
from metadata.utils.filters import filter_by_pipeline
from metadata.utils.helpers import datetime_to_ts
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@@ -165,6 +166,9 @@ class AirflowSource(Source[CreatePipelineRequest]):
executionStatus=STATUS_MAP.get(
task.state, StatusType.Pending.value
),
startTime=datetime_to_ts(task.start_date),
endTime=datetime_to_ts(task.end_date),
logLink=task.log_url,
)
for task in tasks
]
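Both the Airbyte and Airflow sources run native task states through a STATUS_MAP before persisting them. A minimal, self-contained sketch of that idea with an assumed mapping (the real maps live in each source module and may differ):

from enum import Enum


class StatusType(Enum):
    # Assumed mirror of the schema's statusType values.
    Successful = "Successful"
    Failed = "Failed"
    Pending = "Pending"


# Assumed mapping from Airflow TaskInstance states to schema status values.
STATUS_MAP = {
    "success": StatusType.Successful.value,
    "failed": StatusType.Failed.value,
    "queued": StatusType.Pending.value,
    "running": StatusType.Pending.value,
}

print(STATUS_MAP.get("success", StatusType.Pending.value))  # Successful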