Minor: Fix Tableau Lineage in Multi Schema Model (#21965)

This commit is contained in:
Mayur Singal 2025-06-25 23:43:06 +05:30 committed by ulixius9
parent 47f570ea1b
commit 05c90802f7

View File

@ -11,6 +11,7 @@
""" """
Tableau source module Tableau source module
""" """
# pylint: disable=too-many-lines
import traceback import traceback
from datetime import datetime from datetime import datetime
from typing import Any, Iterable, List, Optional, Set from typing import Any, Iterable, List, Optional, Set
@ -62,7 +63,10 @@ from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect
from metadata.ingestion.lineage.parser import LineageParser from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import get_column_fqn from metadata.ingestion.lineage.sql_lineage import (
get_column_fqn,
get_table_fqn_from_query_name,
)
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import ( from metadata.ingestion.source.dashboard.dashboard_service import (
@ -157,13 +161,13 @@ class TableauSource(DashboardServiceSource):
return None return None
@staticmethod @staticmethod
def _get_data_models_tags(dataModels: List[DataSource]) -> Set[str]: def _get_data_models_tags(data_models: List[DataSource]) -> Set[str]:
""" """
Get the tags from the data model in the upstreamDatasources Get the tags from the data model in the upstreamDatasources
""" """
tags = set() tags = set()
try: try:
for data_model in dataModels: for data_model in data_models:
# tags seems to be available for upstreamDatasources only, not for dataModels # tags seems to be available for upstreamDatasources only, not for dataModels
for upstream_source in data_model.upstreamDatasources or []: for upstream_source in data_model.upstreamDatasources or []:
for tag in upstream_source.tags: for tag in upstream_source.tags:
@ -191,7 +195,7 @@ class TableauSource(DashboardServiceSource):
_all_tags = tags.union(_data_models_tags) _all_tags = tags.union(_data_models_tags)
yield from get_ometa_tag_and_classification( yield from get_ometa_tag_and_classification(
tags=[tag for tag in _all_tags], tags=list(_all_tags),
classification_name=TABLEAU_TAG_CATEGORY, classification_name=TABLEAU_TAG_CATEGORY,
tag_description="Tableau Tag", tag_description="Tableau Tag",
classification_description="Tags associated with tableau entities", classification_description="Tags associated with tableau entities",
@ -371,6 +375,7 @@ class TableauSource(DashboardServiceSource):
columns.append(child_column.fullyQualifiedName.root) columns.append(child_column.fullyQualifiedName.root)
return columns return columns
# pylint: disable=arguments-differ
def _get_column_lineage( def _get_column_lineage(
self, self,
upstream_table: UpstreamTable, upstream_table: UpstreamTable,
@ -563,6 +568,7 @@ class TableauSource(DashboardServiceSource):
return datamodel_column_lineage or None return datamodel_column_lineage or None
# pylint: disable=too-many-locals,too-many-nested-blocks
def _get_datamodel_table_lineage( def _get_datamodel_table_lineage(
self, self,
datamodel: DataSource, datamodel: DataSource,
@ -645,7 +651,8 @@ class TableauSource(DashboardServiceSource):
) )
if not from_entities: if not from_entities:
logger.debug( logger.debug(
f"No table entities found for custom SQL lineage. fqn_search_string={fqn_search_string}, table_name={table_name}, query={query}" "No table entities found for custom SQL lineage."
f"fqn_search_string={fqn_search_string}, table_name={table_name}, query={query}"
) )
for table_entity in from_entities: for table_entity in from_entities:
yield self._get_add_lineage_request( yield self._get_add_lineage_request(
@ -818,8 +825,36 @@ class TableauSource(DashboardServiceSource):
) )
if table_entity: if table_entity:
return [TableAndQuery(table=table_entity)] return [TableAndQuery(table=table_entity)]
# fallback to fullName if schema name is not appropriate
# TODO: We should make this a primary approach for lineage instead
# using this strategy as a fallback.
if not table.fullName:
logger.debug(
"No table entity found for lineage using GraphQL APIs."
f"fqn_search_string={fqn_search_string}, table_name={table_name}"
)
return None
# fullname is in the format of [database].[schema].[table] or [database].[table] or [table]
database_query, schema_query, table_query = get_table_fqn_from_query_name(
table.fullName.replace("[", "").replace("]", "")
)
if table_query:
fqn_search_string = build_es_fqn_search_string(
database_name=database_query,
schema_name=schema_query,
service_name=db_service_name or "*",
table_name=table_query,
)
table_entity = self.metadata.search_in_any_service(
entity_type=Table,
fqn_search_string=fqn_search_string,
)
if table_entity:
return [TableAndQuery(table=table_entity)]
logger.debug( logger.debug(
f"No table entity found for lineage using GraphQL APIs. fqn_search_string={fqn_search_string}, table_name={table_name}" "No table entity found for lineage using GraphQL APIs."
f"fqn_search_string={fqn_search_string}, table_name={table_name}"
) )
except Exception as exc: except Exception as exc:
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
@ -876,7 +911,9 @@ class TableauSource(DashboardServiceSource):
) )
if not from_entities: if not from_entities:
logger.debug( logger.debug(
f"No table entities found for lineage using SQL Queries. fqn_search_string={fqn_search_string}, table_name={table_name}, query={custom_sql_table.query}" "No table entities found for lineage using SQL Queries."
f"fqn_search_string={fqn_search_string}, "
f"table_name={table_name}, query={custom_sql_table.query}"
) )
tables_list.extend( tables_list.extend(
[ [