diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau/client.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau/client.py index 650c7362b74..b097b1e23d9 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau/client.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau/client.py @@ -28,6 +28,7 @@ from metadata.ingestion.source.dashboard.tableau import ( TABLEAU_GET_WORKBOOKS_PARAM_DICT, ) from metadata.ingestion.source.dashboard.tableau.models import ( + CustomSQLTablesResponse, DataSource, TableauChart, TableauDashboard, @@ -37,6 +38,7 @@ from metadata.ingestion.source.dashboard.tableau.models import ( ) from metadata.ingestion.source.dashboard.tableau.queries import ( TABLEAU_DATASOURCES_QUERY, + TALEAU_GET_CUSTOM_SQL_QUERY, ) from metadata.utils.logger import ometa_logger @@ -82,6 +84,7 @@ class TableauClient: self.config = config self._client.sign_in().json() self.pagination_limit = pagination_limit + self.custom_sql_table_queries: Dict[str, List[str]] = {} self.usage_metrics: Dict[str, int] = {} @cached_property @@ -327,5 +330,52 @@ class TableauClient: logger.warning("Unable to fetch Data Sources") return None + def get_custom_sql_table_queries(self, dashboard_id: str) -> Optional[List[str]]: + """ + Get custom SQL table queries for a specific dashboard/workbook ID + """ + logger.debug(f"Getting custom SQL table queries for dashboard {dashboard_id}") + + if dashboard_id in self.custom_sql_table_queries: + logger.debug(f"Found cached queries for dashboard {dashboard_id}") + return self.custom_sql_table_queries[dashboard_id] + + return None + + def cache_custom_sql_tables(self) -> None: + """ + Fetch all custom SQL tables and cache their queries by workbook ID + """ + try: + result = self._client.metadata_graphql_query( + query=TALEAU_GET_CUSTOM_SQL_QUERY + ) + if not result or not (response_json := result.json()): + logger.debug("No result returned from GraphQL query") + return + + response = CustomSQLTablesResponse(**response_json) + if not response.data: + logger.debug("No data found in GraphQL response") + return + + for tables in response.data.values(): + for table in tables: + if not (table.query and table.downstreamWorkbooks): + logger.debug( + f"Skipping table {table} - missing query or workbooks" + ) + continue + + query = table.query + for workbook in table.downstreamWorkbooks: + self.custom_sql_table_queries.setdefault( + workbook.luid, [] + ).append(query) + + except Exception: + logger.debug(traceback.format_exc()) + logger.warning("Unable to fetch Custom SQL Tables") + def sign_out(self) -> None: self._client.sign_out() diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py index f8fe2c08bf5..d74de24b6a6 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py @@ -128,6 +128,13 @@ class TableauSource(DashboardServiceSource): ) return cls(config, metadata) + def prepare(self): + """ + Prepare the source before ingestion + we will fetch the custom sql tables from the tableau server + """ + self.client.cache_custom_sql_tables() + def get_dashboards_list(self) -> Optional[List[TableauDashboard]]: return self.client.get_workbooks() @@ -145,6 +152,11 @@ class TableauSource(DashboardServiceSource): # Get the tableau data sources dashboard.dataModels = self.client.get_datasources(dashboard_id=dashboard.id) + # Get custom SQL queries + dashboard.custom_sql_queries = self.client.get_custom_sql_table_queries( + dashboard_id=dashboard.id + ) + return dashboard def get_owner_ref( @@ -646,6 +658,65 @@ class TableauSource(DashboardServiceSource): db_service_name=db_service_name, upstream_data_model_entity=data_model_entity, ) + + # Process custom SQL queries if available + if dashboard_details.custom_sql_queries: + for query in dashboard_details.custom_sql_queries: + try: + db_service_entity = None + if db_service_name: + db_service_entity = self.metadata.get_by_name( + entity=DatabaseService, fqn=db_service_name + ) + lineage_parser = LineageParser( + query, + ConnectionTypeDialectMapper.dialect_of( + db_service_entity.serviceType.value + ) + if db_service_entity + else Dialect.ANSI, + ) + for source_table in lineage_parser.source_tables or []: + database_schema_table = fqn.split_table_name( + str(source_table) + ) + database_name = database_schema_table.get("database") + if db_service_entity: + if isinstance( + db_service_entity.connection.config, + BigQueryConnection, + ): + database_name = None + database_name = get_database_name_for_lineage( + db_service_entity, database_name + ) + schema_name = self.check_database_schema_name( + database_schema_table.get("database_schema") + ) + table_name = database_schema_table.get("table") + fqn_search_string = build_es_fqn_search_string( + database_name=database_name, + schema_name=schema_name, + service_name=db_service_name or "*", + table_name=table_name, + ) + from_entities = self.metadata.search_in_any_service( + entity_type=Table, + fqn_search_string=fqn_search_string, + fetch_multiple_entities=True, + ) + for table_entity in from_entities: + yield self._get_add_lineage_request( + to_entity=data_model_entity, + from_entity=table_entity, + sql=query, + ) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error( + f"Error processing custom SQL query lineage: {err}" + ) + except Exception as err: yield Either( left=StackTraceError( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau/models.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau/models.py index 307b74ccb32..9c56ce9e673 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau/models.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau/models.py @@ -84,15 +84,33 @@ def transform_tags(raw: Union[Dict[str, Any], List[TableauTag]]) -> List[Tableau return tags +class TableauWorkbook(BaseModel): + """ + Model for downstream workbook information + """ + + luid: Optional[str] = None + name: Optional[str] = None + + class CustomSQLTable(TableauBaseModel): """ GraphQL API CustomSQLTable schema https://help.tableau.com/current/api/metadata_api/en-us/reference/customsqltable.doc.html """ + downstreamWorkbooks: Optional[List[TableauWorkbook]] = None query: Optional[str] = None +class CustomSQLTablesResponse(BaseModel): + """ + Model for the custom SQL tables response + """ + + data: Dict[str, List[CustomSQLTable]] + + class UpstreamColumn(BaseModel): id: str name: Optional[str] = None @@ -173,6 +191,7 @@ class TableauDashboard(TableauBaseModel): webpageUrl: Optional[str] = None charts: Optional[List[TableauChart]] = None dataModels: List[DataSource] = [] + custom_sql_queries: Optional[List[str]] = None class TableAndQuery(BaseModel): diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau/queries.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau/queries.py index bb03818524e..b9977c9d270 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau/queries.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau/queries.py @@ -77,3 +77,17 @@ workbooks(filter:{{luid: "{workbook_id}"}}){{ }} }} """ + +TALEAU_GET_CUSTOM_SQL_QUERY = """ +{ + customSQLTables { + name + id + downstreamWorkbooks { + luid + name + } + query + } +} +"""