Fix: Added Tableau Customsql lineage (#20317)

This commit is contained in:
Suman Maharana 2025-03-19 09:28:39 +05:30 committed by GitHub
parent b094bf43aa
commit a2057077ed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 154 additions and 0 deletions

View File

@ -28,6 +28,7 @@ from metadata.ingestion.source.dashboard.tableau import (
TABLEAU_GET_WORKBOOKS_PARAM_DICT,
)
from metadata.ingestion.source.dashboard.tableau.models import (
CustomSQLTablesResponse,
DataSource,
TableauChart,
TableauDashboard,
@ -37,6 +38,7 @@ from metadata.ingestion.source.dashboard.tableau.models import (
)
from metadata.ingestion.source.dashboard.tableau.queries import (
TABLEAU_DATASOURCES_QUERY,
TALEAU_GET_CUSTOM_SQL_QUERY,
)
from metadata.utils.logger import ometa_logger
@ -82,6 +84,7 @@ class TableauClient:
self.config = config
self._client.sign_in().json()
self.pagination_limit = pagination_limit
self.custom_sql_table_queries: Dict[str, List[str]] = {}
self.usage_metrics: Dict[str, int] = {}
@cached_property
@ -327,5 +330,52 @@ class TableauClient:
logger.warning("Unable to fetch Data Sources")
return None
def get_custom_sql_table_queries(self, dashboard_id: str) -> Optional[List[str]]:
"""
Get custom SQL table queries for a specific dashboard/workbook ID
"""
logger.debug(f"Getting custom SQL table queries for dashboard {dashboard_id}")
if dashboard_id in self.custom_sql_table_queries:
logger.debug(f"Found cached queries for dashboard {dashboard_id}")
return self.custom_sql_table_queries[dashboard_id]
return None
def cache_custom_sql_tables(self) -> None:
"""
Fetch all custom SQL tables and cache their queries by workbook ID
"""
try:
result = self._client.metadata_graphql_query(
query=TALEAU_GET_CUSTOM_SQL_QUERY
)
if not result or not (response_json := result.json()):
logger.debug("No result returned from GraphQL query")
return
response = CustomSQLTablesResponse(**response_json)
if not response.data:
logger.debug("No data found in GraphQL response")
return
for tables in response.data.values():
for table in tables:
if not (table.query and table.downstreamWorkbooks):
logger.debug(
f"Skipping table {table} - missing query or workbooks"
)
continue
query = table.query
for workbook in table.downstreamWorkbooks:
self.custom_sql_table_queries.setdefault(
workbook.luid, []
).append(query)
except Exception:
logger.debug(traceback.format_exc())
logger.warning("Unable to fetch Custom SQL Tables")
def sign_out(self) -> None:
self._client.sign_out()

View File

@ -128,6 +128,13 @@ class TableauSource(DashboardServiceSource):
)
return cls(config, metadata)
def prepare(self):
"""
Prepare the source before ingestion
we will fetch the custom sql tables from the tableau server
"""
self.client.cache_custom_sql_tables()
def get_dashboards_list(self) -> Optional[List[TableauDashboard]]:
return self.client.get_workbooks()
@ -145,6 +152,11 @@ class TableauSource(DashboardServiceSource):
# Get the tableau data sources
dashboard.dataModels = self.client.get_datasources(dashboard_id=dashboard.id)
# Get custom SQL queries
dashboard.custom_sql_queries = self.client.get_custom_sql_table_queries(
dashboard_id=dashboard.id
)
return dashboard
def get_owner_ref(
@ -646,6 +658,65 @@ class TableauSource(DashboardServiceSource):
db_service_name=db_service_name,
upstream_data_model_entity=data_model_entity,
)
# Process custom SQL queries if available
if dashboard_details.custom_sql_queries:
for query in dashboard_details.custom_sql_queries:
try:
db_service_entity = None
if db_service_name:
db_service_entity = self.metadata.get_by_name(
entity=DatabaseService, fqn=db_service_name
)
lineage_parser = LineageParser(
query,
ConnectionTypeDialectMapper.dialect_of(
db_service_entity.serviceType.value
)
if db_service_entity
else Dialect.ANSI,
)
for source_table in lineage_parser.source_tables or []:
database_schema_table = fqn.split_table_name(
str(source_table)
)
database_name = database_schema_table.get("database")
if db_service_entity:
if isinstance(
db_service_entity.connection.config,
BigQueryConnection,
):
database_name = None
database_name = get_database_name_for_lineage(
db_service_entity, database_name
)
schema_name = self.check_database_schema_name(
database_schema_table.get("database_schema")
)
table_name = database_schema_table.get("table")
fqn_search_string = build_es_fqn_search_string(
database_name=database_name,
schema_name=schema_name,
service_name=db_service_name or "*",
table_name=table_name,
)
from_entities = self.metadata.search_in_any_service(
entity_type=Table,
fqn_search_string=fqn_search_string,
fetch_multiple_entities=True,
)
for table_entity in from_entities:
yield self._get_add_lineage_request(
to_entity=data_model_entity,
from_entity=table_entity,
sql=query,
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(
f"Error processing custom SQL query lineage: {err}"
)
except Exception as err:
yield Either(
left=StackTraceError(

View File

@ -84,15 +84,33 @@ def transform_tags(raw: Union[Dict[str, Any], List[TableauTag]]) -> List[Tableau
return tags
class TableauWorkbook(BaseModel):
"""
Model for downstream workbook information
"""
luid: Optional[str] = None
name: Optional[str] = None
class CustomSQLTable(TableauBaseModel):
"""
GraphQL API CustomSQLTable schema
https://help.tableau.com/current/api/metadata_api/en-us/reference/customsqltable.doc.html
"""
downstreamWorkbooks: Optional[List[TableauWorkbook]] = None
query: Optional[str] = None
class CustomSQLTablesResponse(BaseModel):
"""
Model for the custom SQL tables response
"""
data: Dict[str, List[CustomSQLTable]]
class UpstreamColumn(BaseModel):
id: str
name: Optional[str] = None
@ -173,6 +191,7 @@ class TableauDashboard(TableauBaseModel):
webpageUrl: Optional[str] = None
charts: Optional[List[TableauChart]] = None
dataModels: List[DataSource] = []
custom_sql_queries: Optional[List[str]] = None
class TableAndQuery(BaseModel):

View File

@ -77,3 +77,17 @@ workbooks(filter:{{luid: "{workbook_id}"}}){{
}}
}}
"""
TALEAU_GET_CUSTOM_SQL_QUERY = """
{
customSQLTables {
name
id
downstreamWorkbooks {
luid
name
}
query
}
}
"""