diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py b/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py index 58493f135ec..1f72c0fccb9 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py @@ -11,6 +11,7 @@ """ Airbyte source to extract metadata """ +import traceback from dataclasses import dataclass, field from typing import Iterable, List, Optional @@ -122,14 +123,11 @@ class AirbyteSource(Source[CreatePipelineRequest]): """ return [ Task( - name=job["job"]["id"], - displayName=job["job"]["id"], + name=connection["connectionId"], + displayName=connection["name"], description="", taskUrl=f"{connection_url}/status", - taskType=job["job"]["configType"], ) - for job in self.client.list_jobs(connection.get("connectionId")) - if job ] def fetch_pipeline( @@ -158,25 +156,27 @@ class AirbyteSource(Source[CreatePipelineRequest]): Method to get task & pipeline status """ for job in self.client.list_jobs(connection.get("connectionId")): - task_status = [ - TaskStatus( - name=str(job["job"]["id"]), + if not job or job.get("attempts"): + continue + for attempt in job["attempts"]: + task_status = [ + TaskStatus( + name=str(connection.get("connectionId")), + executionStatus=STATUS_MAP.get( + attempt["status"].lower(), StatusType.Pending + ).value, + ) + ] + pipeline_status = PipelineStatus( executionStatus=STATUS_MAP.get( attempt["status"].lower(), StatusType.Pending ).value, + taskStatus=task_status, + executionDate=attempt["createdAt"], + ) + yield OMetaPipelineStatus( + pipeline_fqn=pipeline_fqn, pipeline_status=pipeline_status ) - for attempt in job["attempts"] - ] - pipeline_status = PipelineStatus( - executionStatus=STATUS_MAP.get( - job["job"]["status"].lower(), StatusType.Pending - ).value, - taskStatus=task_status, - executionDate=job["job"]["createdAt"], - ) - yield OMetaPipelineStatus( - pipeline_fqn=pipeline_fqn, pipeline_status=pipeline_status - ) def fetch_lineage( self, connection: dict, pipeline_entity: Pipeline