MINOR: Tableau Capture SQL in Lineage (#18925)

This commit is contained in:
Mayur Singal 2024-12-05 09:55:21 +05:30 committed by GitHub
parent 6326ae3cb2
commit 8ac80e6807
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 31 additions and 9 deletions

View File

@ -479,6 +479,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
to_entity: Union[Dashboard, DashboardDataModel], to_entity: Union[Dashboard, DashboardDataModel],
from_entity: Union[Table, DashboardDataModel, Dashboard], from_entity: Union[Table, DashboardDataModel, Dashboard],
column_lineage: List[ColumnLineage] = None, column_lineage: List[ColumnLineage] = None,
sql: Optional[str] = None,
) -> Optional[Either[AddLineageRequest]]: ) -> Optional[Either[AddLineageRequest]]:
if from_entity and to_entity: if from_entity and to_entity:
return Either( return Either(
@ -494,6 +495,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
), ),
lineageDetails=LineageDetails( lineageDetails=LineageDetails(
source=LineageSource.DashboardLineage, source=LineageSource.DashboardLineage,
sqlQuery=sql,
columnsLineage=column_lineage, columnsLineage=column_lineage,
), ),
) )

View File

@ -69,6 +69,7 @@ from metadata.ingestion.source.dashboard.tableau.models import (
ChartUrl, ChartUrl,
DataSource, DataSource,
DatasourceField, DatasourceField,
TableAndQuery,
TableauDashboard, TableauDashboard,
TableauTag, TableauTag,
UpstreamTable, UpstreamTable,
@ -149,7 +150,7 @@ class TableauSource(DashboardServiceSource):
return None return None
@staticmethod @staticmethod
def _get_data_models_tags(dataModels: [DataSource]) -> Set[str]: def _get_data_models_tags(dataModels: List[DataSource]) -> Set[str]:
""" """
Get the tags from the data model in the upstreamDatasources Get the tags from the data model in the upstreamDatasources
""" """
@ -454,14 +455,18 @@ class TableauSource(DashboardServiceSource):
} }
for table in datamodel.upstreamTables or []: for table in datamodel.upstreamTables or []:
om_tables = self._get_database_tables(db_service_entity, table) om_tables = self._get_database_tables(db_service_entity, table)
for om_table in om_tables or []: for om_table_and_query in om_tables or []:
column_lineage = self._get_column_lineage( column_lineage = self._get_column_lineage(
table, om_table, upstream_data_model_entity, upstream_col_set table,
om_table_and_query.table,
upstream_data_model_entity,
upstream_col_set,
) )
yield self._get_add_lineage_request( yield self._get_add_lineage_request(
to_entity=upstream_data_model_entity, to_entity=upstream_data_model_entity,
from_entity=om_table, from_entity=om_table_and_query.table,
column_lineage=column_lineage, column_lineage=column_lineage,
sql=om_table_and_query.query,
) )
except Exception as err: except Exception as err:
yield Either( yield Either(
@ -698,7 +703,7 @@ class TableauSource(DashboardServiceSource):
def _get_table_entities_from_api( def _get_table_entities_from_api(
self, db_service_entity: DatabaseService, table: UpstreamTable self, db_service_entity: DatabaseService, table: UpstreamTable
) -> Optional[List[Table]]: ) -> Optional[List[TableAndQuery]]:
""" """
In case we get the table details from the Graphql APIs we process them In case we get the table details from the Graphql APIs we process them
""" """
@ -734,7 +739,7 @@ class TableauSource(DashboardServiceSource):
fqn=table_fqn, fqn=table_fqn,
) )
if table_entity: if table_entity:
return [table_entity] return [TableAndQuery(table=table_entity)]
except Exception as exc: except Exception as exc:
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
logger.warning(f"Error to get tables for lineage using GraphQL Apis: {exc}") logger.warning(f"Error to get tables for lineage using GraphQL Apis: {exc}")
@ -742,7 +747,7 @@ class TableauSource(DashboardServiceSource):
def _get_table_entities_from_query( def _get_table_entities_from_query(
self, db_service_entity: DatabaseService, table: UpstreamTable self, db_service_entity: DatabaseService, table: UpstreamTable
) -> Optional[List[Table]]: ) -> Optional[List[TableAndQuery]]:
""" """
In case we get the table details from the Graphql APIs we process them In case we get the table details from the Graphql APIs we process them
""" """
@ -778,7 +783,12 @@ class TableauSource(DashboardServiceSource):
database_schema=schema_name, database_schema=schema_name,
table=table_name, table=table_name,
) )
tables_list.extend(from_entities) tables_list.extend(
[
TableAndQuery(table=table, query=custom_sql_table.query)
for table in from_entities
]
)
except Exception as exc: except Exception as exc:
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
@ -787,7 +797,7 @@ class TableauSource(DashboardServiceSource):
def _get_database_tables( def _get_database_tables(
self, db_service_entity: DatabaseService, table: UpstreamTable self, db_service_entity: DatabaseService, table: UpstreamTable
) -> Optional[List[Table]]: ) -> Optional[List[TableAndQuery]]:
""" """
Get the table entities for lineage Get the table entities for lineage
""" """

View File

@ -18,6 +18,7 @@ from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel, ConfigDict, Field, validator from pydantic import BaseModel, ConfigDict, Field, validator
from metadata.generated.schema.entity.data.chart import ChartType from metadata.generated.schema.entity.data.chart import ChartType
from metadata.generated.schema.entity.data.table import Table
class TableauBaseModel(BaseModel): class TableauBaseModel(BaseModel):
@ -172,3 +173,12 @@ class TableauDashboard(TableauBaseModel):
webpageUrl: Optional[str] = None webpageUrl: Optional[str] = None
charts: Optional[List[TableauChart]] = None charts: Optional[List[TableauChart]] = None
dataModels: List[DataSource] = [] dataModels: List[DataSource] = []
class TableAndQuery(BaseModel):
"""
Wrapper class for Table entity and associated Query for lineage
"""
table: Table
query: Optional[str] = None