diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index 8b5c7b7334a..4d5b1f33b74 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -205,7 +205,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): Yields lineage if config is enabled """ if self.source_config.dbServiceName: - yield from self.yield_dashboard_lineage_details(dashboard_details) + yield from self.yield_dashboard_lineage_details(dashboard_details) or [] def yield_tag(self, *args, **kwargs) -> Optional[Iterable[OMetaTagAndCategory]]: """ diff --git a/ingestion/src/metadata/ingestion/source/database/glue.py b/ingestion/src/metadata/ingestion/source/database/glue.py index a4922eba6b0..c00369d4224 100755 --- a/ingestion/src/metadata/ingestion/source/database/glue.py +++ b/ingestion/src/metadata/ingestion/source/database/glue.py @@ -243,40 +243,6 @@ class GlueSource(Source[Entity]): return None - def get_downstream_tasks(self, task_unique_id, tasks): - downstream_tasks = [] - for edges in tasks["Edges"]: - if ( - edges["SourceId"] == task_unique_id - and edges["DestinationId"] in self.task_id_mapping.values() - ): - downstream_tasks.append( - list(self.task_id_mapping.keys())[ - list(self.task_id_mapping.values()).index( - edges["DestinationId"] - ) - ][:128] - ) - return downstream_tasks - - def get_tasks(self, tasks): - task_list = [] - for task in tasks["Graph"]["Nodes"]: - task_name = task["Name"][:128] - self.task_id_mapping[task_name] = task["UniqueId"] - for task in tasks["Graph"]["Nodes"]: - task_list.append( - Task( - name=task["Name"], - displayName=task["Name"], - taskType=task["Type"], - downstreamTasks=self.get_downstream_tasks( - task["UniqueId"], tasks["Graph"] - ), - ) - ) - return task_list - def close(self): pass diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py b/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py index f4c5d47aa6d..4d12391a19c 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airbyte.py @@ -11,14 +11,14 @@ """ Airbyte source to extract metadata """ -import traceback -from dataclasses import dataclass, field -from typing import Iterable, List, Optional + +from typing import Iterable, Optional + +from pydantic import BaseModel from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.pipeline import ( - Pipeline, PipelineStatus, StatusType, Task, @@ -32,39 +32,21 @@ from metadata.generated.schema.entity.services.connections.pipeline.airbyteConne AirbyteConnection, ) from metadata.generated.schema.entity.services.databaseService import DatabaseService -from metadata.generated.schema.entity.services.pipelineService import PipelineService -from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import ( - PipelineServiceMetadataPipeline, -) from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.api.common import Entity -from metadata.ingestion.api.source import InvalidSourceException, 
Source, SourceStatus +from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus -from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource from metadata.utils import fqn from metadata.utils.airbyte_client import AirbyteClient -from metadata.utils.filters import filter_by_pipeline from metadata.utils.logger import ingestion_logger logger = ingestion_logger() -@dataclass -class AirbyteSourceStatus(SourceStatus): - pipelines_scanned: List[str] = field(default_factory=list) - filtered: List[str] = field(default_factory=list) - - def pipeline_scanned(self, topic: str) -> None: - self.pipelines_scanned.append(topic) - - def dropped(self, topic: str) -> None: - self.filtered.append(topic) - - STATUS_MAP = { "cancelled": StatusType.Failed, "succeeded": StatusType.Successful, @@ -75,33 +57,27 @@ STATUS_MAP = { } -class AirbyteSource(Source[CreatePipelineRequest]): +class AirbytePipelineDetails(BaseModel): + """ + Wrapper Class to combine the workspace with connection + """ + + workspace: dict + connection: dict + + +class AirbyteSource(PipelineServiceSource): """ Implements the necessary methods ot extract Pipeline metadata from Airflow's metadata db """ - config: WorkflowSource - report: AirbyteSourceStatus - def __init__( self, config: WorkflowSource, metadata_config: OpenMetadataConnection, ): - super().__init__() - self.config = config - self.source_config: PipelineServiceMetadataPipeline = ( - self.config.sourceConfig.config - ) - self.service_connection = self.config.serviceConnection.__root__.config - self.metadata_config = metadata_config - self.metadata = OpenMetadata(self.metadata_config) - self.status = AirbyteSourceStatus() - self.service: PipelineService = self.metadata.get_service_or_create( - entity=PipelineService, config=config - ) - + super().__init__(config, metadata_config) self.client = AirbyteClient(self.service_connection) @classmethod @@ -114,9 +90,6 @@ class AirbyteSource(Source[CreatePipelineRequest]): ) return cls(config, metadata_config) - def prepare(self): - pass - def get_connections_jobs(self, connection: dict, connection_url: str): """ Returns the list of tasks linked to connection @@ -130,46 +103,51 @@ class AirbyteSource(Source[CreatePipelineRequest]): ) ] - def fetch_pipeline( - self, connection: dict, workspace: dict + def yield_pipeline( + self, pipeline_details: AirbytePipelineDetails ) -> Iterable[CreatePipelineRequest]: """ Convert a Connection into a Pipeline Entity - :param connection: connection object from airbyte - :param connection: workspace object from airbyte + :param pipeline_details: pipeline_details object from airbyte :return: Create Pipeline request with tasks """ - connection_url = f"/workspaces/{workspace.get('workspaceId')}/connections/{connection.get('connectionId')}" + connection_url = f"/workspaces/{pipeline_details.workspace.get('workspaceId')}/connections/{pipeline_details.connection.get('connectionId')}" yield CreatePipelineRequest( - name=connection.get("connectionId"), - displayName=connection.get("name"), + name=pipeline_details.connection.get("connectionId"), + displayName=pipeline_details.connection.get("name"), description="", pipelineUrl=connection_url, - tasks=self.get_connections_jobs(connection, connection_url), - service=EntityReference(id=self.service.id, type="pipelineService"), + tasks=self.get_connections_jobs( + pipeline_details.connection, 
connection_url + ), + service=EntityReference( + id=self.context.pipeline_service.id.__root__, type="pipelineService" + ), ) - def fetch_pipeline_status( - self, workspace: dict, connection: dict, pipeline_fqn: str - ) -> OMetaPipelineStatus: + def yield_pipeline_status( + self, pipeline_details: AirbytePipelineDetails + ) -> Optional[OMetaPipelineStatus]: """ Method to get task & pipeline status """ # Airbyte does not offer specific attempt link, just at pipeline level log_link = ( - f"{self.service_connection.hostPort}/workspaces/{workspace.get('workspaceId')}/connections/" - f"{connection.get('connectionId')}/status" + f"{self.service_connection.hostPort}/workspaces/{pipeline_details.workspace.get('workspaceId')}/connections/" + f"{pipeline_details.connection.get('connectionId')}/status" ) - for job in self.client.list_jobs(connection.get("connectionId")): + for job in self.client.list_jobs( + pipeline_details.connection.get("connectionId") + ): if not job or not job.get("attempts"): continue for attempt in job["attempts"]: task_status = [ TaskStatus( - name=str(connection.get("connectionId")), + name=str(pipeline_details.connection.get("connectionId")), executionStatus=STATUS_MAP.get( attempt["status"].lower(), StatusType.Pending ).value, @@ -186,21 +164,23 @@ class AirbyteSource(Source[CreatePipelineRequest]): executionDate=attempt["createdAt"], ) yield OMetaPipelineStatus( - pipeline_fqn=pipeline_fqn, pipeline_status=pipeline_status + pipeline_fqn=self.context.pipeline.fullyQualifiedName.__root__, + pipeline_status=pipeline_status, ) - def fetch_lineage( - self, connection: dict, pipeline_entity: Pipeline + def yield_pipeline_lineage_details( + self, pipeline_details: AirbytePipelineDetails ) -> Optional[Iterable[AddLineageRequest]]: """ Parse all the stream available in the connection and create a lineage between them - :param connection: connection object from airbyte - :param pipeline_entity: Pipeline we just ingested + :param pipeline_details: pipeline_details object from airbyte :return: Lineage from inlets and outlets """ - source_connection = self.client.get_source(connection.get("sourceId")) + source_connection = self.client.get_source( + pipeline_details.connection.get("sourceId") + ) destination_connection = self.client.get_destination( - connection.get("destinationId") + pipeline_details.connection.get("destinationId") ) source_service = self.metadata.get_by_name( entity=DatabaseService, fqn=source_connection.get("name") @@ -211,7 +191,9 @@ class AirbyteSource(Source[CreatePipelineRequest]): if not source_service or not destination_service: return - for task in connection.get("syncCatalog", {}).get("streams") or []: + for task in ( + pipeline_details.connection.get("syncCatalog", {}).get("streams") or [] + ): stream = task.get("stream") from_fqn = fqn.build( self.metadata, @@ -239,57 +221,32 @@ class AirbyteSource(Source[CreatePipelineRequest]): yield AddLineageRequest( edge=EntitiesEdge( fromEntity=EntityReference(id=from_entity.id, type="table"), - toEntity=EntityReference(id=pipeline_entity.id, type="pipeline"), + toEntity=EntityReference( + id=self.context.pipeline.id.__root__, type="pipeline" + ), ) ) yield AddLineageRequest( edge=EntitiesEdge( toEntity=EntityReference(id=to_entity.id, type="table"), - fromEntity=EntityReference(id=pipeline_entity.id, type="pipeline"), + fromEntity=EntityReference( + id=self.context.pipeline.id.__root__, type="pipeline" + ), ) ) - def next_record(self) -> Iterable[Entity]: + def get_pipelines_list(self) -> 
Iterable[AirbytePipelineDetails]: """ - Extract metadata information to create Pipelines with Tasks + Get List of all pipelines """ for workspace in self.client.list_workspaces(): for connection in self.client.list_connections( workflow_id=workspace.get("workspaceId") ): - try: - if filter_by_pipeline( - self.source_config.pipelineFilterPattern, - connection.get("connectionId"), - ): - continue - yield from self.fetch_pipeline(connection, workspace) - pipeline_fqn = fqn.build( - self.metadata, - entity_type=Pipeline, - service_name=self.service.name.__root__, - pipeline_name=connection.get("connectionId"), - ) - yield from self.fetch_pipeline_status( - workspace, connection, pipeline_fqn - ) - if self.source_config.includeLineage: - pipeline_entity: Pipeline = self.metadata.get_by_name( - entity=Pipeline, - fqn=pipeline_fqn, - ) - yield from self.fetch_lineage(connection, pipeline_entity) or [] + yield AirbytePipelineDetails(workspace=workspace, connection=connection) - except Exception as err: - logger.error(repr(err)) - logger.debug(traceback.format_exc()) - self.status.failure(connection.get("connectionId"), repr(err)) - - def get_status(self): - return self.status - - def close(self): - pass - - def test_connection(self) -> None: - pass + def get_pipeline_name(self, pipeline_details: AirbytePipelineDetails) -> str: + """ + Get Pipeline Name + """ + return pipeline_details.connection.get("connectionId") diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow.py index 764fa01abc9..03e2988817a 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow.py @@ -11,15 +11,12 @@ """ Airflow source to extract metadata from OM UI """ -import sys import traceback from collections.abc import Iterable -from dataclasses import dataclass, field from typing import Any, Iterable, List, Optional, cast from airflow.models import BaseOperator, DagRun from airflow.models.serialized_dag import SerializedDagModel -from airflow.models.taskinstance import TaskInstance from airflow.serialization.serialized_objects import SerializedDAG from sqlalchemy.engine import Engine from sqlalchemy.orm import Session @@ -40,44 +37,24 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata from metadata.generated.schema.entity.services.connections.pipeline.airflowConnection import ( AirflowConnection, ) -from metadata.generated.schema.entity.services.pipelineService import PipelineService -from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import ( - PipelineServiceMetadataPipeline, -) from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.api.common import Entity -from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus +from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus -from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.utils import fqn +from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource from metadata.utils.connections import ( create_and_bind_session, get_connection, test_connection, ) -from metadata.utils.filters import 
filter_by_pipeline from metadata.utils.helpers import datetime_to_ts from metadata.utils.logger import ingestion_logger logger = ingestion_logger() - -@dataclass -class AirflowSourceStatus(SourceStatus): - pipelines_scanned: List[str] = field(default_factory=list) - filtered: List[str] = field(default_factory=list) - - def pipeline_scanned(self, topic: str) -> None: - self.pipelines_scanned.append(topic) - - def dropped(self, topic: str) -> None: - self.filtered.append(topic) - - STATUS_MAP = { "success": StatusType.Successful.value, "failed": StatusType.Failed.value, @@ -85,38 +62,24 @@ STATUS_MAP = { } -class AirflowSource(Source[CreatePipelineRequest]): +class AirflowSource(PipelineServiceSource): """ Implements the necessary methods ot extract Pipeline metadata from Airflow's metadata db """ config: WorkflowSource - report: AirflowSourceStatus def __init__( self, config: WorkflowSource, metadata_config: OpenMetadataConnection, ): - super().__init__() - self.config = config - self.source_config: PipelineServiceMetadataPipeline = ( - self.config.sourceConfig.config - ) - self.service_connection = self.config.serviceConnection.__root__.config - self.metadata_config = metadata_config - self.metadata = OpenMetadata(self.metadata_config) - self.status = AirflowSourceStatus() - self.service: PipelineService = self.metadata.get_service_or_create( - entity=PipelineService, config=config - ) - self.numberOfStatus = ( - self.config.serviceConnection.__root__.config.numberOfStatus - ) - # Create the connection to the database self._session = None + self.service_connection = config.serviceConnection.__root__.config self.engine: Engine = get_connection(self.service_connection.connection) + super().__init__(config, metadata_config) + # Create the connection to the database @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): @@ -138,22 +101,20 @@ class AirflowSource(Source[CreatePipelineRequest]): return self._session - def prepare(self): - pass - def get_pipeline_status(self, dag_id: str) -> DagRun: dag_run_list: DagRun = ( self.session.query(DagRun) .filter(DagRun.dag_id == dag_id) .order_by(DagRun.execution_date.desc()) - .limit(self.numberOfStatus) + .limit(self.config.serviceConnection.__root__.config.numberOfStatus) ) return dag_run_list - def fetch_pipeline_status( - self, serialized_dag: SerializedDAG, pipeline_fqn: str + def yield_pipeline_status( + self, pipeline_details: SerializedDAG ) -> OMetaPipelineStatus: - dag_run_list = self.get_pipeline_status(serialized_dag.dag_id) + dag_run_list = self.get_pipeline_status(pipeline_details.dag_id) + for dag in dag_run_list: if isinstance(dag.task_instances, Iterable): tasks = dag.task_instances @@ -179,10 +140,11 @@ class AirflowSource(Source[CreatePipelineRequest]): executionDate=dag.execution_date.timestamp(), ) yield OMetaPipelineStatus( - pipeline_fqn=pipeline_fqn, pipeline_status=pipeline_status + pipeline_fqn=self.context.pipeline.fullyQualifiedName.__root__, + pipeline_status=pipeline_status, ) - def list_dags(self) -> Iterable[SerializedDagModel]: + def get_pipelines_list(self) -> Iterable[SerializedDagModel]: """ List all DAGs from the metadata db. 
@@ -192,6 +154,12 @@ class AirflowSource(Source[CreatePipelineRequest]): for serialized_dag in self.session.query(SerializedDagModel).all(): yield serialized_dag + def get_pipeline_name(self, pipeline_details: SerializedDAG) -> str: + """ + Get Pipeline Name + """ + return pipeline_details.dag_id + @staticmethod def get_tasks_from_dag(dag: SerializedDAG) -> List[Task]: """ @@ -213,25 +181,26 @@ class AirflowSource(Source[CreatePipelineRequest]): for task in cast(Iterable[BaseOperator], dag.tasks) ] - def fetch_pipeline( - self, serialized_dag: SerializedDagModel + def yield_pipeline( + self, pipeline_details: SerializedDagModel ) -> Iterable[CreatePipelineRequest]: """ Convert a DAG into a Pipeline Entity :param serialized_dag: SerializedDAG from airflow metadata DB :return: Create Pipeline request with tasks """ - dag: SerializedDAG = serialized_dag.dag - + dag: SerializedDAG = pipeline_details.dag yield CreatePipelineRequest( - name=serialized_dag.dag_id, + name=pipeline_details.dag_id, description=dag.description, pipelineUrl=f"/tree?dag_id={dag.dag_id}", # Just the suffix concurrency=dag.concurrency, - pipelineLocation=serialized_dag.fileloc, + pipelineLocation=pipeline_details.fileloc, startDate=dag.start_date.isoformat() if dag.start_date else None, tasks=self.get_tasks_from_dag(dag), - service=EntityReference(id=self.service.id, type="pipelineService"), + service=EntityReference( + id=self.context.pipeline_service.id.__root__, type="pipelineService" + ), ) @staticmethod @@ -280,16 +249,15 @@ class AirflowSource(Source[CreatePipelineRequest]): logger.warn(f"Error trying to parse outlets - {err}") return None - def fetch_lineage( - self, serialized_dag: SerializedDagModel, pipeline_entity: Pipeline - ) -> Iterable[AddLineageRequest]: + def yield_pipeline_lineage_details( + self, pipeline_details: SerializedDagModel + ) -> Optional[Iterable[AddLineageRequest]]: """ Parse xlets and add lineage between Pipelines and Tables - :param serialized_dag: SerializedDAG from airflow metadata DB - :param pipeline_entity: Pipeline we just ingested + :param pipeline_details: SerializedDAG from airflow metadata DB :return: Lineage from inlets and outlets """ - dag: SerializedDAG = serialized_dag.dag + dag: SerializedDAG = pipeline_details.dag for task in dag.tasks: for table_fqn in self.get_inlets(task) or []: @@ -303,14 +271,14 @@ class AirflowSource(Source[CreatePipelineRequest]): id=table_entity.id, type="table" ), toEntity=EntityReference( - id=pipeline_entity.id, type="pipeline" + id=self.context.pipeline.id.__root__, type="pipeline" ), ) ) else: logger.warn( f"Could not find Table [{table_fqn}] from " - f"[{pipeline_entity.fullyQualifiedName.__root__}] inlets" + f"[{self.context.pipeline.fullyQualifiedName.__root__}] inlets" ) for table_fqn in self.get_outlets(task) or []: @@ -321,7 +289,7 @@ class AirflowSource(Source[CreatePipelineRequest]): yield AddLineageRequest( edge=EntitiesEdge( fromEntity=EntityReference( - id=pipeline_entity.id, type="pipeline" + id=self.context.pipeline.id.__root__, type="pipeline" ), toEntity=EntityReference(id=table_entity.id, type="table"), ) @@ -329,45 +297,9 @@ class AirflowSource(Source[CreatePipelineRequest]): else: logger.warn( f"Could not find Table [{table_fqn}] from " - f"[{pipeline_entity.fullyQualifiedName.__root__}] outlets" + f"[{self.context.pipeline.fullyQualifiedName.__root__}] outlets" ) - def next_record(self) -> Iterable[Entity]: - """ - Extract metadata information to create Pipelines with Tasks - and lineage - """ - for serialized_dag in 
self.list_dags(): - try: - if not filter_by_pipeline( - self.source_config.pipelineFilterPattern, serialized_dag.dag_id - ): - yield from self.fetch_pipeline(serialized_dag) - pipeline_fqn = fqn.build( - self.metadata, - entity_type=Pipeline, - service_name=self.service.name.__root__, - pipeline_name=serialized_dag.dag_id, - ) - yield from self.fetch_pipeline_status(serialized_dag, pipeline_fqn) - if self.source_config.includeLineage: - pipeline_entity: Pipeline = self.metadata.get_by_name( - entity=Pipeline, - fqn=pipeline_fqn, - ) - yield from self.fetch_lineage(serialized_dag, pipeline_entity) - else: - self.status.dropped(serialized_dag.dag_id) - - except Exception as err: - logger.error(repr(err)) - logger.debug(traceback.format_exc()) - logger.debug(sys.exc_info()[2]) - self.status.failure(serialized_dag.dag_id, repr(err)) - - def get_status(self): - return self.status - def close(self): self.session.close() diff --git a/ingestion/src/metadata/ingestion/source/pipeline/glue.py b/ingestion/src/metadata/ingestion/source/pipeline/glue.py index db258609680..ece567e7aa8 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/glue.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/glue.py @@ -8,35 +8,30 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import traceback -import uuid -from typing import Iterable +from typing import Any, Iterable, Optional -from metadata.generated.schema.entity.data.pipeline import Pipeline, Task +from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest +from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.entity.data.pipeline import ( + PipelineStatus, + StatusType, + Task, + TaskStatus, +) from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) from metadata.generated.schema.entity.services.connections.pipeline.glueConnection import ( GlueConnection, ) -from metadata.generated.schema.entity.services.connections.pipeline.glueConnection import ( - GlueConnection as GluePipelineConnection, -) -from metadata.generated.schema.entity.services.pipelineService import ( - PipelineConnection, - PipelineService, -) from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.api.common import Entity -from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus -from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable -from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.ingestion.source.database.common_db_source import SQLSourceStatus -from metadata.utils.connections import get_connection, test_connection +from metadata.ingestion.api.source import InvalidSourceException +from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus +from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -44,31 +39,22 @@ logger = ingestion_logger() GRAPH = "Graph" NODES = "Nodes" NAME = "Name" +JOB_TYPE = "JOB" +STATUS_MAP = { + "cancelled": StatusType.Failed, + "succeeded": StatusType.Successful, + "failed": StatusType.Failed, + "running": StatusType.Pending, + 
"incomplete": StatusType.Failed, + "pending": StatusType.Pending, +} -class GlueSource(Source[Entity]): +class GlueSource(PipelineServiceSource): def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): - super().__init__() - self.status = SQLSourceStatus() - self.config = config - self.metadata_config = metadata_config - self.metadata = OpenMetadata(metadata_config) - - self.service_connection = self.config.serviceConnection.__root__.config - self.pipeline_service = self.metadata.get_service_or_create( - entity=PipelineService, - config=WorkflowSource( - type="glue", - serviceName=self.config.serviceName, - serviceConnection=PipelineConnection( - config=GluePipelineConnection( - awsConfig=self.service_connection.awsConfig - ), - ), - sourceConfig={}, - ), - ) - self.connection = get_connection(self.service_connection) + super().__init__(config, metadata_config) + self.task_id_mapping = {} + self.job_name_list = set() self.glue = self.connection.client @classmethod @@ -81,55 +67,101 @@ class GlueSource(Source[Entity]): ) return cls(config, metadata_config) - def prepare(self): - pass + def get_pipelines_list(self) -> Iterable[dict]: + """ + Get List of all pipelines + """ + for workflow in self.glue.list_workflows()["Workflows"]: + jobs = self.glue.get_workflow(Name=workflow, IncludeGraph=True)["Workflow"] + yield jobs - def next_record(self) -> Iterable[Entity]: + def get_pipeline_name(self, pipeline_details: dict) -> str: + """ + Get Pipeline Name + """ + return pipeline_details[NAME] - yield from self.ingest_pipelines() + def yield_pipeline(self, pipeline_details: Any) -> Iterable[CreatePipelineRequest]: + """ + Method to Get Pipeline Entity + """ + self.job_name_list = set() + pipeline_ev = CreatePipelineRequest( + name=pipeline_details[NAME], + displayName=pipeline_details[NAME], + description="", + tasks=self.get_tasks(pipeline_details), + service=EntityReference( + id=self.context.pipeline_service.id.__root__, type="pipelineService" + ), + ) + yield pipeline_ev - def get_tasks(self, tasks): + def get_tasks(self, pipeline_details: Any) -> Task: task_list = [] - for task in tasks[GRAPH][NODES]: + for task in pipeline_details["Graph"]["Nodes"]: + self.task_id_mapping[task["UniqueId"]] = task["Name"][:128] + if task["Type"] == JOB_TYPE: + self.job_name_list.add(task[NAME]) + for task in pipeline_details[GRAPH][NODES]: task_list.append( Task( name=task[NAME], displayName=task[NAME], taskType=task["Type"], downstreamTasks=self.get_downstream_tasks( - task["UniqueId"], tasks[GRAPH] + task["UniqueId"], pipeline_details[GRAPH] ), ) ) return task_list - def ingest_pipelines(self) -> Iterable[OMetaDatabaseAndTable]: - try: - for workflow in self.glue.list_workflows()["Workflows"]: - jobs = self.glue.get_workflow(Name=workflow, IncludeGraph=True)[ - "Workflow" - ] - tasks = self.get_tasks(jobs) - pipeline_ev = Pipeline( - id=uuid.uuid4(), - name=jobs[NAME], - displayName=jobs[NAME], - description="", - tasks=tasks, - service=EntityReference( - id=self.pipeline_service.id, type="pipelineService" - ), - ) - yield pipeline_ev - except Exception as err: - logger.debug(traceback.format_exc()) - logger.error(err) + def get_downstream_tasks(self, task_unique_id, tasks): + downstream_tasks = [] + for edges in tasks["Edges"]: + if edges["SourceId"] == task_unique_id and self.task_id_mapping.get( + edges["DestinationId"] + ): + downstream_tasks.append(self.task_id_mapping[edges["DestinationId"]]) + return downstream_tasks - def close(self): - pass + def yield_pipeline_status( 
+ self, pipeline_details: Any + ) -> Iterable[OMetaPipelineStatus]: + for job in self.job_name_list: + try: + runs = self.glue.get_job_runs(JobName=job) + runs = runs.get("JobRuns", []) + for attempt in runs: + task_status = [] + task_status.append( + TaskStatus( + name=attempt["JobName"], + executionStatus=STATUS_MAP.get( + attempt["JobRunState"].lower(), StatusType.Pending + ).value, + startTime=attempt["StartedOn"].timestamp(), + endTime=attempt["CompletedOn"].timestamp(), + ) + ) + pipeline_status = PipelineStatus( + taskStatus=task_status, + executionDate=attempt["StartedOn"].timestamp(), + executionStatus=STATUS_MAP.get( + attempt["JobRunState"].lower(), StatusType.Pending + ).value, + ) + yield OMetaPipelineStatus( + pipeline_fqn=self.context.pipeline.fullyQualifiedName.__root__, + pipeline_status=pipeline_status, + ) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error(err) - def get_status(self) -> SourceStatus: - return self.status - - def test_connection(self) -> None: - test_connection(self.connection) + def yield_pipeline_lineage_details( + self, pipeline_details: Any + ) -> Optional[Iterable[AddLineageRequest]]: + """ + Get lineage between pipeline and data sources + """ diff --git a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py new file mode 100644 index 00000000000..ab46754b014 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py @@ -0,0 +1,227 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" +Base class for ingesting database services +""" +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from typing import Any, Iterable, List, Optional + +from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest +from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.entity.data.pipeline import Pipeline +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.entity.services.pipelineService import ( + PipelineConnection, + PipelineService, +) +from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import ( + PipelineServiceMetadataPipeline, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.ingestion.api.source import Source, SourceStatus +from metadata.ingestion.api.topology_runner import TopologyRunnerMixin +from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus +from metadata.ingestion.models.topology import ( + NodeStage, + ServiceTopology, + TopologyNode, + create_source_context, +) +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils.connections import get_connection, test_connection +from metadata.utils.filters import filter_by_dashboard +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + + +class PipelineServiceTopology(ServiceTopology): + """ + Defines the hierarchy in Pipeline Services. + + We could have a topology validator. We can only consume + data that has been produced by any parent node. + """ + + root = TopologyNode( + producer="get_services", + stages=[ + NodeStage( + type_=PipelineService, + context="pipeline_service", + processor="yield_pipeline_service", + ), + ], + children=["pipeline"], + ) + pipeline = TopologyNode( + producer="get_pipeline", + stages=[ + NodeStage( + type_=Pipeline, + context="pipeline", + processor="yield_pipeline", + consumer=["pipeline_service"], + ), + NodeStage( + type_=OMetaPipelineStatus, + context="pipeline_status", + processor="yield_pipeline_status", + consumer=["pipeline_service"], + nullable=True, + ack_sink=False, + ), + NodeStage( + type_=AddLineageRequest, + context="lineage", + processor="yield_pipeline_lineage", + consumer=["pipeline_service"], + ack_sink=False, + nullable=True, + ), + ], + ) + + +@dataclass +class PipelineSourceStatus(SourceStatus): + """ + Reports the source status after ingestion + """ + + pipelines_scanned: List[str] = field(default_factory=list) + filtered: List[str] = field(default_factory=list) + + def pipeline_scanned(self, topic: str) -> None: + self.pipelines_scanned.append(topic) + + def dropped(self, topic: str) -> None: + self.filtered.append(topic) + + +class PipelineServiceSource(TopologyRunnerMixin, Source, ABC): + """ + Base class for Pipeline Services. + It implements the topology and context. 
+ """ + + @abstractmethod + def yield_pipeline(self, pipeline_details: Any) -> Iterable[CreatePipelineRequest]: + """ + Method to Get Pipeline Entity + """ + + @abstractmethod + def yield_pipeline_lineage_details( + self, pipeline_details: Any + ) -> Optional[Iterable[AddLineageRequest]]: + """ + Get lineage between pipeline and data sources + """ + + @abstractmethod + def get_pipelines_list(self) -> Optional[List[Any]]: + """ + Get List of all pipelines + """ + + @abstractmethod + def get_pipeline_name(self, pipeline_details: Any) -> str: + """ + Get Pipeline Name + """ + + @abstractmethod + def yield_pipeline_status( + self, pipeline_details: Any + ) -> Optional[OMetaPipelineStatus]: + """ + Get Pipeline Status + """ + + def yield_pipeline_lineage( + self, pipeline_details: Any + ) -> Iterable[AddLineageRequest]: + """ + Yields lineage if config is enabled + """ + if self.source_config.includeLineage: + yield from self.yield_pipeline_lineage_details(pipeline_details) or [] + + status: PipelineSourceStatus + source_config: PipelineServiceMetadataPipeline + config: WorkflowSource + metadata: OpenMetadata + # Big union of types we want to fetch dynamically + service_connection: PipelineConnection.__fields__["config"].type_ + + topology = PipelineServiceTopology() + context = create_source_context(topology) + + @abstractmethod + def __init__( + self, + config: WorkflowSource, + metadata_config: OpenMetadataConnection, + ): + super().__init__() + self.config = config + self.metadata_config = metadata_config + self.metadata = OpenMetadata(metadata_config) + self.service_connection = self.config.serviceConnection.__root__.config + self.source_config: PipelineServiceMetadataPipeline = ( + self.config.sourceConfig.config + ) + self.connection = get_connection(self.service_connection) + self.test_connection() + self.status = PipelineSourceStatus() + + def get_status(self) -> SourceStatus: + return self.status + + def close(self): + """ + Method to implement any required logic after the ingesion process is completed + """ + + def get_services(self) -> Iterable[WorkflowSource]: + yield self.config + + def yield_pipeline_service(self, config: WorkflowSource): + yield self.metadata.get_create_service_from_source( + entity=PipelineService, config=config + ) + + def get_pipeline(self) -> Any: + for pipeline_detail in self.get_pipelines_list(): + if filter_by_dashboard( + self.source_config.pipelineFilterPattern, + self.get_pipeline_name(pipeline_detail), + ): + self.status.filter( + self.get_pipeline_name(pipeline_detail), + "Pipeline Pattern not Allowed", + ) + continue + yield pipeline_detail + + def test_connection(self) -> None: + test_connection(self.connection) + + def prepare(self): + """ + Method to implement any required logic before starting the ingesion process + """ diff --git a/ingestion/tests/unit/resources/datasets/airbyte_dataset.json b/ingestion/tests/unit/resources/datasets/airbyte_dataset.json new file mode 100644 index 00000000000..1401d0707ac --- /dev/null +++ b/ingestion/tests/unit/resources/datasets/airbyte_dataset.json @@ -0,0 +1,218 @@ +{ + "workspace": [ + { + "workspaceId": "af5680ec-2687-4fe0-bd55-5ad5f020a603", + "customerId": "de633783-f0f2-4248-9ea2-092c4b5e1165", + "email": "mayur@getcollate.io", + "name": "af5680ec-2687-4fe0-bd55-5ad5f020a603", + "slug": "af5680ec-2687-4fe0-bd55-5ad5f020a603", + "initialSetupComplete": true, + "displaySetupWizard": false, + "anonymousDataCollection": false, + "news": false, + "securityUpdates": false, + "notifications": [] + } + ], + 
"connection": [ + { + "connectionId": "a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f", + "name": "MSSQL <> Postgres", + "namespaceDefinition": "source", + "namespaceFormat": "${SOURCE_NAMESPACE}", + "prefix": "", + "sourceId": "ed3783a9-8479-4be0-80aa-2075ccc6e310", + "destinationId": "4181764f-3ed8-44cb-9b8e-fefff7fb5ea9", + "operationIds": ["942463dd-f37c-4794-8931-88da6c166a04"], + "syncCatalog": { + "streams": [ + { + "stream": { + "name": "brands", + "jsonSchema": { + "type": "object", + "properties": { + "brand_id": {"type": "number"}, + "brand_name": {"type": "string"} + } + }, + "supportedSyncModes": ["full_refresh", "incremental"], + "defaultCursorField": [], + "sourceDefinedPrimaryKey": [["brand_id"]], + "namespace": "production" + }, + "config": { + "syncMode": "full_refresh", + "cursorField": [], + "destinationSyncMode": "overwrite", + "primaryKey": [["brand_id"]], + "aliasName": "brands", + "selected": true + } + }, + { + "stream": { + "name": "categories", + "jsonSchema": { + "type": "object", + "properties": { + "category_id": {"type": "number"}, + "category_name": {"type": "string"} + } + }, + "supportedSyncModes": ["full_refresh", "incremental"], + "defaultCursorField": [], + "sourceDefinedPrimaryKey": [["category_id"]], + "namespace": "production" + }, + "config": { + "syncMode": "full_refresh", + "cursorField": [], + "destinationSyncMode": "overwrite", + "primaryKey": [["category_id"]], + "aliasName": "categories", + "selected": true + } + }, + { + "stream": { + "name": "products", + "jsonSchema": { + "type": "object", + "properties": { + "brand_id": {"type": "number"}, + "list_price": {"type": "number"}, + "model_year": {"type": "number"}, + "product_id": {"type": "number"}, + "category_id": {"type": "number"}, + "product_name": {"type": "string"} + } + }, + "supportedSyncModes": ["full_refresh", "incremental"], + "defaultCursorField": [], + "sourceDefinedPrimaryKey": [["product_id"]], + "namespace": "production" + }, + "config": { + "syncMode": "full_refresh", + "cursorField": [], + "destinationSyncMode": "overwrite", + "primaryKey": [["product_id"]], + "aliasName": "products", + "selected": true + } + }, + { + "stream": { + "name": "stocks", + "jsonSchema": { + "type": "object", + "properties": { + "quantity": {"type": "number"}, + "store_id": {"type": "number"}, + "product_id": {"type": "number"} + } + }, + "supportedSyncModes": ["full_refresh", "incremental"], + "defaultCursorField": [], + "sourceDefinedPrimaryKey": [["store_id"], ["product_id"]], + "namespace": "production" + }, + "config": { + "syncMode": "full_refresh", + "cursorField": [], + "destinationSyncMode": "overwrite", + "primaryKey": [["store_id"], ["product_id"]], + "aliasName": "stocks", + "selected": true + } + } + ] + }, + "schedule": {"units": 24, "timeUnit": "hours"}, + "status": "inactive" + } + ], + "jobs": [ + { + "job": { + "id": 14, + "configType": "sync", + "configId": "a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f", + "createdAt": 1655482894, + "updatedAt": 1655482894, + "status": "running" + }, + "attempts": [ + { + "id": 0, + "status": "running", + "createdAt": 1655482894, + "updatedAt": 1655482894, + "streamStats": [] + } + ] + }, + { + "job": { + "id": 13, + "configType": "sync", + "configId": "a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f", + "createdAt": 1655393914, + "updatedAt": 1655394054, + "status": "succeeded" + }, + "attempts": [ + { + "id": 0, + "status": "succeeded", + "createdAt": 1655393914, + "updatedAt": 1655394054, + "endedAt": 1655394054, + "bytesSynced": 85607, + "recordsSynced": 1276, + 
"totalStats": { + "recordsEmitted": 1276, + "bytesEmitted": 85607, + "stateMessagesEmitted": 0, + "recordsCommitted": 1276 + }, + "streamStats": [ + { + "streamName": "brands", + "stats": { + "recordsEmitted": 9, + "bytesEmitted": 333, + "recordsCommitted": 9 + } + }, + { + "streamName": "categories", + "stats": { + "recordsEmitted": 7, + "bytesEmitted": 359, + "recordsCommitted": 7 + } + }, + { + "streamName": "stocks", + "stats": { + "recordsEmitted": 939, + "bytesEmitted": 41608, + "recordsCommitted": 939 + } + }, + { + "streamName": "products", + "stats": { + "recordsEmitted": 321, + "bytesEmitted": 43307, + "recordsCommitted": 321 + } + } + ] + } + ] + } + ] +} \ No newline at end of file diff --git a/ingestion/tests/unit/topology/pipeline/test_airbyte.py b/ingestion/tests/unit/topology/pipeline/test_airbyte.py new file mode 100644 index 00000000000..fb57c6ca460 --- /dev/null +++ b/ingestion/tests/unit/topology/pipeline/test_airbyte.py @@ -0,0 +1,177 @@ +import json +from unittest import TestCase +from unittest.mock import patch + +from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest +from metadata.generated.schema.entity.data.pipeline import ( + Pipeline, + PipelineStatus, + StatusType, + Task, + TaskStatus, +) +from metadata.generated.schema.entity.services.pipelineService import ( + PipelineConnection, + PipelineService, + PipelineServiceType, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus +from metadata.ingestion.source.pipeline.airbyte import ( + AirbytePipelineDetails, + AirbyteSource, +) + +mock_data_file = open("ingestion/tests/unit/resources/datasets/airbyte_dataset.json") +mock_data: dict = json.load(mock_data_file) + +mock_airbyte_config = { + "source": { + "type": "airbyte", + "serviceName": "airbyte_source", + "serviceConnection": { + "config": {"type": "Airbyte", "hostPort": "http://localhost:1234"} + }, + "sourceConfig": {"config": {"type": "PipelineMetadata"}}, + }, + "sink": {"type": "metadata-rest", "config": {}}, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth", + } + }, +} + + +EXPECTED_ARIBYTE_DETAILS = AirbytePipelineDetails( + workspace=mock_data["workspace"][0], connection=mock_data["connection"][0] +) + +MOCK_CONNECTION_URI_PATH = "/workspaces/af5680ec-2687-4fe0-bd55-5ad5f020a603/connections/a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f" +MOCK_LOG_URL = f"http://localhost:1234{MOCK_CONNECTION_URI_PATH}" + + +EXPECTED_PIPELINE_STATUS = [ + OMetaPipelineStatus( + pipeline_fqn="airbyte_source.a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f", + pipeline_status=PipelineStatus( + executionStatus=StatusType.Pending.value, + taskStatus=[ + TaskStatus( + name="a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f", + executionStatus=StatusType.Pending.value, + startTime=1655482894, + endTime=None, + logLink=f"{MOCK_LOG_URL}/status", + ) + ], + executionDate=1655482894, + ), + ), + OMetaPipelineStatus( + pipeline_fqn="airbyte_source.a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f", + pipeline_status=PipelineStatus( + executionStatus=StatusType.Successful.value, + taskStatus=[ + TaskStatus( + name="a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f", + executionStatus=StatusType.Successful.value, + startTime=1655393914, + endTime=1655394054, + logLink=f"{MOCK_LOG_URL}/status", + ) + ], + executionDate=1655393914, + ), + ), 
+] + + +EXPECTED_CREATED_PIPELINES = CreatePipelineRequest( + name="a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f", + displayName="MSSQL <> Postgres", + description="", + pipelineUrl=MOCK_CONNECTION_URI_PATH, + tasks=[ + Task( + name="a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f", + displayName="MSSQL <> Postgres", + description="", + taskUrl=f"{MOCK_CONNECTION_URI_PATH}/status", + ) + ], + service=EntityReference( + id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService" + ), +) + +MOCK_PIPELINE_SERVICE = PipelineService( + id="85811038-099a-11ed-861d-0242ac120002", + name="airbyte_source", + connection=PipelineConnection(), + serviceType=PipelineServiceType.Airbyte, +) + +MOCK_PIPELINE = Pipeline( + id="2aaa012e-099a-11ed-861d-0242ac120002", + name="a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f", + fullyQualifiedName="airbyte_source.a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f", + displayName="MSSQL <> Postgres", + description="", + pipelineUrl=MOCK_CONNECTION_URI_PATH, + tasks=[ + Task( + name="a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f", + displayName="MSSQL <> Postgres", + description="", + taskUrl=f"{MOCK_CONNECTION_URI_PATH}/status", + ) + ], + service=EntityReference( + id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService" + ), +) + + +class AirbyteUnitTest(TestCase): + def __init__(self, methodName: str = ...) -> None: + super().__init__(methodName) + self.init_workflow() + + @patch("metadata.ingestion.source.pipeline.pipeline_service.test_connection") + @patch("metadata.ingestion.source.pipeline.airbyte.AirbyteClient") + def init_workflow(self, airbyte_client, test_connection): + test_connection.return_value = False + config = OpenMetadataWorkflowConfig.parse_obj(mock_airbyte_config) + self.aribyte = AirbyteSource.create( + mock_airbyte_config["source"], + config.workflowConfig.openMetadataServerConfig, + ) + self.aribyte.context.__dict__["pipeline"] = MOCK_PIPELINE + self.aribyte.context.__dict__["pipeline_service"] = MOCK_PIPELINE_SERVICE + self.client = airbyte_client.return_value + self.client.list_jobs.return_value = mock_data.get("jobs") + self.client.list_workspaces.return_value = mock_data.get("workspace") + self.client.list_connections.return_value = mock_data.get("connection") + + def test_pipeline_list(self): + assert list(self.aribyte.get_pipelines_list())[0] == EXPECTED_ARIBYTE_DETAILS + + def test_pipeline_name(self): + assert self.aribyte.get_pipeline_name( + EXPECTED_ARIBYTE_DETAILS + ) == mock_data.get("connection")[0].get("connectionId") + + def test_pipelines(self): + pipline = list(self.aribyte.yield_pipeline(EXPECTED_ARIBYTE_DETAILS))[0] + assert pipline == EXPECTED_CREATED_PIPELINES + + def test_pipeline_status(self): + assert ( + list(self.aribyte.yield_pipeline_status(EXPECTED_ARIBYTE_DETAILS)) + == EXPECTED_PIPELINE_STATUS + )
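
For reference, the contract this refactor introduces can be summarized as a small subclass of the new PipelineServiceSource base. The sketch below is illustrative only and is not part of the patch: the DemoPipelineSource class, its dict payload shape, and the empty status/lineage generators are assumptions made for the example; the abstract methods, the topology stages, and the context attributes (context.pipeline_service, context.pipeline) come from the pipeline_service.py file added above.

from typing import Iterable, Optional

from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource


class DemoPipelineSource(PipelineServiceSource):
    """Hypothetical connector: each abstract method feeds one topology stage."""

    def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
        # The base __init__ resolves the service connection, builds the client
        # via get_connection() and validates it with test_connection().
        super().__init__(config, metadata_config)

    def get_pipelines_list(self) -> Iterable[dict]:
        # One raw payload per pipeline; the base class filters these against
        # sourceConfig.pipelineFilterPattern using get_pipeline_name().
        yield {"id": "demo_pipeline", "name": "Demo Pipeline"}

    def get_pipeline_name(self, pipeline_details: dict) -> str:
        return pipeline_details["id"]

    def yield_pipeline(self, pipeline_details: dict) -> Iterable[CreatePipelineRequest]:
        yield CreatePipelineRequest(
            name=pipeline_details["id"],
            displayName=pipeline_details["name"],
            description="",
            tasks=[],
            # The pipeline service entity is created upstream by
            # yield_pipeline_service() and exposed through the topology context.
            service=EntityReference(
                id=self.context.pipeline_service.id.__root__, type="pipelineService"
            ),
        )

    def yield_pipeline_status(
        self, pipeline_details: dict
    ) -> Iterable[OMetaPipelineStatus]:
        # The status stage is nullable, so a backend without run history
        # can simply yield nothing.
        yield from []

    def yield_pipeline_lineage_details(
        self, pipeline_details: dict
    ) -> Optional[Iterable[AddLineageRequest]]:
        # Only invoked when sourceConfig.includeLineage is enabled
        # (see yield_pipeline_lineage in the base class).
        yield from []

The per-connector sources in this patch (Airbyte, Airflow, Glue) follow exactly this shape: they only translate backend payloads into OpenMetadata requests, while filtering, service creation, status/lineage gating, and error handling live once in the shared base class.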