diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index 553b3d78c18..df8fbc87a79 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -331,7 +331,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): except Exception as err: logger.debug(traceback.format_exc()) logger.error( - f"Error to yield dashboard lineage details for data model name [{datamodel.name}]: {err}" + f"Error to yield dashboard lineage details for data model name [{str(datamodel)}]: {err}" ) def get_db_service_names(self) -> List[str]: diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py index 85d03e14b1c..838eda51d19 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py @@ -23,6 +23,7 @@ from metadata.generated.schema.api.data.createDashboardDataModel import ( ) from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.chart import Chart +from metadata.generated.schema.entity.data.dashboard import Dashboard from metadata.generated.schema.entity.data.dashboardDataModel import ( DashboardDataModel, DataModelType, @@ -57,7 +58,9 @@ from metadata.generated.schema.type.entityLineage import ColumnLineage from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException -from metadata.ingestion.lineage.sql_lineage import get_column_fqn +from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper +from metadata.ingestion.lineage.parser import LineageParser +from metadata.ingestion.lineage.sql_lineage import get_column_fqn, search_table_entities from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource @@ -182,40 +185,62 @@ class TableauSource(DashboardServiceSource): ) return None + def _create_datamodel_request( + self, + data_model: DataSource, + dashboard_details: TableauDashboard, + data_model_type: DataModelType = DataModelType.TableauDataModel, + ) -> Iterable[Either[CreateDashboardDataModelRequest]]: + """ + Method to prepare the CreateDashboardDataModelRequest + """ + data_model_name = data_model.name if data_model.name else data_model.id + if filter_by_datamodel( + self.source_config.dataModelFilterPattern, data_model_name + ): + self.status.filter(data_model_name, "Data model filtered out.") + return + try: + data_model_request = CreateDashboardDataModelRequest( + name=EntityName(data_model.id), + displayName=data_model_name, + service=FullyQualifiedEntityName(self.context.get().dashboard_service), + dataModelType=data_model_type.value, + serviceType=DashboardServiceType.Tableau.value, + columns=self.get_column_info(data_model), + sql=self._get_datamodel_sql_query(data_model=data_model), + owner=self.get_owner_ref(dashboard_details=dashboard_details), + ) + yield Either(right=data_model_request) + self.register_record_datamodel(datamodel_request=data_model_request) + + except Exception as exc: + yield Either( + left=StackTraceError( + name=data_model_name, + error=f"Error yielding Data Model [{data_model_name}]: {exc}", + stackTrace=traceback.format_exc(), + ) + ) + def yield_datamodel( self, dashboard_details: TableauDashboard ) -> Iterable[Either[CreateDashboardDataModelRequest]]: + """ + Method to ingest the Datasources(Published and Embedded) as DataModels from tableau + """ if self.source_config.includeDataModels: for data_model in dashboard_details.dataModels or []: - data_model_name = data_model.name if data_model.name else data_model.id - if filter_by_datamodel( - self.source_config.dataModelFilterPattern, data_model_name - ): - self.status.filter(data_model_name, "Data model filtered out.") - continue - try: - data_model_request = CreateDashboardDataModelRequest( - name=EntityName(data_model.id), - displayName=data_model_name, - service=FullyQualifiedEntityName( - self.context.get().dashboard_service - ), - dataModelType=DataModelType.TableauDataModel.value, - serviceType=DashboardServiceType.Tableau.value, - columns=self.get_column_info(data_model), - sql=self._get_datamodel_sql_query(data_model=data_model), - owner=self.get_owner_ref(dashboard_details=dashboard_details), - ) - yield Either(right=data_model_request) - self.register_record_datamodel(datamodel_request=data_model_request) - - except Exception as exc: - yield Either( - left=StackTraceError( - name=data_model_name, - error=f"Error yielding Data Model [{data_model_name}]: {exc}", - stackTrace=traceback.format_exc(), - ) + yield from self._create_datamodel_request( + data_model=data_model, + dashboard_details=dashboard_details, + data_model_type=DataModelType.TableauEmbeddedDatasource, + ) + for upstream_data_model in data_model.upstreamDatasources or []: + yield from self._create_datamodel_request( + data_model=upstream_data_model, + dashboard_details=dashboard_details, + data_model_type=DataModelType.TableauPublishedDatasource, ) def yield_dashboard( @@ -300,7 +325,7 @@ class TableauSource(DashboardServiceSource): return child_column.fullyQualifiedName.root return None - def _get_column_lineage( + def _get_column_lineage( # pylint: disable=arguments-differ self, upstream_table: UpstreamTable, table_entity: Table, @@ -310,8 +335,8 @@ class TableauSource(DashboardServiceSource): """ Get the column lineage from the fields """ + column_lineage = [] try: - column_lineage = [] for column in upstream_table.columns or []: if column.id in upstream_col_set: from_column = get_column_fqn( @@ -324,61 +349,256 @@ class TableauSource(DashboardServiceSource): column_lineage.append( ColumnLineage(fromColumns=[from_column], toColumn=to_column) ) - return column_lineage except Exception as exc: logger.debug(f"Error to get column lineage: {exc}") logger.debug(traceback.format_exc()) + return column_lineage or None + + def yield_datamodel_dashboard_lineage( + self, + ) -> Iterable[Either[AddLineageRequest]]: + """ + Returns: + Lineage request between Data Models and Dashboards + """ + if hasattr(self.context.get(), "dataModels") and self.context.get().dataModels: + for datamodel in self.context.get().dataModels: + try: + datamodel_fqn = fqn.build( + metadata=self.metadata, + entity_type=DashboardDataModel, + service_name=self.context.get().dashboard_service, + data_model_name=datamodel, + ) + datamodel_entity = self.metadata.get_by_name( + entity=DashboardDataModel, fqn=datamodel_fqn + ) + + # TableauPublishedDatasource will be skipped here and their lineage will be processed later + if ( + datamodel_entity.dataModelType + == DataModelType.TableauPublishedDatasource + ): + continue + + dashboard_fqn = fqn.build( + self.metadata, + entity_type=Dashboard, + service_name=self.context.get().dashboard_service, + dashboard_name=self.context.get().dashboard, + ) + dashboard_entity = self.metadata.get_by_name( + entity=Dashboard, fqn=dashboard_fqn + ) + yield self._get_add_lineage_request( + to_entity=dashboard_entity, from_entity=datamodel_entity + ) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error( + f"Error to yield dashboard lineage details for data model name [{str(datamodel)}]: {err}" + ) + + def _get_table_datamodel_lineage( + self, + upstream_data_model: DataSource, + datamodel: DataSource, + db_service_entity: DatabaseService, + upstream_data_model_entity: DashboardDataModel, + ) -> Iterable[Either[AddLineageRequest]]: + """ + Method to create the lineage between table and datamodels in tableau + """ + try: + upstream_col_set = { + column.id + for field in upstream_data_model.fields + for column in field.upstreamColumns + } + for table in datamodel.upstreamTables or []: + om_tables = self._get_database_tables(db_service_entity, table) + for om_table in om_tables or []: + column_lineage = self._get_column_lineage( + table, om_table, upstream_data_model_entity, upstream_col_set + ) + yield self._get_add_lineage_request( + to_entity=upstream_data_model_entity, + from_entity=om_table, + column_lineage=column_lineage, + ) + except Exception as err: + yield Either( + left=StackTraceError( + name="Lineage", + error=( + "Error to yield table datamodel lineage details for data model " + f"name [{str(datamodel)}]: {err}" + ), + stackTrace=traceback.format_exc(), + ) + ) + + def _get_datamodel_child_col_lineage( + self, + data_model_col: Column, + upstream_data_model_col: Column, + ) -> Optional[List[ColumnLineage]]: + """ + Get the lineage between children columns of the datamodels + """ + datamodel_child_column_lineage = [] + try: + for datamodel_child_col in data_model_col.children or []: + for upstream_data_model_child_col in ( + upstream_data_model_col.children or [] + ): + if ( + datamodel_child_col.displayName + == upstream_data_model_child_col.displayName + ): + from_child_column = ( + upstream_data_model_child_col.fullyQualifiedName.root + ) + to_child_column = datamodel_child_col.fullyQualifiedName.root + datamodel_child_column_lineage.append( + ColumnLineage( + fromColumns=[from_child_column], + toColumn=to_child_column, + ) + ) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Error to get datamodel child column lineage: {exc}") + return datamodel_child_column_lineage or None + + def _get_datamodel_col_lineage( + self, + data_model_entity: DashboardDataModel, + upstream_data_model_entity: DashboardDataModel, + ): + """ + Method to get the ColumnLineage list for the datamodels lineage + """ + datamodel_column_lineage = [] + try: + for data_model_col in data_model_entity.columns or []: + for upstream_data_model_col in upstream_data_model_entity.columns or []: + if ( + data_model_col.displayName + == upstream_data_model_col.displayName + ): + from_column = upstream_data_model_col.fullyQualifiedName.root + to_column = data_model_col.fullyQualifiedName.root + datamodel_column_lineage.append( + ColumnLineage(fromColumns=[from_column], toColumn=to_column) + ) + datamodel_child_col_lineage = ( + self._get_datamodel_child_col_lineage( + data_model_col=data_model_col, + upstream_data_model_col=upstream_data_model_col, + ) + ) + if datamodel_child_col_lineage: + datamodel_column_lineage.extend(datamodel_child_col_lineage) + + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Error to get datamodel column lineage: {exc}") + + return datamodel_column_lineage or None + + def _get_datamodel_table_lineage( + self, + datamodel: DataSource, + data_model_entity: DashboardDataModel, + db_service_entity: DatabaseService, + ) -> Iterable[Either[AddLineageRequest]]: + """ " + Method to create lineage between tables<->published datasource<->embedded datasource + """ + for upstream_data_model in datamodel.upstreamDatasources or []: + try: + upstream_data_model_entity = self._get_datamodel( + datamodel=upstream_data_model + ) + if upstream_data_model_entity: + # Create [Published Datasource<->Embedded Datasource] lineage + yield self._get_add_lineage_request( + to_entity=data_model_entity, + from_entity=upstream_data_model_entity, + column_lineage=self._get_datamodel_col_lineage( + data_model_entity=data_model_entity, + upstream_data_model_entity=upstream_data_model_entity, + ), + ) + # Create [Table<->Published Datasource] lineage + yield from self._get_table_datamodel_lineage( + upstream_data_model=upstream_data_model, + datamodel=datamodel, + db_service_entity=db_service_entity, + upstream_data_model_entity=upstream_data_model_entity, + ) + except Exception as err: + yield Either( + left=StackTraceError( + name="Lineage", + error=( + "Error to yield datamodel table lineage details for DB " + f"service name [{db_service_entity.name}]: {err}" + ), + stackTrace=traceback.format_exc(), + ) + ) def yield_dashboard_lineage_details( self, dashboard_details: TableauDashboard, db_service_name: str ) -> Iterable[Either[AddLineageRequest]]: """ - In Tableau, we get the lineage between data models and data sources. - - We build a DatabaseTable set from the sheets (data models) columns, and create a lineage request with an OM - table if we can find it. + This method creates the lineage between tables and datamodels Args: dashboard_details: Tableau Dashboard db_service_name: database service where look up for lineage Returns: - Lineage request between Data Models and Database table + Lineage request between Data Models and Database tables """ db_service_entity = self.metadata.get_by_name( entity=DatabaseService, fqn=db_service_name ) - for datamodel in dashboard_details.dataModels or []: - try: - data_model_entity = self._get_datamodel(datamodel=datamodel) - upstream_col_set = { - column.id - for field in datamodel.fields - for column in field.upstreamColumns - } - if data_model_entity: - for table in datamodel.upstreamTables or []: - om_table = self._get_database_table(db_service_entity, table) - if om_table: - column_lineage = self._get_column_lineage( - table, om_table, data_model_entity, upstream_col_set + if db_service_entity: + for datamodel in dashboard_details.dataModels or []: + try: + data_model_entity = self._get_datamodel(datamodel=datamodel) + if data_model_entity: + if datamodel.upstreamDatasources: + # if we have upstreamDatasources(Published Datasources), create lineage in below format + # Table<->Published Datasource<->Embedded Datasource + yield from self._get_datamodel_table_lineage( + datamodel=datamodel, + data_model_entity=data_model_entity, + db_service_entity=db_service_entity, ) - yield self._get_add_lineage_request( - to_entity=data_model_entity, - from_entity=om_table, - column_lineage=column_lineage, + else: + # else we'll create lineage only using Embedded Datasources in below format + # Table<->Embedded Datasource + yield from self._get_table_datamodel_lineage( + upstream_data_model=datamodel, + datamodel=datamodel, + db_service_entity=db_service_entity, + upstream_data_model_entity=data_model_entity, ) - except Exception as err: - yield Either( - left=StackTraceError( - name="Lineage", - error=( - "Error to yield dashboard lineage details for DB " - f"service name [{db_service_name}]: {err}" - ), - stackTrace=traceback.format_exc(), + except Exception as err: + yield Either( + left=StackTraceError( + name="Lineage", + error=( + "Error to yield dashboard lineage details for DB " + f"service name [{db_service_name}]: {err}" + ), + stackTrace=traceback.format_exc(), + ) ) - ) def yield_dashboard_chart( self, dashboard_details: TableauDashboard @@ -439,14 +659,13 @@ class TableauSource(DashboardServiceSource): except ConnectionError as err: logger.debug(f"Error closing connection - {err}") - def _get_database_table( + def _get_table_entities_from_api( self, db_service_entity: DatabaseService, table: UpstreamTable - ) -> Optional[Table]: + ) -> Optional[List[Table]]: """ - Get the table entity for lineage + In case we get the table details from the Graphql APIs we process them """ - # table.name in tableau can come as db.schema.table_name. Hence the logic to split it - if table.name: + try: database_schema_table = fqn.split_table_name(table.name) database_name = ( table.database.name @@ -473,10 +692,78 @@ class TableauSource(DashboardServiceSource): database_name=database_name, ) if table_fqn: - return self.metadata.get_by_name( + table_entity = self.metadata.get_by_name( entity=Table, fqn=table_fqn, ) + if table_entity: + return [table_entity] + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Error to get tables for lineage using GraphQL Apis: {exc}") + return None + + def _get_table_entities_from_query( + self, db_service_entity: DatabaseService, table: UpstreamTable + ) -> Optional[List[Table]]: + """ + In case we get the table details from the Graphql APIs we process them + """ + tables_list = [] + try: + for custom_sql_table in table.referencedByQueries or []: + lineage_parser = LineageParser( + custom_sql_table.query, + ConnectionTypeDialectMapper.dialect_of( + db_service_entity.serviceType.value + ) + if db_service_entity + else None, + ) + for source_table in lineage_parser.source_tables or []: + database_schema_table = fqn.split_table_name(str(source_table)) + database_name = database_schema_table.get("database") + if isinstance( + db_service_entity.connection.config, BigQueryConnection + ): + database_name = None + database_name = get_database_name_for_lineage( + db_service_entity, database_name + ) + schema_name = self.check_database_schema_name( + database_schema_table.get("database_schema") + ) + table_name = database_schema_table.get("table") + from_entities = search_table_entities( + metadata=self.metadata, + database=database_name, + service_name=db_service_entity.fullyQualifiedName.root, + database_schema=schema_name, + table=table_name, + ) + tables_list.extend(from_entities) + + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Error to get tables for lineage using SQL Queries: {exc}") + return tables_list or [] + + def _get_database_tables( + self, db_service_entity: DatabaseService, table: UpstreamTable + ) -> Optional[List[Table]]: + """ + Get the table entities for lineage + """ + # If we get the table details from the Graphql APIs we process them directly + if table.name: + return self._get_table_entities_from_api( + db_service_entity=db_service_entity, table=table + ) + # Else we get the table details from the SQL queries and process them using SQL lineage parser + if table.referencedByQueries: + return self._get_table_entities_from_query( + db_service_entity=db_service_entity, table=table + ) return None def _get_datamodel(self, datamodel: DataSource) -> Optional[DashboardDataModel]: diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau/models.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau/models.py index 81d99ca9f1a..083f461bd73 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau/models.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau/models.py @@ -123,6 +123,7 @@ class DataSource(BaseModel): name: Optional[str] = None fields: Optional[List[DatasourceField]] = None upstreamTables: Optional[List[UpstreamTable]] = None + upstreamDatasources: Optional[List["DataSource"]] = None class TableauDatasources(BaseModel): diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau/queries.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau/queries.py index 0f18924b20b..dab4c4c9c64 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau/queries.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau/queries.py @@ -23,6 +23,20 @@ workbooks(filter:{{luid: "{workbook_id}"}}){{ nodes {{ id name + upstreamDatasources{{ + id + name + fields {{ + id + name + upstreamColumns{{ + id + name + remoteType + }} + description + }} + }} fields {{ id name diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json index 390e429ad5d..66f97ae5d20 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json @@ -15,6 +15,8 @@ "$comment": "Data Model types supported.", "enum": [ "TableauDataModel", + "TableauPublishedDatasource", + "TableauEmbeddedDatasource", "SupersetDataModel", "MetabaseDataModel", "LookMlView",