From 34c572334e595e07c7f05bf05bd27df6d480e5ac Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Tue, 30 Jul 2024 10:16:47 +0530 Subject: [PATCH] Fixes #14945: FiveTran Improvements (#17169) --- .../source/pipeline/fivetran/client.py | 23 ++- .../source/pipeline/fivetran/metadata.py | 157 +++++++++++++----- .../source/pipeline/fivetran/models.py | 20 +++ .../source/pipeline/openlineage/metadata.py | 24 --- .../source/pipeline/pipeline_service.py | 27 +++ .../unit/topology/pipeline/test_fivetran.py | 4 + 6 files changed, 182 insertions(+), 73 deletions(-) create mode 100644 ingestion/src/metadata/ingestion/source/pipeline/fivetran/models.py diff --git a/ingestion/src/metadata/ingestion/source/pipeline/fivetran/client.py b/ingestion/src/metadata/ingestion/source/pipeline/fivetran/client.py index 4ebca0708e4..5aff2130f27 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/fivetran/client.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/fivetran/client.py @@ -12,7 +12,9 @@ Client to interact with fivetran apis """ import base64 -from typing import List +from typing import List, Optional + +from requests import Response from metadata.generated.schema.entity.services.connections.pipeline.fivetranConnection import ( FivetranConnection, @@ -44,8 +46,8 @@ class FivetranClient: def run_paginator(self, path: str) -> List[dict]: response = self.client.get(f"{path}?limit={self.config.limit}") - data = response["data"] - result = data["items"] + data = response.get("data") + result = data.get("items") while data.get("next_cursor"): response = self.client.get( f"{path}?limit={self.config.limit}&cursor={data['next_cursor']}" @@ -78,11 +80,22 @@ class FivetranClient: Method returns destination details """ response = self.client.get(f"/destinations/{destination_id}") - return response["data"] + return response.get("data") def get_connector_schema_details(self, connector_id: str) -> dict: """ Method returns destination details """ response = 
self.client.get(f"/connectors/{connector_id}/schemas") - return response["data"]["schemas"] + return response.get("data", {}).get("schemas", []) + + def get_connector_column_lineage( + self, connector_id: str, schema_name: str, table_name: str + ) -> dict: + """ + Method returns the column details of a connector table + """ + response: Optional[Response] = self.client.get( + f"/connectors/{connector_id}/schemas/{schema_name}/tables/{table_name}/columns" + ) + return response.get("data", {}).get("columns", []) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/fivetran/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/fivetran/metadata.py index f0d42c28def..e2aa7b1b635 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/fivetran/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/fivetran/metadata.py @@ -13,9 +13,7 @@ Airbyte source to extract metadata """ import traceback -from typing import Iterable, Optional - -from pydantic import BaseModel +from typing import Iterable, List, Optional, Union, cast from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest @@ -33,13 +31,22 @@ from metadata.generated.schema.type.basic import ( FullyQualifiedEntityName, SourceUrl, ) -from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails +from metadata.generated.schema.type.entityLineage import ( + ColumnLineage, + EntitiesEdge, + LineageDetails, +) from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException +from metadata.ingestion.lineage.sql_lineage import get_column_fqn from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus from metadata.ingestion.ometa.ometa_api import OpenMetadata +from 
metadata.ingestion.source.pipeline.fivetran.client import FivetranClient +from metadata.ingestion.source.pipeline.fivetran.models import FivetranPipelineDetails +from metadata.ingestion.source.pipeline.openlineage.models import TableDetails +from metadata.ingestion.source.pipeline.openlineage.utils import FQNNotFoundException from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource from metadata.utils import fqn from metadata.utils.logger import ingestion_logger @@ -47,24 +54,6 @@ from metadata.utils.logger import ingestion_logger logger = ingestion_logger() -class FivetranPipelineDetails(BaseModel): - """ - Wrapper Class to combine source & destination - """ - - source: dict - destination: dict - group: dict - - @property - def pipeline_name(self): - return f'{self.group.get("id")}_{self.source.get("id")}' - - @property - def pipeline_display_name(self): - return f'{self.group.get("name")} <> {self.source.get("schema")}' - - class FivetranSource(PipelineServiceSource): """ Implements the necessary methods ot extract @@ -83,13 +72,18 @@ class FivetranSource(PipelineServiceSource): ) return cls(config, metadata) - def get_connections_jobs(self, pipeline_details: FivetranPipelineDetails): + def get_connections_jobs( + self, + pipeline_details: FivetranPipelineDetails, + source_url: Optional[SourceUrl] = None, + ) -> List[Task]: """Returns the list of tasks linked to connection""" return [ Task( name=pipeline_details.pipeline_name, displayName=pipeline_details.pipeline_display_name, - ) + sourceUrl=source_url, + ) # type: ignore ] def yield_pipeline( @@ -100,25 +94,73 @@ class FivetranSource(PipelineServiceSource): :param pipeline_details: pipeline_details object from fivetran :return: Create Pipeline request with tasks """ + source_url = self.get_source_url( + connector_id=pipeline_details.source.get("id"), + group_id=pipeline_details.group.get("id"), + source_name=pipeline_details.source.get("service"), + ) pipeline_request = 
CreatePipelineRequest( name=EntityName(pipeline_details.pipeline_name), displayName=pipeline_details.pipeline_display_name, - tasks=self.get_connections_jobs(pipeline_details), - service=FullyQualifiedEntityName(self.context.get().pipeline_service), - sourceUrl=self.get_source_url( - connector_id=pipeline_details.source.get("id"), - group_id=pipeline_details.group.get("id"), - source_name=pipeline_details.source.get("service"), + tasks=self.get_connections_jobs( + pipeline_details=pipeline_details, source_url=source_url ), - ) - yield Either(right=pipeline_request) + service=FullyQualifiedEntityName(self.context.get().pipeline_service), + sourceUrl=source_url, + ) # type: ignore + yield Either(left=None, right=pipeline_request) self.register_record(pipeline_request=pipeline_request) def yield_pipeline_status( self, pipeline_details: FivetranPipelineDetails - ) -> Iterable[Either[OMetaPipelineStatus]]: + ) -> Optional[Iterable[Either[OMetaPipelineStatus]]]: """Method to get task & pipeline status""" + def fetch_column_lineage( + self, pipeline_details: FivetranPipelineDetails, schema, schema_data, table + ) -> List[Optional[ColumnLineage]]: + col_details = self.client.get_connector_column_lineage( + pipeline_details.connector_id, schema_name=schema, table_name=table + ) + col_lineage_arr = [] + try: + from_entity_fqn: Optional[str] = self._get_table_fqn_from_om( + table_details=TableDetails(schema=schema, name=table) + ) + to_entity_fqn: Optional[str] = self._get_table_fqn_from_om( + table_details=TableDetails( + schema=schema_data.get("name_in_destination"), + name=schema_data["tables"][table]["name_in_destination"], + ) + ) + except FQNNotFoundException: + to_entity_fqn = "" + from_entity_fqn = "" + + if from_entity_fqn and to_entity_fqn: + to_table_entity = self.metadata.get_by_name(entity=Table, fqn=to_entity_fqn) + from_table_entity = self.metadata.get_by_name( + entity=Table, fqn=from_entity_fqn + ) + for key, value in col_details.items(): + if 
value["enabled"] == True: + if from_table_entity and to_table_entity: + from_col = get_column_fqn( + table_entity=from_table_entity, column=key + ) + to_col = get_column_fqn( + table_entity=to_table_entity, + column=value.get("name_in_destination"), + ) + col_lineage_arr.append( + ColumnLineage( + toColumn=to_col, + fromColumns=[from_col], + function=None, + ) + ) + return col_lineage_arr if col_lineage_arr else [] + def yield_pipeline_lineage_details( self, pipeline_details: FivetranPipelineDetails ) -> Iterable[Either[AddLineageRequest]]: @@ -127,22 +169,43 @@ class FivetranSource(PipelineServiceSource): :param pipeline_details: pipeline_details object from airbyte :return: Lineage from inlets and outlets """ + self.client = cast(FivetranClient, self.client) source_service = self.metadata.get_by_name( entity=DatabaseService, fqn=pipeline_details.source.get("schema") ) + if not source_service: + es_resp: Union[ + List[DatabaseService], None + ] = self.metadata.es_search_from_fqn( + DatabaseService, pipeline_details.source.get("schema", "") + ) + if es_resp and len(es_resp) > 0 and es_resp[0].fullyQualifiedName: + source_service = self.metadata.get_by_name( + entity=DatabaseService, + fqn=(es_resp[0].fullyQualifiedName.root), + ) destination_service = self.metadata.get_by_name( entity=DatabaseService, fqn=pipeline_details.group.get("name") ) + if not source_service or not destination_service: return for schema, schema_data in self.client.get_connector_schema_details( connector_id=pipeline_details.source.get("id") ).items(): + for table in schema_data.get("tables", {}).keys(): + col_lineage_arr = self.fetch_column_lineage( + pipeline_details=pipeline_details, + schema=schema, + schema_data=schema_data, + table=table, + ) + from_fqn = fqn.build( - self.metadata, - Table, + metadata=self.metadata, + entity_type=Table, table_name=table, database_name=pipeline_details.source.get("config", {}).get( "database" @@ -179,30 +242,36 @@ class 
FivetranSource(PipelineServiceSource): lineage_details = LineageDetails( pipeline=EntityReference( id=pipeline_entity.id.root, type="pipeline" - ), + ), # type: ignore source=LineageSource.PipelineLineage, + columnsLineage=col_lineage_arr if col_lineage_arr else None, + sqlQuery=None, + description=None, ) yield Either( right=AddLineageRequest( edge=EntitiesEdge( - fromEntity=EntityReference(id=from_entity.id, type="table"), - toEntity=EntityReference(id=to_entity.id, type="table"), + fromEntity=EntityReference(id=from_entity.id, type="table"), # type: ignore + toEntity=EntityReference(id=to_entity.id, type="table"), # type: ignore lineageDetails=lineage_details, ) ) - ) + ) # type: ignore def get_pipelines_list(self) -> Iterable[FivetranPipelineDetails]: """Get List of all pipelines""" for group in self.client.list_groups(): - for connector in self.client.list_group_connectors( - group_id=group.get("id") - ): + destination_id: str = group.get("id", "") + for connector in self.client.list_group_connectors(group_id=destination_id): + connector_id: str = connector.get("id", "") yield FivetranPipelineDetails( - destination=self.client.get_destination_details(group.get("id")), - source=self.client.get_connector_details(connector.get("id")), + destination=self.client.get_destination_details( + destination_id=destination_id + ), + source=self.client.get_connector_details(connector_id=connector_id), group=group, + connector_id=connector_id, ) def get_pipeline_name(self, pipeline_details: FivetranPipelineDetails) -> str: diff --git a/ingestion/src/metadata/ingestion/source/pipeline/fivetran/models.py b/ingestion/src/metadata/ingestion/source/pipeline/fivetran/models.py new file mode 100644 index 00000000000..ed7f86062a6 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/fivetran/models.py @@ -0,0 +1,20 @@ +from pydantic import BaseModel + + +class FivetranPipelineDetails(BaseModel): + """ + Wrapper Class to combine source & destination + """ + + source: 
dict + destination: dict + group: dict + connector_id: str + + @property + def pipeline_name(self): + return f'{self.group.get("id")}_{self.source.get("id")}' + + @property + def pipeline_display_name(self): + return f'{self.group.get("name")} <> {self.source.get("schema")}' diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py index 54d867ef5db..e890c480fcc 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py @@ -146,30 +146,6 @@ class OpenlineageSource(PipelineServiceSource): except FQNNotFoundException: return None - def _get_table_fqn_from_om(self, table_details: TableDetails) -> Optional[str]: - """ - Based on partial schema and table names look for matching table object in open metadata. - :param schema: schema name - :param table: table name - :return: fully qualified name of a Table in Open Metadata - """ - result = None - services = self.get_db_service_names() - for db_service in services: - result = fqn.build( - metadata=self.metadata, - entity_type=Table, - service_name=db_service, - database_name=None, - schema_name=table_details.schema, - table_name=table_details.name, - ) - if not result: - raise FQNNotFoundException( - f"Table FQN not found for table: {table_details} within services: {services}" - ) - return result - def _get_schema_fqn_from_om(self, schema: str) -> Optional[str]: """ Based on partial schema name look for any matching DatabaseSchema object in open metadata. 
diff --git a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py index c6945fb5432..40449f08bc1 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py @@ -20,6 +20,7 @@ from typing_extensions import Annotated from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.pipeline import Pipeline +from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.pipelineService import ( PipelineConnection, PipelineService, @@ -46,6 +47,8 @@ from metadata.ingestion.models.topology import ( ) from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, get_test_connection_fn +from metadata.ingestion.source.pipeline.openlineage.models import TableDetails +from metadata.ingestion.source.pipeline.openlineage.utils import FQNNotFoundException from metadata.utils import fqn from metadata.utils.filters import filter_by_pipeline from metadata.utils.logger import ingestion_logger @@ -192,6 +195,30 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC): else: yield lineage + def _get_table_fqn_from_om(self, table_details: TableDetails) -> Optional[str]: + """ + Based on partial schema and table names look for matching table object in open metadata. 
+ :param table_details: partial schema and table names of the table to look up + :return: fully qualified name of a Table in Open Metadata + :raises FQNNotFoundException: if no service from get_db_service_names() yields a table FQN + """ + result = None + services = self.get_db_service_names() + for db_service in services: + result = fqn.build( + metadata=self.metadata, + entity_type=Table, + service_name=db_service, + database_name=None, + schema_name=table_details.schema, + table_name=table_details.name, + ) + if result: + return result + raise FQNNotFoundException( + f"Table FQN not found for table: {table_details} within services: {services}" + ) + def yield_tag(self, *args, **kwargs) -> Iterable[Either[OMetaTagAndClassification]]: """Method to fetch pipeline tags""" diff --git a/ingestion/tests/unit/topology/pipeline/test_fivetran.py b/ingestion/tests/unit/topology/pipeline/test_fivetran.py index 0261dcb7540..be9491e0018 100644 --- a/ingestion/tests/unit/topology/pipeline/test_fivetran.py +++ b/ingestion/tests/unit/topology/pipeline/test_fivetran.py @@ -69,6 +69,7 @@ EXPECTED_FIVETRAN_DETAILS = FivetranPipelineDetails( source=mock_data.get("source"), destination=mock_data.get("destination"), group=mock_data.get("group"), + connector_id=mock_data.get("source").get("id"), ) @@ -79,6 +80,9 @@ EXPECTED_CREATED_PIPELINES = CreatePipelineRequest( Task( name="wackiness_remote_aiding_pointless", displayName="test <> postgres_rds", + sourceUrl=SourceUrl( + "https://fivetran.com/dashboard/connectors/aiding_pointless/status?groupId=wackiness_remote&service=postgres_rds" + ), ) ], service=FullyQualifiedEntityName("fivetran_source"),