diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau.py index dd3963cd1ca..5c72218df84 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau.py @@ -11,6 +11,7 @@ """ Tableau source module """ +import json import traceback from typing import Iterable, List, Optional @@ -58,6 +59,27 @@ from metadata.utils.logger import ingestion_logger logger = ingestion_logger() TABLEAU_TAG_CATEGORY = "TableauTags" +TABLEAU_LINEAGE_GRAPHQL_QUERY = """ +{ + workbooks { + id + luid + name + upstreamTables{ + name + schema + upstreamDatabases{ + name + } + referencedByQueries{ + name + query + } + } + } +} +""" + class TableauSource(DashboardServiceSource): @@ -75,6 +97,7 @@ class TableauSource(DashboardServiceSource): self.workbooks = {} self.tags = [] self.owner = {} + self.workboook_datasources = {} def prepare(self): # Restructuring the api response for workbooks @@ -109,6 +132,14 @@ class TableauSource(DashboardServiceSource): owner = get_all_user_fields(self.client) self.owner = {user["id"]: user for user in owner} + # Fetch Datasource information for lineage + graphql_query_result = self.client.metadata_graphql_query( + query=TABLEAU_LINEAGE_GRAPHQL_QUERY + ) + self.workboook_datasources = json.loads(graphql_query_result.text)["data"].get( + "workbooks" + ) + return super().prepare() @classmethod @@ -223,38 +254,43 @@ class TableauSource(DashboardServiceSource): """ Get lineage between dashboard and data sources """ - datasource_list = ( - get_workbook_connections_dataframe(self.client, dashboard_details.get("id")) - .get("datasource_name") - .tolist() - ) - dashboard_name = dashboard_details.get("name") + dashboard_id = dashboard_details.get("id") + data_source = next( + ( + data_source + for data_source in self.workboook_datasources + if data_source.get("luid") == dashboard_id + ), + None, + ) to_fqn = fqn.build( self.metadata, entity_type=LineageDashboard, service_name=self.config.serviceName, - dashboard_name=dashboard_name, + dashboard_name=dashboard_id, ) to_entity = self.metadata.get_by_name( entity=LineageDashboard, fqn=to_fqn, ) - for datasource in datasource_list: - try: - schema_and_table_name = ( - datasource.split("(")[1].split(")")[0].split(".") - ) - schema_name = schema_and_table_name[0] - table_name = schema_and_table_name[1] + try: + upstream_tables = data_source.get("upstreamTables") + for upstream_table in upstream_tables: + database_schema_table = fqn.split_table_name(upstream_table.get("name")) + database_name = database_schema_table.get("database") + schema_name = database_schema_table.get("database_schema") + table_name = database_schema_table.get("table") from_fqn = fqn.build( self.metadata, entity_type=Table, service_name=db_service_name, - schema_name=schema_name, + schema_name=schema_name + if schema_name + else upstream_table.get("schema"), table_name=table_name, - database_name=None, + database_name=database_name, ) from_entity = self.metadata.get_by_name( entity=Table, @@ -272,11 +308,11 @@ class TableauSource(DashboardServiceSource): ) ) yield lineage - except (Exception, IndexError) as err: - logger.debug(traceback.format_exc()) - logger.error( - f"Error to yield dashboard lineage details for DB service name [{db_service_name}]: {err}" - ) + except (Exception, IndexError) as err: + logger.debug(traceback.format_exc()) + logger.error( + f"Error to yield dashboard lineage details for DB service name [{db_service_name}]: {err}" + ) def yield_dashboard_chart( self, dashboard_details: dict