unify tableau datamodels (#17185)

This commit is contained in:
Onkar Ravgan 2024-07-29 10:32:29 +05:30 committed by GitHub
parent b6745d7cf1
commit c03f47118b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 377 additions and 73 deletions

View File

@ -331,7 +331,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(
f"Error to yield dashboard lineage details for data model name [{datamodel.name}]: {err}"
f"Error to yield dashboard lineage details for data model name [{str(datamodel)}]: {err}"
)
def get_db_service_names(self) -> List[str]:

View File

@ -23,6 +23,7 @@ from metadata.generated.schema.api.data.createDashboardDataModel import (
)
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.dashboardDataModel import (
DashboardDataModel,
DataModelType,
@ -57,7 +58,9 @@ from metadata.generated.schema.type.entityLineage import ColumnLineage
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import get_column_fqn, search_table_entities
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
@ -182,40 +185,62 @@ class TableauSource(DashboardServiceSource):
)
return None
def _create_datamodel_request(
self,
data_model: DataSource,
dashboard_details: TableauDashboard,
data_model_type: DataModelType = DataModelType.TableauDataModel,
) -> Iterable[Either[CreateDashboardDataModelRequest]]:
"""
Method to prepare the CreateDashboardDataModelRequest
"""
data_model_name = data_model.name if data_model.name else data_model.id
if filter_by_datamodel(
self.source_config.dataModelFilterPattern, data_model_name
):
self.status.filter(data_model_name, "Data model filtered out.")
return
try:
data_model_request = CreateDashboardDataModelRequest(
name=EntityName(data_model.id),
displayName=data_model_name,
service=FullyQualifiedEntityName(self.context.get().dashboard_service),
dataModelType=data_model_type.value,
serviceType=DashboardServiceType.Tableau.value,
columns=self.get_column_info(data_model),
sql=self._get_datamodel_sql_query(data_model=data_model),
owner=self.get_owner_ref(dashboard_details=dashboard_details),
)
yield Either(right=data_model_request)
self.register_record_datamodel(datamodel_request=data_model_request)
except Exception as exc:
yield Either(
left=StackTraceError(
name=data_model_name,
error=f"Error yielding Data Model [{data_model_name}]: {exc}",
stackTrace=traceback.format_exc(),
)
)
def yield_datamodel(
self, dashboard_details: TableauDashboard
) -> Iterable[Either[CreateDashboardDataModelRequest]]:
"""
Method to ingest the Datasources(Published and Embedded) as DataModels from tableau
"""
if self.source_config.includeDataModels:
for data_model in dashboard_details.dataModels or []:
data_model_name = data_model.name if data_model.name else data_model.id
if filter_by_datamodel(
self.source_config.dataModelFilterPattern, data_model_name
):
self.status.filter(data_model_name, "Data model filtered out.")
continue
try:
data_model_request = CreateDashboardDataModelRequest(
name=EntityName(data_model.id),
displayName=data_model_name,
service=FullyQualifiedEntityName(
self.context.get().dashboard_service
),
dataModelType=DataModelType.TableauDataModel.value,
serviceType=DashboardServiceType.Tableau.value,
columns=self.get_column_info(data_model),
sql=self._get_datamodel_sql_query(data_model=data_model),
owner=self.get_owner_ref(dashboard_details=dashboard_details),
)
yield Either(right=data_model_request)
self.register_record_datamodel(datamodel_request=data_model_request)
except Exception as exc:
yield Either(
left=StackTraceError(
name=data_model_name,
error=f"Error yielding Data Model [{data_model_name}]: {exc}",
stackTrace=traceback.format_exc(),
)
yield from self._create_datamodel_request(
data_model=data_model,
dashboard_details=dashboard_details,
data_model_type=DataModelType.TableauEmbeddedDatasource,
)
for upstream_data_model in data_model.upstreamDatasources or []:
yield from self._create_datamodel_request(
data_model=upstream_data_model,
dashboard_details=dashboard_details,
data_model_type=DataModelType.TableauPublishedDatasource,
)
def yield_dashboard(
@ -300,7 +325,7 @@ class TableauSource(DashboardServiceSource):
return child_column.fullyQualifiedName.root
return None
def _get_column_lineage(
def _get_column_lineage( # pylint: disable=arguments-differ
self,
upstream_table: UpstreamTable,
table_entity: Table,
@ -310,8 +335,8 @@ class TableauSource(DashboardServiceSource):
"""
Get the column lineage from the fields
"""
column_lineage = []
try:
column_lineage = []
for column in upstream_table.columns or []:
if column.id in upstream_col_set:
from_column = get_column_fqn(
@ -324,61 +349,256 @@ class TableauSource(DashboardServiceSource):
column_lineage.append(
ColumnLineage(fromColumns=[from_column], toColumn=to_column)
)
return column_lineage
except Exception as exc:
logger.debug(f"Error to get column lineage: {exc}")
logger.debug(traceback.format_exc())
return column_lineage or None
def yield_datamodel_dashboard_lineage(
self,
) -> Iterable[Either[AddLineageRequest]]:
"""
Returns:
Lineage request between Data Models and Dashboards
"""
if hasattr(self.context.get(), "dataModels") and self.context.get().dataModels:
for datamodel in self.context.get().dataModels:
try:
datamodel_fqn = fqn.build(
metadata=self.metadata,
entity_type=DashboardDataModel,
service_name=self.context.get().dashboard_service,
data_model_name=datamodel,
)
datamodel_entity = self.metadata.get_by_name(
entity=DashboardDataModel, fqn=datamodel_fqn
)
# TableauPublishedDatasource will be skipped here and their lineage will be processed later
if (
datamodel_entity.dataModelType
== DataModelType.TableauPublishedDatasource
):
continue
dashboard_fqn = fqn.build(
self.metadata,
entity_type=Dashboard,
service_name=self.context.get().dashboard_service,
dashboard_name=self.context.get().dashboard,
)
dashboard_entity = self.metadata.get_by_name(
entity=Dashboard, fqn=dashboard_fqn
)
yield self._get_add_lineage_request(
to_entity=dashboard_entity, from_entity=datamodel_entity
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(
f"Error to yield dashboard lineage details for data model name [{str(datamodel)}]: {err}"
)
def _get_table_datamodel_lineage(
self,
upstream_data_model: DataSource,
datamodel: DataSource,
db_service_entity: DatabaseService,
upstream_data_model_entity: DashboardDataModel,
) -> Iterable[Either[AddLineageRequest]]:
"""
Method to create the lineage between table and datamodels in tableau
"""
try:
upstream_col_set = {
column.id
for field in upstream_data_model.fields
for column in field.upstreamColumns
}
for table in datamodel.upstreamTables or []:
om_tables = self._get_database_tables(db_service_entity, table)
for om_table in om_tables or []:
column_lineage = self._get_column_lineage(
table, om_table, upstream_data_model_entity, upstream_col_set
)
yield self._get_add_lineage_request(
to_entity=upstream_data_model_entity,
from_entity=om_table,
column_lineage=column_lineage,
)
except Exception as err:
yield Either(
left=StackTraceError(
name="Lineage",
error=(
"Error to yield table datamodel lineage details for data model "
f"name [{str(datamodel)}]: {err}"
),
stackTrace=traceback.format_exc(),
)
)
def _get_datamodel_child_col_lineage(
self,
data_model_col: Column,
upstream_data_model_col: Column,
) -> Optional[List[ColumnLineage]]:
"""
Get the lineage between children columns of the datamodels
"""
datamodel_child_column_lineage = []
try:
for datamodel_child_col in data_model_col.children or []:
for upstream_data_model_child_col in (
upstream_data_model_col.children or []
):
if (
datamodel_child_col.displayName
== upstream_data_model_child_col.displayName
):
from_child_column = (
upstream_data_model_child_col.fullyQualifiedName.root
)
to_child_column = datamodel_child_col.fullyQualifiedName.root
datamodel_child_column_lineage.append(
ColumnLineage(
fromColumns=[from_child_column],
toColumn=to_child_column,
)
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error to get datamodel child column lineage: {exc}")
return datamodel_child_column_lineage or None
def _get_datamodel_col_lineage(
self,
data_model_entity: DashboardDataModel,
upstream_data_model_entity: DashboardDataModel,
):
"""
Method to get the ColumnLineage list for the datamodels lineage
"""
datamodel_column_lineage = []
try:
for data_model_col in data_model_entity.columns or []:
for upstream_data_model_col in upstream_data_model_entity.columns or []:
if (
data_model_col.displayName
== upstream_data_model_col.displayName
):
from_column = upstream_data_model_col.fullyQualifiedName.root
to_column = data_model_col.fullyQualifiedName.root
datamodel_column_lineage.append(
ColumnLineage(fromColumns=[from_column], toColumn=to_column)
)
datamodel_child_col_lineage = (
self._get_datamodel_child_col_lineage(
data_model_col=data_model_col,
upstream_data_model_col=upstream_data_model_col,
)
)
if datamodel_child_col_lineage:
datamodel_column_lineage.extend(datamodel_child_col_lineage)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error to get datamodel column lineage: {exc}")
return datamodel_column_lineage or None
def _get_datamodel_table_lineage(
self,
datamodel: DataSource,
data_model_entity: DashboardDataModel,
db_service_entity: DatabaseService,
) -> Iterable[Either[AddLineageRequest]]:
""" "
Method to create lineage between tables<->published datasource<->embedded datasource
"""
for upstream_data_model in datamodel.upstreamDatasources or []:
try:
upstream_data_model_entity = self._get_datamodel(
datamodel=upstream_data_model
)
if upstream_data_model_entity:
# Create [Published Datasource<->Embedded Datasource] lineage
yield self._get_add_lineage_request(
to_entity=data_model_entity,
from_entity=upstream_data_model_entity,
column_lineage=self._get_datamodel_col_lineage(
data_model_entity=data_model_entity,
upstream_data_model_entity=upstream_data_model_entity,
),
)
# Create [Table<->Published Datasource] lineage
yield from self._get_table_datamodel_lineage(
upstream_data_model=upstream_data_model,
datamodel=datamodel,
db_service_entity=db_service_entity,
upstream_data_model_entity=upstream_data_model_entity,
)
except Exception as err:
yield Either(
left=StackTraceError(
name="Lineage",
error=(
"Error to yield datamodel table lineage details for DB "
f"service name [{db_service_entity.name}]: {err}"
),
stackTrace=traceback.format_exc(),
)
)
def yield_dashboard_lineage_details(
self, dashboard_details: TableauDashboard, db_service_name: str
) -> Iterable[Either[AddLineageRequest]]:
"""
In Tableau, we get the lineage between data models and data sources.
We build a DatabaseTable set from the sheets (data models) columns, and create a lineage request with an OM
table if we can find it.
This method creates the lineage between tables and datamodels
Args:
dashboard_details: Tableau Dashboard
db_service_name: database service where look up for lineage
Returns:
Lineage request between Data Models and Database table
Lineage request between Data Models and Database tables
"""
db_service_entity = self.metadata.get_by_name(
entity=DatabaseService, fqn=db_service_name
)
for datamodel in dashboard_details.dataModels or []:
try:
data_model_entity = self._get_datamodel(datamodel=datamodel)
upstream_col_set = {
column.id
for field in datamodel.fields
for column in field.upstreamColumns
}
if data_model_entity:
for table in datamodel.upstreamTables or []:
om_table = self._get_database_table(db_service_entity, table)
if om_table:
column_lineage = self._get_column_lineage(
table, om_table, data_model_entity, upstream_col_set
if db_service_entity:
for datamodel in dashboard_details.dataModels or []:
try:
data_model_entity = self._get_datamodel(datamodel=datamodel)
if data_model_entity:
if datamodel.upstreamDatasources:
# if we have upstreamDatasources(Published Datasources), create lineage in below format
# Table<->Published Datasource<->Embedded Datasource
yield from self._get_datamodel_table_lineage(
datamodel=datamodel,
data_model_entity=data_model_entity,
db_service_entity=db_service_entity,
)
yield self._get_add_lineage_request(
to_entity=data_model_entity,
from_entity=om_table,
column_lineage=column_lineage,
else:
# else we'll create lineage only using Embedded Datasources in below format
# Table<->Embedded Datasource
yield from self._get_table_datamodel_lineage(
upstream_data_model=datamodel,
datamodel=datamodel,
db_service_entity=db_service_entity,
upstream_data_model_entity=data_model_entity,
)
except Exception as err:
yield Either(
left=StackTraceError(
name="Lineage",
error=(
"Error to yield dashboard lineage details for DB "
f"service name [{db_service_name}]: {err}"
),
stackTrace=traceback.format_exc(),
except Exception as err:
yield Either(
left=StackTraceError(
name="Lineage",
error=(
"Error to yield dashboard lineage details for DB "
f"service name [{db_service_name}]: {err}"
),
stackTrace=traceback.format_exc(),
)
)
)
def yield_dashboard_chart(
self, dashboard_details: TableauDashboard
@ -439,14 +659,13 @@ class TableauSource(DashboardServiceSource):
except ConnectionError as err:
logger.debug(f"Error closing connection - {err}")
def _get_database_table(
def _get_table_entities_from_api(
self, db_service_entity: DatabaseService, table: UpstreamTable
) -> Optional[Table]:
) -> Optional[List[Table]]:
"""
Get the table entity for lineage
In case we get the table details from the Graphql APIs we process them
"""
# table.name in tableau can come as db.schema.table_name. Hence the logic to split it
if table.name:
try:
database_schema_table = fqn.split_table_name(table.name)
database_name = (
table.database.name
@ -473,10 +692,78 @@ class TableauSource(DashboardServiceSource):
database_name=database_name,
)
if table_fqn:
return self.metadata.get_by_name(
table_entity = self.metadata.get_by_name(
entity=Table,
fqn=table_fqn,
)
if table_entity:
return [table_entity]
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error to get tables for lineage using GraphQL Apis: {exc}")
return None
def _get_table_entities_from_query(
self, db_service_entity: DatabaseService, table: UpstreamTable
) -> Optional[List[Table]]:
"""
In case we get the table details from the Graphql APIs we process them
"""
tables_list = []
try:
for custom_sql_table in table.referencedByQueries or []:
lineage_parser = LineageParser(
custom_sql_table.query,
ConnectionTypeDialectMapper.dialect_of(
db_service_entity.serviceType.value
)
if db_service_entity
else None,
)
for source_table in lineage_parser.source_tables or []:
database_schema_table = fqn.split_table_name(str(source_table))
database_name = database_schema_table.get("database")
if isinstance(
db_service_entity.connection.config, BigQueryConnection
):
database_name = None
database_name = get_database_name_for_lineage(
db_service_entity, database_name
)
schema_name = self.check_database_schema_name(
database_schema_table.get("database_schema")
)
table_name = database_schema_table.get("table")
from_entities = search_table_entities(
metadata=self.metadata,
database=database_name,
service_name=db_service_entity.fullyQualifiedName.root,
database_schema=schema_name,
table=table_name,
)
tables_list.extend(from_entities)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error to get tables for lineage using SQL Queries: {exc}")
return tables_list or []
def _get_database_tables(
self, db_service_entity: DatabaseService, table: UpstreamTable
) -> Optional[List[Table]]:
"""
Get the table entities for lineage
"""
# If we get the table details from the Graphql APIs we process them directly
if table.name:
return self._get_table_entities_from_api(
db_service_entity=db_service_entity, table=table
)
# Else we get the table details from the SQL queries and process them using SQL lineage parser
if table.referencedByQueries:
return self._get_table_entities_from_query(
db_service_entity=db_service_entity, table=table
)
return None
def _get_datamodel(self, datamodel: DataSource) -> Optional[DashboardDataModel]:

View File

@ -123,6 +123,7 @@ class DataSource(BaseModel):
name: Optional[str] = None
fields: Optional[List[DatasourceField]] = None
upstreamTables: Optional[List[UpstreamTable]] = None
upstreamDatasources: Optional[List["DataSource"]] = None
class TableauDatasources(BaseModel):

View File

@ -23,6 +23,20 @@ workbooks(filter:{{luid: "{workbook_id}"}}){{
nodes {{
id
name
upstreamDatasources{{
id
name
fields {{
id
name
upstreamColumns{{
id
name
remoteType
}}
description
}}
}}
fields {{
id
name

View File

@ -15,6 +15,8 @@
"$comment": "Data Model types supported.",
"enum": [
"TableauDataModel",
"TableauPublishedDatasource",
"TableauEmbeddedDatasource",
"SupersetDataModel",
"MetabaseDataModel",
"LookMlView",