Fixes #14945: FiveTran Improvements (#17169)

This commit is contained in:
Ayush Shah 2024-07-30 10:16:47 +05:30 committed by GitHub
parent be1d5a2a31
commit 34c572334e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 182 additions and 73 deletions

View File

@ -12,7 +12,9 @@
Client to interact with fivetran apis Client to interact with fivetran apis
""" """
import base64 import base64
from typing import List from typing import List, Optional
from requests import Response
from metadata.generated.schema.entity.services.connections.pipeline.fivetranConnection import ( from metadata.generated.schema.entity.services.connections.pipeline.fivetranConnection import (
FivetranConnection, FivetranConnection,
@ -44,8 +46,8 @@ class FivetranClient:
def run_paginator(self, path: str) -> List[dict]: def run_paginator(self, path: str) -> List[dict]:
response = self.client.get(f"{path}?limit={self.config.limit}") response = self.client.get(f"{path}?limit={self.config.limit}")
data = response["data"] data = response.get("data")
result = data["items"] result = data.get("items")
while data.get("next_cursor"): while data.get("next_cursor"):
response = self.client.get( response = self.client.get(
f"{path}?limit={self.config.limit}&cursor={data['next_cursor']}" f"{path}?limit={self.config.limit}&cursor={data['next_cursor']}"
@ -78,11 +80,22 @@ class FivetranClient:
Method returns destination details Method returns destination details
""" """
response = self.client.get(f"/destinations/{destination_id}") response = self.client.get(f"/destinations/{destination_id}")
return response["data"] return response.get("data")
def get_connector_schema_details(self, connector_id: str) -> dict: def get_connector_schema_details(self, connector_id: str) -> dict:
""" """
Method returns destination details Method returns destination details
""" """
response = self.client.get(f"/connectors/{connector_id}/schemas") response = self.client.get(f"/connectors/{connector_id}/schemas")
return response["data"]["schemas"] return response.get("data", {}).get("schemas", [])
def get_connector_column_lineage(
    self, connector_id: str, schema_name: str, table_name: str
) -> dict:
    """
    Method returns the column details of one table of a connector

    Requests
    ``/connectors/{connector_id}/schemas/{schema_name}/tables/{table_name}/columns``
    and returns the ``columns`` entry nested under ``data`` in the response.

    :param connector_id: id of the connector that owns the table
    :param schema_name: schema name used in the request path
    :param table_name: table name used in the request path
    :return: the ``columns`` mapping, or an empty dict when the response is
        empty or carries no ``data`` / ``columns`` entry
    """
    response: Optional[dict] = self.client.get(
        f"/connectors/{connector_id}/schemas/{schema_name}/tables/{table_name}/columns"
    )
    # Fall back to an empty dict at every level so the result always honours
    # the declared ``dict`` return type and can be iterated with ``.items()``.
    data = (response or {}).get("data") or {}
    return data.get("columns") or {}

View File

@ -13,9 +13,7 @@ Airbyte source to extract metadata
""" """
import traceback import traceback
from typing import Iterable, Optional from typing import Iterable, List, Optional, Union, cast
from pydantic import BaseModel
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
@ -33,13 +31,22 @@ from metadata.generated.schema.type.basic import (
FullyQualifiedEntityName, FullyQualifiedEntityName,
SourceUrl, SourceUrl,
) )
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
)
from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.fivetran.client import FivetranClient
from metadata.ingestion.source.pipeline.fivetran.models import FivetranPipelineDetails
from metadata.ingestion.source.pipeline.openlineage.models import TableDetails
from metadata.ingestion.source.pipeline.openlineage.utils import FQNNotFoundException
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils import fqn from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger from metadata.utils.logger import ingestion_logger
@ -47,24 +54,6 @@ from metadata.utils.logger import ingestion_logger
logger = ingestion_logger() logger = ingestion_logger()
class FivetranPipelineDetails(BaseModel):
    """
    Wrapper model holding the source, destination and group
    dictionaries that together describe one pipeline
    """

    source: dict
    destination: dict
    group: dict

    @property
    def pipeline_name(self):
        """Group id and source id joined by an underscore"""
        group_id = self.group.get("id")
        source_id = self.source.get("id")
        return f"{group_id}_{source_id}"

    @property
    def pipeline_display_name(self):
        """Group name and source schema joined by ' <> '"""
        group_name = self.group.get("name")
        source_schema = self.source.get("schema")
        return f"{group_name} <> {source_schema}"
class FivetranSource(PipelineServiceSource): class FivetranSource(PipelineServiceSource):
""" """
Implements the necessary methods ot extract Implements the necessary methods ot extract
@ -83,13 +72,18 @@ class FivetranSource(PipelineServiceSource):
) )
return cls(config, metadata) return cls(config, metadata)
def get_connections_jobs(self, pipeline_details: FivetranPipelineDetails): def get_connections_jobs(
self,
pipeline_details: FivetranPipelineDetails,
source_url: Optional[SourceUrl] = None,
) -> List[Task]:
"""Returns the list of tasks linked to connection""" """Returns the list of tasks linked to connection"""
return [ return [
Task( Task(
name=pipeline_details.pipeline_name, name=pipeline_details.pipeline_name,
displayName=pipeline_details.pipeline_display_name, displayName=pipeline_details.pipeline_display_name,
) sourceUrl=source_url,
) # type: ignore
] ]
def yield_pipeline( def yield_pipeline(
@ -100,25 +94,73 @@ class FivetranSource(PipelineServiceSource):
:param pipeline_details: pipeline_details object from fivetran :param pipeline_details: pipeline_details object from fivetran
:return: Create Pipeline request with tasks :return: Create Pipeline request with tasks
""" """
pipeline_request = CreatePipelineRequest( source_url = self.get_source_url(
name=EntityName(pipeline_details.pipeline_name),
displayName=pipeline_details.pipeline_display_name,
tasks=self.get_connections_jobs(pipeline_details),
service=FullyQualifiedEntityName(self.context.get().pipeline_service),
sourceUrl=self.get_source_url(
connector_id=pipeline_details.source.get("id"), connector_id=pipeline_details.source.get("id"),
group_id=pipeline_details.group.get("id"), group_id=pipeline_details.group.get("id"),
source_name=pipeline_details.source.get("service"), source_name=pipeline_details.source.get("service"),
),
) )
yield Either(right=pipeline_request) pipeline_request = CreatePipelineRequest(
name=EntityName(pipeline_details.pipeline_name),
displayName=pipeline_details.pipeline_display_name,
tasks=self.get_connections_jobs(
pipeline_details=pipeline_details, source_url=source_url
),
service=FullyQualifiedEntityName(self.context.get().pipeline_service),
sourceUrl=source_url,
) # type: ignore
yield Either(left=None, right=pipeline_request)
self.register_record(pipeline_request=pipeline_request) self.register_record(pipeline_request=pipeline_request)
def yield_pipeline_status( def yield_pipeline_status(
self, pipeline_details: FivetranPipelineDetails self, pipeline_details: FivetranPipelineDetails
) -> Iterable[Either[OMetaPipelineStatus]]: ) -> Optional[Iterable[Either[OMetaPipelineStatus]]]:
"""Method to get task & pipeline status""" """Method to get task & pipeline status"""
def fetch_column_lineage(
    self, pipeline_details: FivetranPipelineDetails, schema, schema_data, table
) -> List[Optional[ColumnLineage]]:
    """
    Build the column lineage between a source table and its destination table

    :param pipeline_details: pipeline whose ``connector_id`` is queried
    :param schema: source schema name (key of the connector schema details)
    :param schema_data: details of that schema; must hold
        ``tables[table]["name_in_destination"]`` and may hold
        ``name_in_destination`` for the schema itself
    :param table: source table name
    :return: one ColumnLineage per enabled column that could be resolved on
        both tables; an empty list when the tables or columns are not found
    """
    col_details = self.client.get_connector_column_lineage(
        pipeline_details.connector_id, schema_name=schema, table_name=table
    )
    # Nothing to map when the client returned no per-column details.
    if not col_details:
        return []

    try:
        from_entity_fqn: Optional[str] = self._get_table_fqn_from_om(
            table_details=TableDetails(schema=schema, name=table)
        )
        to_entity_fqn: Optional[str] = self._get_table_fqn_from_om(
            table_details=TableDetails(
                schema=schema_data.get("name_in_destination"),
                name=schema_data["tables"][table]["name_in_destination"],
            )
        )
    except FQNNotFoundException:
        # Either table is unknown to OpenMetadata: no column lineage possible.
        return []

    if not (from_entity_fqn and to_entity_fqn):
        return []

    to_table_entity = self.metadata.get_by_name(entity=Table, fqn=to_entity_fqn)
    from_table_entity = self.metadata.get_by_name(entity=Table, fqn=from_entity_fqn)
    # Both entities are loop-invariant, so check them once up front.
    if not (from_table_entity and to_table_entity):
        return []

    col_lineage_arr = []
    for column_name, column_data in col_details.items():
        # Only columns flagged as enabled take part in the lineage; a
        # missing flag is treated as "not enabled" instead of raising.
        if column_data.get("enabled") is not True:
            continue
        from_col = get_column_fqn(table_entity=from_table_entity, column=column_name)
        to_col = get_column_fqn(
            table_entity=to_table_entity,
            column=column_data.get("name_in_destination"),
        )
        # NOTE(review): get_column_fqn presumably yields a falsy value when
        # the column is not on the table -- confirm. Skip such columns so an
        # empty reference is never stored in the lineage.
        if not (from_col and to_col):
            continue
        col_lineage_arr.append(
            ColumnLineage(
                toColumn=to_col,
                fromColumns=[from_col],
                function=None,
            )
        )
    return col_lineage_arr
def yield_pipeline_lineage_details( def yield_pipeline_lineage_details(
self, pipeline_details: FivetranPipelineDetails self, pipeline_details: FivetranPipelineDetails
) -> Iterable[Either[AddLineageRequest]]: ) -> Iterable[Either[AddLineageRequest]]:
@ -127,22 +169,43 @@ class FivetranSource(PipelineServiceSource):
:param pipeline_details: pipeline_details object from airbyte :param pipeline_details: pipeline_details object from airbyte
:return: Lineage from inlets and outlets :return: Lineage from inlets and outlets
""" """
self.client = cast(FivetranClient, self.client)
source_service = self.metadata.get_by_name( source_service = self.metadata.get_by_name(
entity=DatabaseService, fqn=pipeline_details.source.get("schema") entity=DatabaseService, fqn=pipeline_details.source.get("schema")
) )
if not source_service:
es_resp: Union[
List[DatabaseService], None
] = self.metadata.es_search_from_fqn(
DatabaseService, pipeline_details.source.get("schema", "")
)
if es_resp and len(es_resp) > 0 and es_resp[0].fullyQualifiedName:
source_service = self.metadata.get_by_name(
entity=DatabaseService,
fqn=(es_resp[0].fullyQualifiedName.root),
)
destination_service = self.metadata.get_by_name( destination_service = self.metadata.get_by_name(
entity=DatabaseService, fqn=pipeline_details.group.get("name") entity=DatabaseService, fqn=pipeline_details.group.get("name")
) )
if not source_service or not destination_service: if not source_service or not destination_service:
return return
for schema, schema_data in self.client.get_connector_schema_details( for schema, schema_data in self.client.get_connector_schema_details(
connector_id=pipeline_details.source.get("id") connector_id=pipeline_details.source.get("id")
).items(): ).items():
for table in schema_data.get("tables", {}).keys(): for table in schema_data.get("tables", {}).keys():
col_lineage_arr = self.fetch_column_lineage(
pipeline_details=pipeline_details,
schema=schema,
schema_data=schema_data,
table=table,
)
from_fqn = fqn.build( from_fqn = fqn.build(
self.metadata, metadata=self.metadata,
Table, entity_type=Table,
table_name=table, table_name=table,
database_name=pipeline_details.source.get("config", {}).get( database_name=pipeline_details.source.get("config", {}).get(
"database" "database"
@ -179,30 +242,36 @@ class FivetranSource(PipelineServiceSource):
lineage_details = LineageDetails( lineage_details = LineageDetails(
pipeline=EntityReference( pipeline=EntityReference(
id=pipeline_entity.id.root, type="pipeline" id=pipeline_entity.id.root, type="pipeline"
), ), # type: ignore
source=LineageSource.PipelineLineage, source=LineageSource.PipelineLineage,
columnsLineage=col_lineage_arr if col_lineage_arr else None,
sqlQuery=None,
description=None,
) )
yield Either( yield Either(
right=AddLineageRequest( right=AddLineageRequest(
edge=EntitiesEdge( edge=EntitiesEdge(
fromEntity=EntityReference(id=from_entity.id, type="table"), fromEntity=EntityReference(id=from_entity.id, type="table"), # type: ignore
toEntity=EntityReference(id=to_entity.id, type="table"), toEntity=EntityReference(id=to_entity.id, type="table"), # type: ignore
lineageDetails=lineage_details, lineageDetails=lineage_details,
) )
) )
) ) # type: ignore
def get_pipelines_list(self) -> Iterable[FivetranPipelineDetails]: def get_pipelines_list(self) -> Iterable[FivetranPipelineDetails]:
"""Get List of all pipelines""" """Get List of all pipelines"""
for group in self.client.list_groups(): for group in self.client.list_groups():
for connector in self.client.list_group_connectors( destination_id: str = group.get("id", "")
group_id=group.get("id") for connector in self.client.list_group_connectors(group_id=destination_id):
): connector_id: str = connector.get("id", "")
yield FivetranPipelineDetails( yield FivetranPipelineDetails(
destination=self.client.get_destination_details(group.get("id")), destination=self.client.get_destination_details(
source=self.client.get_connector_details(connector.get("id")), destination_id=destination_id
),
source=self.client.get_connector_details(connector_id=connector_id),
group=group, group=group,
connector_id=connector_id,
) )
def get_pipeline_name(self, pipeline_details: FivetranPipelineDetails) -> str: def get_pipeline_name(self, pipeline_details: FivetranPipelineDetails) -> str:

View File

@ -0,0 +1,20 @@
from pydantic import BaseModel
class FivetranPipelineDetails(BaseModel):
    """
    Wrapper model holding the source, destination and group
    dictionaries of one pipeline, plus the connector id
    """

    source: dict
    destination: dict
    group: dict
    connector_id: str

    @property
    def pipeline_name(self):
        """Group id and source id joined by an underscore"""
        group_id = self.group.get("id")
        source_id = self.source.get("id")
        return f"{group_id}_{source_id}"

    @property
    def pipeline_display_name(self):
        """Group name and source schema joined by ' <> '"""
        group_name = self.group.get("name")
        source_schema = self.source.get("schema")
        return f"{group_name} <> {source_schema}"

View File

@ -146,30 +146,6 @@ class OpenlineageSource(PipelineServiceSource):
except FQNNotFoundException: except FQNNotFoundException:
return None return None
def _get_table_fqn_from_om(self, table_details: TableDetails) -> Optional[str]:
    """
    Based on partial schema and table names look for matching table object in open metadata.

    Every database service name returned by ``get_db_service_names`` is
    tried in order and the first FQN that can be built is returned.

    :param table_details: schema and name of the table to look up
    :return: fully qualified name of a Table in Open Metadata
    :raises FQNNotFoundException: when no service yields a table FQN
    """
    services = self.get_db_service_names()
    for db_service in services:
        result = fqn.build(
            metadata=self.metadata,
            entity_type=Table,
            service_name=db_service,
            database_name=None,
            schema_name=table_details.schema,
            table_name=table_details.name,
        )
        # Stop at the first hit: continuing would let a later service
        # without the table overwrite a match that was already found.
        if result:
            return result
    raise FQNNotFoundException(
        f"Table FQN not found for table: {table_details} within services: {services}"
    )
def _get_schema_fqn_from_om(self, schema: str) -> Optional[str]: def _get_schema_fqn_from_om(self, schema: str) -> Optional[str]:
""" """
Based on partial schema name look for any matching DatabaseSchema object in open metadata. Based on partial schema name look for any matching DatabaseSchema object in open metadata.

View File

@ -20,6 +20,7 @@ from typing_extensions import Annotated
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import Pipeline from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.pipelineService import ( from metadata.generated.schema.entity.services.pipelineService import (
PipelineConnection, PipelineConnection,
PipelineService, PipelineService,
@ -46,6 +47,8 @@ from metadata.ingestion.models.topology import (
) )
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
from metadata.ingestion.source.pipeline.openlineage.models import TableDetails
from metadata.ingestion.source.pipeline.openlineage.utils import FQNNotFoundException
from metadata.utils import fqn from metadata.utils import fqn
from metadata.utils.filters import filter_by_pipeline from metadata.utils.filters import filter_by_pipeline
from metadata.utils.logger import ingestion_logger from metadata.utils.logger import ingestion_logger
@ -192,6 +195,30 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC):
else: else:
yield lineage yield lineage
def _get_table_fqn_from_om(self, table_details: TableDetails) -> Optional[str]:
    """
    Resolve a table in open metadata from its partial schema and table names.

    The database service names from ``get_db_service_names`` are tried in
    order; the first one for which a table FQN can be built wins.

    :param table_details: schema and name of the table to look up
    :return: fully qualified name of a Table in Open Metadata
    :raises FQNNotFoundException: when no service yields a table FQN
    """
    services = self.get_db_service_names()
    # Lazy generator: fqn.build is only called until the first truthy FQN.
    candidates = (
        fqn.build(
            metadata=self.metadata,
            entity_type=Table,
            service_name=service_name,
            database_name=None,
            schema_name=table_details.schema,
            table_name=table_details.name,
        )
        for service_name in services
    )
    table_fqn = next((candidate for candidate in candidates if candidate), None)
    if not table_fqn:
        raise FQNNotFoundException(
            f"Table FQN not found for table: {table_details} within services: {services}"
        )
    return table_fqn
def yield_tag(self, *args, **kwargs) -> Iterable[Either[OMetaTagAndClassification]]: def yield_tag(self, *args, **kwargs) -> Iterable[Either[OMetaTagAndClassification]]:
"""Method to fetch pipeline tags""" """Method to fetch pipeline tags"""

View File

@ -69,6 +69,7 @@ EXPECTED_FIVETRAN_DETAILS = FivetranPipelineDetails(
source=mock_data.get("source"), source=mock_data.get("source"),
destination=mock_data.get("destination"), destination=mock_data.get("destination"),
group=mock_data.get("group"), group=mock_data.get("group"),
connector_id=mock_data.get("source").get("id"),
) )
@ -79,6 +80,9 @@ EXPECTED_CREATED_PIPELINES = CreatePipelineRequest(
Task( Task(
name="wackiness_remote_aiding_pointless", name="wackiness_remote_aiding_pointless",
displayName="test <> postgres_rds", displayName="test <> postgres_rds",
sourceUrl=SourceUrl(
"https://fivetran.com/dashboard/connectors/aiding_pointless/status?groupId=wackiness_remote&service=postgres_rds"
),
) )
], ],
service=FullyQualifiedEntityName("fivetran_source"), service=FullyQualifiedEntityName("fivetran_source"),