Fix #5702: Airbyte Pipeline Status Fix (#5726)

This commit is contained in:
Mayur Singal 2022-06-29 14:03:25 +05:30 committed by GitHub
parent 11dc2e4918
commit 81047c4d18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -11,6 +11,7 @@
""" """
Airbyte source to extract metadata Airbyte source to extract metadata
""" """
import traceback
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Iterable, List, Optional from typing import Iterable, List, Optional
@ -122,14 +123,11 @@ class AirbyteSource(Source[CreatePipelineRequest]):
""" """
return [ return [
Task( Task(
name=job["job"]["id"], name=connection["connectionId"],
displayName=job["job"]["id"], displayName=connection["name"],
description="", description="",
taskUrl=f"{connection_url}/status", taskUrl=f"{connection_url}/status",
taskType=job["job"]["configType"],
) )
for job in self.client.list_jobs(connection.get("connectionId"))
if job
] ]
def fetch_pipeline( def fetch_pipeline(
@ -158,21 +156,23 @@ class AirbyteSource(Source[CreatePipelineRequest]):
Method to get task & pipeline status Method to get task & pipeline status
""" """
for job in self.client.list_jobs(connection.get("connectionId")): for job in self.client.list_jobs(connection.get("connectionId")):
if not job or job.get("attempts"):
continue
for attempt in job["attempts"]:
task_status = [ task_status = [
TaskStatus( TaskStatus(
name=str(job["job"]["id"]), name=str(connection.get("connectionId")),
executionStatus=STATUS_MAP.get( executionStatus=STATUS_MAP.get(
attempt["status"].lower(), StatusType.Pending attempt["status"].lower(), StatusType.Pending
).value, ).value,
) )
for attempt in job["attempts"]
] ]
pipeline_status = PipelineStatus( pipeline_status = PipelineStatus(
executionStatus=STATUS_MAP.get( executionStatus=STATUS_MAP.get(
job["job"]["status"].lower(), StatusType.Pending attempt["status"].lower(), StatusType.Pending
).value, ).value,
taskStatus=task_status, taskStatus=task_status,
executionDate=job["job"]["createdAt"], executionDate=attempt["createdAt"],
) )
yield OMetaPipelineStatus( yield OMetaPipelineStatus(
pipeline_fqn=pipeline_fqn, pipeline_status=pipeline_status pipeline_fqn=pipeline_fqn, pipeline_status=pipeline_status