diff --git a/ingestion/src/metadata/ingestion/source/tableau.py b/ingestion/src/metadata/ingestion/source/tableau.py index ee488747809..683c344c791 100644 --- a/ingestion/src/metadata/ingestion/source/tableau.py +++ b/ingestion/src/metadata/ingestion/source/tableau.py @@ -19,11 +19,21 @@ from typing import Iterable, List, Optional import dateutil.parser as dateparser from pydantic import SecretStr from tableau_api_lib import TableauServerConnection -from tableau_api_lib.utils.querying import get_views_dataframe, get_workbooks_dataframe +from tableau_api_lib.utils.querying import ( + get_views_dataframe, + get_workbook_connections_dataframe, + get_workbooks_dataframe, +) +from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.entity.data.dashboard import ( + Dashboard as Dashboard_Entity, +) +from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.dashboardService import ( DashboardServiceType, ) +from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import ( ConfigModel, @@ -33,6 +43,7 @@ from metadata.ingestion.api.common import ( ) from metadata.ingestion.api.source import Source, SourceStatus from metadata.ingestion.models.table_metadata import Chart, Dashboard, DashboardOwner +from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig from metadata.utils.helpers import get_dashboard_service_or_create @@ -49,6 +60,7 @@ class TableauSourceConfig(ConfigModel): env: Optional[str] = "tableau_prod" site_name: str site_url: str + db_service_name: Optional[str] = None service_name: str service_type: str = DashboardServiceType.Tableau.value personal_access_token_name: Optional[str] = None @@ -97,6 +109,7 @@ class TableauSource(Source[Entity]): metadata_config=metadata_config, ) self.status = SourceStatus() + self.metadata_client = OpenMetadata(self.metadata_config) self.dashboards = get_workbooks_dataframe(self.client).to_dict() self.all_dashboard_details = get_views_dataframe(self.client).to_dict() @@ -172,12 +185,44 @@ class TableauSource(Source[Entity]): ) ] + def get_lineage(self, datasource_list, dashboard_name) -> AddLineageRequest: + for datasource in datasource_list: + try: + table_fqdn = datasource.split("(")[1].split(")")[0] + dashboard_fqdn = f"{self.config.service_name}.{dashboard_name}" + table_fqdn = f"{self.config.db_service_name}.{table_fqdn}" + table_entity = self.metadata_client.get_by_name( + entity=Table, fqdn=table_fqdn + ) + dashboard_entity = self.metadata_client.get_by_name( + entity=Dashboard_Entity, fqdn=dashboard_fqdn + ) + if table_entity and dashboard_entity: + lineage = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference( + id=table_entity.id.__root__, type="table" + ), + toEntity=EntityReference( + id=dashboard_entity.id.__root__, type="dashboard" + ), + ) + ) + yield lineage + except (Exception, IndexError) as err: + logger.error(err) + def _get_tableau_dashboard(self) -> Dashboard: for index in range(len(self.dashboards["id"])): dashboard_id = self.dashboards["id"][index] dashboard_name = self.dashboards["name"][index] dashboard_tag = self.dashboards["tags"][index] dashboard_url = self.dashboards["webpageUrl"][index] + datasource_list = ( + get_workbook_connections_dataframe(self.client, dashboard_id) + .get("datasource_name") + .tolist() + ) tag_labels = [] if hasattr(dashboard_tag, "tag"): for tag in dashboard_tag["tag"]: @@ -202,6 +247,8 @@ class TableauSource(Source[Entity]): service=EntityReference(id=self.service.id, type="dashboardService"), last_modified=dateparser.parse(chart["updatedAt"]).timestamp() * 1000, ) + if self.config.db_service_name: + yield from self.get_lineage(datasource_list, dashboard_id) def _get_tableau_charts(self): for index in range(len(self.all_dashboard_details["id"])):