From 40a9c678759c17a13c48cfc7cc4ff6a198acd770 Mon Sep 17 00:00:00 2001 From: harshsoni2024 <64592571+harshsoni2024@users.noreply.github.com> Date: Fri, 7 Mar 2025 11:16:58 +0530 Subject: [PATCH] Day 1 - Dashboard service lineage without db_service_name (#19911) --- .../ingestion/ometa/mixins/es_mixin.py | 39 +++- .../source/dashboard/dashboard_service.py | 6 +- .../dashboard/domodashboard/metadata.py | 4 +- .../source/dashboard/lightdash/metadata.py | 2 +- .../source/dashboard/looker/metadata.py | 4 +- .../source/dashboard/metabase/metadata.py | 64 ++++--- .../dashboard/microstrategy/metadata.py | 4 +- .../source/dashboard/mode/metadata.py | 25 ++- .../source/dashboard/powerbi/metadata.py | 27 +-- .../source/dashboard/qlikcloud/metadata.py | 17 +- .../source/dashboard/qliksense/metadata.py | 44 +++-- .../source/dashboard/quicksight/metadata.py | 59 +++--- .../source/dashboard/redash/metadata.py | 24 +-- .../source/dashboard/sigma/metadata.py | 26 ++- .../source/dashboard/superset/api_source.py | 51 +++--- .../source/dashboard/superset/db_source.py | 27 +-- .../source/dashboard/superset/mixin.py | 84 +++++---- .../source/dashboard/tableau/metadata.py | 168 ++++++++++-------- .../unit/topology/dashboard/test_metabase.py | 5 +- .../unit/topology/dashboard/test_superset.py | 31 ++-- 20 files changed, 404 insertions(+), 307 deletions(-) diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py index fbd84b4e92e..bddcafd38f0 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py @@ -16,7 +16,17 @@ To be used by OpenMetadata class import functools import json import traceback -from typing import Generic, Iterable, Iterator, List, Optional, Set, Type, TypeVar +from typing import ( + Generic, + Iterable, + Iterator, + List, + Optional, + Set, + Type, + TypeVar, + Union, +) from urllib.parse import quote_plus from pydantic import Field @@ -29,7 +39,7 @@ from metadata.ingestion.models.custom_pydantic import BaseModel from metadata.ingestion.ometa.client import REST, APIError from metadata.ingestion.ometa.utils import quote from metadata.ingestion.source.models import TableView -from metadata.utils.elasticsearch import ES_INDEX_MAP +from metadata.utils.elasticsearch import ES_INDEX_MAP, get_entity_from_es_result from metadata.utils.logger import ometa_logger logger = ometa_logger() @@ -469,3 +479,28 @@ class ESMixin(Generic[T]): schema_name=schema_name, table_name=table_name, ) + + def search_in_any_service( + self, + entity_type: Type[T], + fqn_search_string: str, + fetch_multiple_entities: bool = False, + ) -> Optional[Union[List[Table], Table]]: + """ + fetch table from es when with/without `db_service_name` + """ + try: + entity_result = get_entity_from_es_result( + entity_list=self.es_search_from_fqn( + entity_type=entity_type, + fqn_search_string=fqn_search_string, + ), + fetch_multiple_entities=fetch_multiple_entities, + ) + return entity_result + except Exception as exc: + logger.debug( + f"Error to fetch entity: fqn={fqn_search_string} from es: {exc}" + ) + logger.debug(traceback.format_exc()) + return None diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index e98881988fa..fcca4a2dc16 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -252,7 +252,9 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): @abstractmethod def yield_dashboard_lineage_details( - self, dashboard_details: Any, db_service_name: str + self, + dashboard_details: Any, + db_service_name: Optional[str] = None, ) -> Iterable[Either[AddLineageRequest]]: """ Get lineage between dashboard and data sources @@ -372,6 +374,8 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): yield lineage db_service_names = self.get_db_service_names() + if not db_service_names: + yield from self.yield_dashboard_lineage_details(dashboard_details) or [] for db_service_name in db_service_names or []: yield from self.yield_dashboard_lineage_details( dashboard_details, db_service_name diff --git a/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/metadata.py index 2b8322b0dce..a21cf1c6e4b 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/metadata.py @@ -248,6 +248,8 @@ class DomodashboardSource(DashboardServiceSource): ) def yield_dashboard_lineage_details( - self, dashboard_details: dict, db_service_name + self, + dashboard_details: dict, + db_service_name: Optional[str] = None, ) -> Iterable[Either[AddLineageRequest]]: """No lineage implemented""" diff --git a/ingestion/src/metadata/ingestion/source/dashboard/lightdash/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/lightdash/metadata.py index ad011368e6d..163d60f637a 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/lightdash/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/lightdash/metadata.py @@ -176,7 +176,7 @@ class LightdashSource(DashboardServiceSource): def yield_dashboard_lineage_details( self, dashboard_details: LightdashDashboard, - db_service_name: Optional[str], + db_service_name: Optional[str] = None, ) -> Optional[Iterable[AddLineageRequest]]: """Get lineage method diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py index 4d736c1f895..c6c5ecb1298 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py @@ -924,7 +924,9 @@ class LookerSource(DashboardServiceSource): ) def yield_dashboard_lineage_details( - self, dashboard_details: LookerDashboard, _: str + self, + dashboard_details: LookerDashboard, + _: Optional[str] = None, ) -> Iterable[Either[AddLineageRequest]]: """ Get lineage between charts and data sources. diff --git a/ingestion/src/metadata/ingestion/source/dashboard/metabase/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/metabase/metadata.py index fc73ce5f7ce..0eaca47977c 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/metabase/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/metabase/metadata.py @@ -20,6 +20,7 @@ from metadata.generated.schema.entity.data.chart import Chart from metadata.generated.schema.entity.data.dashboard import ( Dashboard as LineageDashboard, ) +from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.connections.dashboard.metabaseConnection import ( MetabaseConnection, ) @@ -42,9 +43,8 @@ from metadata.generated.schema.type.basic import ( from metadata.generated.schema.type.entityReferenceList import EntityReferenceList from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException -from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper +from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect from metadata.ingestion.lineage.parser import LineageParser -from metadata.ingestion.lineage.sql_lineage import search_table_entities from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource from metadata.ingestion.source.dashboard.metabase.models import ( @@ -56,6 +56,7 @@ from metadata.ingestion.source.dashboard.metabase.models import ( from metadata.utils import fqn from metadata.utils.constants import DEFAULT_DASHBAORD from metadata.utils.filters import filter_by_chart +from metadata.utils.fqn import build_es_fqn_search_string from metadata.utils.helpers import ( clean_uri, get_standard_chart_type, @@ -285,15 +286,13 @@ class MetabaseSource(DashboardServiceSource): def yield_dashboard_lineage_details( self, dashboard_details: MetabaseDashboardDetails, - db_service_name: Optional[str], + db_service_name: Optional[str] = None, ) -> Iterable[Either[AddLineageRequest]]: """Get lineage method Args: dashboard_details """ - if not db_service_name: - return chart_ids, dashboard_name = ( dashboard_details.card_ids, str(dashboard_details.id), @@ -333,11 +332,17 @@ class MetabaseSource(DashboardServiceSource): ) ) - def _get_database_service(self, db_service_name: str): + def _get_database_service(self, db_service_name: Optional[str]): + if not db_service_name: + return None return self.metadata.get_by_name(DatabaseService, db_service_name) + # pylint: disable=too-many-locals def _yield_lineage_from_query( - self, chart_details: MetabaseChart, db_service_name: str, dashboard_name: str + self, + chart_details: MetabaseChart, + db_service_name: Optional[str], + dashboard_name: str, ) -> Iterable[Either[AddLineageRequest]]: database = self.client.get_database(chart_details.database_id) @@ -360,20 +365,23 @@ class MetabaseSource(DashboardServiceSource): query, ConnectionTypeDialectMapper.dialect_of(db_service.serviceType.value) if db_service - else None, + else Dialect.ANSI, ) for table in lineage_parser.source_tables: database_schema_name, table = fqn.split(str(table))[-2:] database_schema_name = self.check_database_schema_name(database_schema_name) - from_entities = search_table_entities( - metadata=self.metadata, - database=database_name, - service_name=db_service_name, - database_schema=database_schema_name, - table=table, + fqn_search_string = build_es_fqn_search_string( + database_name=database_name, + schema_name=database_schema_name, + service_name=db_service_name or "*", + table_name=table, + ) + from_entities = self.metadata.search_in_any_service( + entity_type=Table, + fqn_search_string=fqn_search_string, + fetch_multiple_entities=True, ) - to_fqn = fqn.build( self.metadata, entity_type=LineageDashboard, @@ -385,13 +393,16 @@ class MetabaseSource(DashboardServiceSource): fqn=to_fqn, ) - for from_entity in from_entities: + for from_entity in from_entities or []: yield self._get_add_lineage_request( to_entity=to_entity, from_entity=from_entity ) def _yield_lineage_from_api( - self, chart_details: MetabaseChart, db_service_name: str, dashboard_name: str + self, + chart_details: MetabaseChart, + db_service_name: Optional[str], + dashboard_name: str, ) -> Iterable[Either[AddLineageRequest]]: table = self.client.get_table(chart_details.table_id) table_name = table.name or table.display_name @@ -400,14 +411,17 @@ class MetabaseSource(DashboardServiceSource): return database_name = table.db.details.db if table.db and table.db.details else None - from_entities = search_table_entities( - metadata=self.metadata, - database=database_name, - service_name=db_service_name, - database_schema=table.table_schema, - table=table_name, + fqn_search_string = build_es_fqn_search_string( + database_name=database_name, + schema_name=table.table_schema, + service_name=db_service_name or "*", + table_name=table_name, + ) + from_entities = self.metadata.search_in_any_service( + entity_type=Table, + fqn_search_string=fqn_search_string, + fetch_multiple_entities=True, ) - to_fqn = fqn.build( self.metadata, entity_type=LineageDashboard, @@ -420,7 +434,7 @@ class MetabaseSource(DashboardServiceSource): fqn=to_fqn, ) - for from_entity in from_entities: + for from_entity in from_entities or []: yield self._get_add_lineage_request( to_entity=to_entity, from_entity=from_entity ) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/microstrategy/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/microstrategy/metadata.py index 1fcd23c1cca..8e871a8a078 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/microstrategy/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/microstrategy/metadata.py @@ -159,7 +159,9 @@ class MicrostrategySource(DashboardServiceSource): ) def yield_dashboard_lineage_details( - self, dashboard_details: MstrDashboardDetails, db_service_name: str + self, + dashboard_details: MstrDashboardDetails, + db_service_name: Optional[str] = None, ) -> Optional[Iterable[AddLineageRequest]]: """Not Implemented""" diff --git a/ingestion/src/metadata/ingestion/source/dashboard/mode/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/mode/metadata.py index d02329a0220..e793bf465fe 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/mode/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/mode/metadata.py @@ -20,6 +20,7 @@ from metadata.generated.schema.entity.data.chart import Chart, ChartType from metadata.generated.schema.entity.data.dashboard import ( Dashboard as Lineage_Dashboard, ) +from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.connections.dashboard.modeConnection import ( ModeConnection, ) @@ -38,12 +39,12 @@ from metadata.generated.schema.type.basic import ( from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.lineage.parser import LineageParser -from metadata.ingestion.lineage.sql_lineage import search_table_entities from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource from metadata.ingestion.source.dashboard.mode import client from metadata.utils import fqn from metadata.utils.filters import filter_by_chart +from metadata.utils.fqn import build_es_fqn_search_string from metadata.utils.helpers import clean_uri from metadata.utils.logger import ingestion_logger @@ -130,7 +131,9 @@ class ModeSource(DashboardServiceSource): self.register_record(dashboard_request=dashboard_request) def yield_dashboard_lineage_details( - self, dashboard_details: dict, db_service_name: str + self, + dashboard_details: dict, + db_service_name: Optional[str] = None, ) -> Iterable[Either[AddLineageRequest]]: """Get lineage method""" try: @@ -151,14 +154,18 @@ class ModeSource(DashboardServiceSource): database_schema_name = self.check_database_schema_name( database_schema_name ) - from_entities = search_table_entities( - metadata=self.metadata, - database=data_source.get(client.DATABASE), - service_name=db_service_name, - database_schema=database_schema_name, - table=table, + fqn_search_string = build_es_fqn_search_string( + database_name=data_source.get(client.DATABASE), + schema_name=database_schema_name, + service_name=db_service_name or "*", + table_name=table, ) - for from_entity in from_entities: + from_entities = self.metadata.search_in_any_service( + entity_type=Table, + fqn_search_string=fqn_search_string, + fetch_multiple_entities=True, + ) + for from_entity in from_entities or []: to_entity = self.metadata.get_by_name( entity=Lineage_Dashboard, fqn=fqn.build( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py index 5eecaa50fc0..98ee638101f 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py @@ -67,6 +67,7 @@ from metadata.utils.filters import ( filter_by_datamodel, filter_by_project, ) +from metadata.utils.fqn import build_es_fqn_search_string from metadata.utils.helpers import clean_uri from metadata.utils.logger import ingestion_logger @@ -534,7 +535,9 @@ class PowerbiSource(DashboardServiceSource): ) def create_datamodel_report_lineage( - self, db_service_name: str, dashboard_details: PowerBIReport + self, + db_service_name: Optional[str], + dashboard_details: PowerBIReport, ) -> Iterable[Either[CreateDashboardRequest]]: """ create the lineage between datamodel and report @@ -615,7 +618,7 @@ class PowerbiSource(DashboardServiceSource): def _get_table_and_datamodel_lineage( self, - db_service_name: str, + db_service_name: Optional[str], table: PowerBiTable, datamodel_entity: DashboardDataModel, ) -> Optional[Either[AddLineageRequest]]: @@ -623,19 +626,16 @@ class PowerbiSource(DashboardServiceSource): Method to create lineage between table and datamodels """ try: - table_fqn = fqn.build( - self.metadata, - entity_type=Table, - service_name=db_service_name, + fqn_search_string = build_es_fqn_search_string( database_name=None, schema_name=None, + service_name=db_service_name or "*", table_name=table.name, ) - table_entity = self.metadata.get_by_name( - entity=Table, - fqn=table_fqn, + table_entity = self.metadata.search_in_any_service( + entity_type=Table, + fqn_search_string=fqn_search_string, ) - if table_entity and datamodel_entity: columns_list = [column.name for column in table.columns] column_lineage = self._get_column_lineage( @@ -661,7 +661,7 @@ class PowerbiSource(DashboardServiceSource): def create_table_datamodel_lineage_from_files( self, - db_service_name: str, + db_service_name: Optional[str], datamodel_entity: Optional[DashboardDataModel], ) -> Iterable[Either[AddLineageRequest]]: """ @@ -706,7 +706,7 @@ class PowerbiSource(DashboardServiceSource): def yield_dashboard_lineage_details( self, dashboard_details: Union[PowerBIDashboard, PowerBIReport], - db_service_name: str, + db_service_name: Optional[str] = None, ) -> Iterable[Either[AddLineageRequest]]: """ We will build the logic to build the logic as below @@ -715,7 +715,8 @@ class PowerbiSource(DashboardServiceSource): try: if isinstance(dashboard_details, PowerBIReport): yield from self.create_datamodel_report_lineage( - db_service_name=db_service_name, dashboard_details=dashboard_details + db_service_name=db_service_name, + dashboard_details=dashboard_details, ) if isinstance(dashboard_details, PowerBIDashboard): diff --git a/ingestion/src/metadata/ingestion/source/dashboard/qlikcloud/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/qlikcloud/metadata.py index 9d042091641..5dd60d1efeb 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/qlikcloud/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/qlikcloud/metadata.py @@ -47,6 +47,7 @@ from metadata.ingestion.source.dashboard.qliksense.metadata import QliksenseSour from metadata.ingestion.source.dashboard.qliksense.models import QlikTable from metadata.utils import fqn from metadata.utils.filters import filter_by_chart +from metadata.utils.fqn import build_es_fqn_search_string from metadata.utils.helpers import clean_uri from metadata.utils.logger import ingestion_logger @@ -188,18 +189,22 @@ class QlikcloudSource(QliksenseSource): def yield_dashboard_lineage_details( self, dashboard_details: QlikApp, - db_service_name: Optional[str], + db_service_name: Optional[str] = None, ) -> Iterable[Either[AddLineageRequest]]: """Get lineage method""" - db_service_entity = self.metadata.get_by_name( - entity=DatabaseService, fqn=db_service_name - ) for datamodel in self.data_models or []: try: data_model_entity = self._get_datamodel(datamodel_id=datamodel.id) if data_model_entity: - om_table = self._get_database_table( - db_service_entity, data_model_entity + fqn_search_string = build_es_fqn_search_string( + database_name=None, + schema_name=None, + service_name=db_service_name or "*", + table_name=data_model_entity.displayName, + ) + om_table = self.metadata.search_in_any_service( + entity_type=Table, + fqn_search_string=fqn_search_string, ) if om_table: columns_list = [col.name for col in datamodel.fields] diff --git a/ingestion/src/metadata/ingestion/source/dashboard/qliksense/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/qliksense/metadata.py index fdce2d7d5d3..ca85ee8db4f 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/qliksense/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/qliksense/metadata.py @@ -55,6 +55,7 @@ from metadata.ingestion.source.dashboard.qliksense.models import ( ) from metadata.utils import fqn from metadata.utils.filters import filter_by_chart, filter_by_datamodel +from metadata.utils.fqn import build_es_fqn_search_string from metadata.utils.helpers import clean_uri from metadata.utils.logger import ingestion_logger @@ -276,7 +277,11 @@ class QliksenseSource(DashboardServiceSource): return None def _get_database_table( - self, db_service_entity: DatabaseService, datamodel: QlikTable + self, + db_service_entity: DatabaseService, + datamodel: QlikTable, + schema_name: Optional[str], + database_name: Optional[str], ) -> Optional[Table]: """ Get the table entity for lineage @@ -284,17 +289,6 @@ class QliksenseSource(DashboardServiceSource): # table.name in tableau can come as db.schema.table_name. Hence the logic to split it if datamodel.tableName and db_service_entity: try: - if len(datamodel.connectorProperties.tableQualifiers) > 1: - ( - database_name, - schema_name, - ) = datamodel.connectorProperties.tableQualifiers[-2:] - elif len(datamodel.connectorProperties.tableQualifiers) == 1: - schema_name = datamodel.connectorProperties.tableQualifiers[-1] - database_name = None - else: - schema_name, database_name = None, None - table_fqn = fqn.build( self.metadata, entity_type=Table, @@ -316,18 +310,32 @@ class QliksenseSource(DashboardServiceSource): def yield_dashboard_lineage_details( self, dashboard_details: QlikDashboard, - db_service_name: Optional[str], + db_service_name: Optional[str] = None, ) -> Iterable[Either[AddLineageRequest]]: """Get lineage method""" - db_service_entity = self.metadata.get_by_name( - entity=DatabaseService, fqn=db_service_name - ) for datamodel in self.data_models or []: try: data_model_entity = self._get_datamodel(datamodel_id=datamodel.id) if data_model_entity: - om_table = self._get_database_table( - db_service_entity, datamodel=datamodel + if len(datamodel.connectorProperties.tableQualifiers) > 1: + ( + database_name, + schema_name, + ) = datamodel.connectorProperties.tableQualifiers[-2:] + elif len(datamodel.connectorProperties.tableQualifiers) == 1: + schema_name = datamodel.connectorProperties.tableQualifiers[-1] + database_name = None + else: + schema_name, database_name = None, None + fqn_search_string = build_es_fqn_search_string( + database_name=database_name, + schema_name=schema_name, + service_name=db_service_name or "*", + table_name=datamodel.tableName, + ) + om_table = self.metadata.search_in_any_service( + entity_type=Table, + fqn_search_string=fqn_search_string, ) if om_table: columns_list = [col.name for col in datamodel.fields] diff --git a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py index 1d036743002..88889ddf02e 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py @@ -49,9 +49,8 @@ from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException -from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper +from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect from metadata.ingestion.lineage.parser import LineageParser -from metadata.ingestion.lineage.sql_lineage import search_table_entities from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.dashboard_service import ( LINEAGE_MAP, @@ -69,6 +68,7 @@ from metadata.ingestion.source.dashboard.quicksight.models import ( from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser from metadata.utils import fqn from metadata.utils.filters import filter_by_chart +from metadata.utils.fqn import build_es_fqn_search_string from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -254,12 +254,14 @@ class QuicksightSource(DashboardServiceSource): data_model_entity, data_source_resp: DataSourceModel, dashboard_details: DashboardDetail, - db_service_entity, + db_service_name: Optional[str], ) -> Iterable[Either[AddLineageRequest]]: """yield lineage from table(parsed form query source) <-> dashboard""" - if not db_service_entity: - logger.debug(f"db service is not ingested") - return None + db_service_entity = None + if db_service_name: + db_service_entity = self.metadata.get_by_name( + entity=DatabaseService, fqn=db_service_name + ) sql_query = data_source_resp.data_source_resp.query source_database_names = [] try: @@ -278,7 +280,7 @@ class QuicksightSource(DashboardServiceSource): db_service_entity.serviceType.value ) if db_service_entity - else None, + else Dialect.ANSI, ) lineage_details = LineageDetails( source=LineageSource.DashboardLineage, sqlQuery=sql_query @@ -289,14 +291,18 @@ class QuicksightSource(DashboardServiceSource): database_schema_name = self.check_database_schema_name( database_schema_name ) - from_entities = search_table_entities( - metadata=self.metadata, - database=db_name, - service_name=db_service_entity.name.root, - database_schema=database_schema_name, - table=table, + fqn_search_string = build_es_fqn_search_string( + database_name=db_name, + schema_name=database_schema_name, + service_name=db_service_name or "*", + table_name=table, ) - for from_entity in from_entities: + from_entities = self.metadata.search_in_any_service( + entity_type=Table, + fqn_search_string=fqn_search_string, + fetch_multiple_entities=True, + ) + for from_entity in from_entities or []: if from_entity is not None and data_model_entity is not None: columns = [ col.name.root for col in data_model_entity.columns @@ -385,7 +391,7 @@ class QuicksightSource(DashboardServiceSource): data_model_entity, data_source_resp: DataSourceModel, dashboard_details: DashboardDetail, - db_service_entity, + db_service_name: Optional[str], ) -> Iterable[Either[AddLineageRequest]]: """yield lineage from table <-> dashboard""" try: @@ -394,17 +400,15 @@ class QuicksightSource(DashboardServiceSource): if data_source_resp and data_source_resp.DataSourceParameters: data_source_dict = data_source_resp.DataSourceParameters for db in data_source_dict.keys() or []: - from_fqn = fqn.build( - self.metadata, - entity_type=Table, - service_name=db_service_entity.name.root, + fqn_search_string = build_es_fqn_search_string( database_name=data_source_dict[db].get("Database"), schema_name=schema_name, + service_name=db_service_name or "*", table_name=table_name, ) - from_entity = self.metadata.get_by_name( - entity=Table, - fqn=from_fqn, + from_entity = self.metadata.search_in_any_service( + entity_type=Table, + fqn_search_string=fqn_search_string, ) if from_entity is not None and data_model_entity is not None: columns = [col.name.root for col in data_model_entity.columns] @@ -440,14 +444,13 @@ class QuicksightSource(DashboardServiceSource): return None def yield_dashboard_lineage_details( # pylint: disable=too-many-locals - self, dashboard_details: DashboardDetail, db_service_name: str + self, + dashboard_details: DashboardDetail, + db_service_name: Optional[str] = None, ) -> Iterable[Either[AddLineageRequest]]: """ Get lineage between dashboard and data sources """ - db_service_entity = self.metadata.get_by_name( - entity=DatabaseService, fqn=db_service_name - ) for datamodel in self.data_models or []: try: data_model_entity = self._get_datamodel( @@ -460,7 +463,7 @@ class QuicksightSource(DashboardServiceSource): data_model_entity, datamodel.DataSource, dashboard_details, - db_service_entity, + db_service_name, ) elif isinstance( datamodel.DataSource.data_source_resp, DataSourceRespS3 @@ -473,7 +476,7 @@ class QuicksightSource(DashboardServiceSource): data_model_entity, datamodel.DataSource, dashboard_details, - db_service_entity, + db_service_name, ) except Exception as exc: # pylint: disable=broad-except yield Either( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/redash/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/redash/metadata.py index 1067e1afcc7..d1afe47a3a7 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/redash/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/redash/metadata.py @@ -48,6 +48,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource from metadata.utils import fqn from metadata.utils.filters import filter_by_chart +from metadata.utils.fqn import build_es_fqn_search_string from metadata.utils.helpers import clean_uri, get_standard_chart_type from metadata.utils.logger import ingestion_logger from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels @@ -197,14 +198,15 @@ class RedashSource(DashboardServiceSource): ) def yield_dashboard_lineage_details( # pylint: disable=too-many-locals - self, dashboard_details: dict, db_service_name: str + self, + dashboard_details: dict, + db_service_name: Optional[str] = None, ) -> Iterable[Either[AddLineageRequest]]: """ Get lineage between dashboard and data sources In redash we do not get table, database_schema or database name but we do get query the lineage is being generated based on the query """ - to_fqn = fqn.build( self.metadata, entity_type=LineageDashboard, @@ -229,17 +231,17 @@ class RedashSource(DashboardServiceSource): database_schema_name = self.check_database_schema_name( database_schema ) - from_fqn = fqn.build( - self.metadata, - entity_type=Table, - service_name=db_service_name, - schema_name=database_schema_name, - table_name=database_schema_table.get("table"), + if not database_schema_table.get("table"): + continue + fqn_search_string = build_es_fqn_search_string( database_name=database_schema_table.get("database"), + schema_name=database_schema_name, + service_name=db_service_name or "*", + table_name=database_schema_table.get("table"), ) - from_entity = self.metadata.get_by_name( - entity=Table, - fqn=from_fqn, + from_entity = self.metadata.search_in_any_service( + entity_type=Table, + fqn_search_string=fqn_search_string, ) if from_entity and to_entity: yield self._get_add_lineage_request( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/sigma/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/sigma/metadata.py index e44cf5e29a6..99e05b60107 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/sigma/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/sigma/metadata.py @@ -52,6 +52,7 @@ from metadata.ingestion.source.dashboard.sigma.models import ( ) from metadata.utils import fqn from metadata.utils.filters import filter_by_chart +from metadata.utils.fqn import build_es_fqn_search_string from metadata.utils.helpers import get_standard_chart_type from metadata.utils.logger import ingestion_logger @@ -196,39 +197,36 @@ class SigmaSource(DashboardServiceSource): return None def _get_table_entity_from_node( - self, node: NodeDetails, db_service_name: str + self, node: NodeDetails, db_service_name: Optional[str] ) -> Optional[Table]: """ Get the table entity for lineage """ if node.node_schema: try: - table_fqn = fqn.build( - self.metadata, - entity_type=Table, - service_name=db_service_name, + fqn_search_string = build_es_fqn_search_string( + database_name=None, schema_name=node.node_schema, + service_name=db_service_name or "*", table_name=node.name, - database_name="", ) - if table_fqn: - return self.metadata.get_by_name( - entity=Table, - fqn=table_fqn, - ) + return self.metadata.search_in_any_service( + entity_type=Table, + fqn_search_string=fqn_search_string, + ) except Exception as exc: logger.debug(traceback.format_exc()) logger.warning(f"Error occured while finding table fqn: {exc}") return None def yield_dashboard_lineage_details( - self, dashboard_details: WorkbookDetails, db_service_name: Optional[str] + self, + dashboard_details: WorkbookDetails, + db_service_name: Optional[str] = None, ): """ yield dashboard lineage """ - if not db_service_name: - return # charts and datamodels are same here as we are using charts as metadata for datamodels for data_model in self.data_models or []: try: diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset/api_source.py b/ingestion/src/metadata/ingestion/source/dashboard/superset/api_source.py index fcdd5312a32..f5850881ee6 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset/api_source.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset/api_source.py @@ -22,7 +22,6 @@ from metadata.generated.schema.api.data.createDashboardDataModel import ( ) from metadata.generated.schema.entity.data.chart import Chart from metadata.generated.schema.entity.data.dashboardDataModel import DataModelType -from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.ingestionPipelines.status import ( StackTraceError, @@ -42,6 +41,7 @@ from metadata.ingestion.source.dashboard.superset.models import ( ) from metadata.utils import fqn from metadata.utils.filters import filter_by_datamodel +from metadata.utils.fqn import build_es_fqn_search_string from metadata.utils.helpers import ( clean_uri, get_database_name_for_lineage, @@ -130,10 +130,10 @@ class SupersetAPISource(SupersetSourceMixin): ) def _get_datasource_fqn_for_lineage( - self, chart_json: ChartResult, db_service_entity: DatabaseService + self, chart_json: ChartResult, db_service_name: Optional[str] ): return ( - self._get_datasource_fqn(chart_json.datasource_id, db_service_entity) + self._get_datasource_fqn(chart_json.datasource_id, db_service_name) if chart_json.datasource_id else None ) @@ -173,34 +173,33 @@ class SupersetAPISource(SupersetSourceMixin): ) def _get_datasource_fqn( - self, datasource_id: str, db_service_entity: DatabaseService + self, datasource_id: str, db_service_name: Optional[str] ) -> Optional[str]: try: datasource_json = self.client.fetch_datasource(datasource_id) if datasource_json: - database_json = self.client.fetch_database( - datasource_json.result.database.id - ) - default_database_name = ( - database_json.result.parameters.database - if database_json.result.parameters - else None - ) - - database_name = get_database_name_for_lineage( - db_service_entity, default_database_name - ) - - if database_json: - dataset_fqn = fqn.build( - self.metadata, - entity_type=Table, - table_name=datasource_json.result.table_name, - schema_name=datasource_json.result.table_schema, - database_name=database_name, - service_name=db_service_entity.name.root, + database_name = None + if db_service_name: + database_json = self.client.fetch_database( + datasource_json.result.database.id ) - return dataset_fqn + default_database_name = ( + database_json.result.parameters.database + if database_json.result.parameters + else None + ) + db_service_entity = self.metadata.get_by_name( + entity=DatabaseService, fqn=db_service_name + ) + database_name = get_database_name_for_lineage( + db_service_entity, default_database_name + ) + return build_es_fqn_search_string( + database_name=database_name, + schema_name=datasource_json.result.table_schema, + service_name=db_service_name or "*", + table_name=datasource_json.result.table_name, + ) except Exception as err: logger.debug(traceback.format_exc()) logger.warning( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py b/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py index 7896ab23a63..7a57eb2ee01 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py @@ -25,7 +25,6 @@ from metadata.generated.schema.api.data.createDashboardDataModel import ( ) from metadata.generated.schema.entity.data.chart import Chart from metadata.generated.schema.entity.data.dashboardDataModel import DataModelType -from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( MysqlConnection, ) @@ -58,6 +57,7 @@ from metadata.ingestion.source.dashboard.superset.queries import ( ) from metadata.utils import fqn from metadata.utils.filters import filter_by_datamodel +from metadata.utils.fqn import build_es_fqn_search_string from metadata.utils.helpers import ( clean_uri, get_database_name_for_lineage, @@ -160,10 +160,10 @@ class SupersetDBSource(SupersetSourceMixin): ) def _get_datasource_fqn_for_lineage( - self, chart_json: FetchChart, db_service_entity: DatabaseService + self, chart_json: FetchChart, db_service_name: Optional[str] ): return ( - self._get_datasource_fqn(db_service_entity, chart_json) + self._get_datasource_fqn(db_service_name, chart_json) if chart_json.table_name else None ) @@ -216,20 +216,23 @@ class SupersetDBSource(SupersetSourceMixin): return get_database_name_for_lineage(db_service_entity, default_db_name) def _get_datasource_fqn( - self, db_service_entity: DatabaseService, chart_json: FetchChart + self, db_service_name: Optional[str], chart_json: FetchChart ) -> Optional[str]: try: - dataset_fqn = fqn.build( - self.metadata, - entity_type=Table, - table_name=chart_json.table_name, - database_name=self._get_database_name( + database_name = None + if db_service_name: + db_service_entity = self.metadata.get_by_name( + entity=DatabaseService, fqn=db_service_name + ) + database_name = self._get_database_name( chart_json.sqlalchemy_uri, db_service_entity - ), + ) + return build_es_fqn_search_string( + database_name=database_name, schema_name=chart_json.table_schema, - service_name=db_service_entity.name.root, + service_name=db_service_name or "*", + table_name=chart_json.table_name, ) - return dataset_fqn except Exception as err: logger.debug(traceback.format_exc()) logger.warning( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset/mixin.py b/ingestion/src/metadata/ingestion/source/dashboard/superset/mixin.py index 8c5b0687cc1..7e796697011 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset/mixin.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset/mixin.py @@ -30,7 +30,6 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata from metadata.generated.schema.entity.services.dashboardService import ( DashboardServiceType, ) -from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.ingestionPipelines.status import ( StackTraceError, ) @@ -245,20 +244,21 @@ class SupersetSourceMixin(DashboardServiceSource): self, from_entities: List[Tuple[FetchChart, Dict[str, List[str]]]], to_entity: DashboardDataModel, - db_service_entity: DatabaseService, + db_service_name: Optional[str], ): result = [] for from_entity in from_entities: input_table, _column_lineage = from_entity datasource_fqn = self._get_datasource_fqn_for_lineage( - input_table, db_service_entity + input_table, db_service_name ) - from_entity = self.metadata.get_by_name( - entity=Table, - fqn=datasource_fqn, + from_entity = self.metadata.search_in_any_service( + entity_type=Table, + fqn_search_string=datasource_fqn, ) - + if not from_entity: + continue column_lineage: List[ColumnLineage] = [] for to_column, from_columns in _column_lineage.items(): _from_columns = [ @@ -308,49 +308,45 @@ class SupersetSourceMixin(DashboardServiceSource): def yield_dashboard_lineage_details( self, dashboard_details: Union[FetchDashboard, DashboardResult], - db_service_name: DatabaseService, + db_service_name: Optional[str] = None, ) -> Iterable[Either[AddLineageRequest]]: """ Get lineage between datamodel and table """ - db_service_entity = self.metadata.get_by_name( - entity=DatabaseService, fqn=db_service_name - ) - if db_service_entity: - for chart_json in filter( - None, - [ - self.all_charts.get(chart_id) - for chart_id in self._get_charts_of_dashboard(dashboard_details) - ], - ): - try: - to_entity = self._get_dashboard_data_model_entity(chart_json) + for chart_json in filter( + None, + [ + self.all_charts.get(chart_id) + for chart_id in self._get_charts_of_dashboard(dashboard_details) + ], + ): + try: + to_entity = self._get_dashboard_data_model_entity(chart_json) - if to_entity: - _input_tables = self._get_input_tables(chart_json) - input_tables = self._enrich_raw_input_tables( - _input_tables, to_entity, db_service_entity - ) - for input_table in input_tables: - from_entity_table, column_lineage = input_table - - yield self._get_add_lineage_request( - to_entity=to_entity, - from_entity=from_entity_table, - column_lineage=column_lineage, - ) - except Exception as exc: - yield Either( - left=StackTraceError( - name=db_service_name, - error=( - "Error to yield dashboard lineage details for DB " - f"service name [{db_service_name}]: {exc}" - ), - stackTrace=traceback.format_exc(), - ) + if to_entity: + _input_tables = self._get_input_tables(chart_json) + input_tables = self._enrich_raw_input_tables( + _input_tables, to_entity, db_service_name ) + for input_table in input_tables: + from_entity_table, column_lineage = input_table + + yield self._get_add_lineage_request( + to_entity=to_entity, + from_entity=from_entity_table, + column_lineage=column_lineage, + ) + except Exception as exc: + yield Either( + left=StackTraceError( + name=db_service_name, + error=( + "Error to yield dashboard lineage details for DB " + f"service name [{db_service_name}]: {exc}" + ), + stackTrace=traceback.format_exc(), + ) + ) def _get_datamodel( self, datamodel: Union[SupersetDatasource, FetchChart] diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py index bc5f3a22f92..2de8758a61d 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py @@ -60,9 +60,9 @@ from metadata.generated.schema.type.entityReferenceList import EntityReferenceLi from metadata.generated.schema.type.usageRequest import UsageRequest from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException -from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper +from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect from metadata.ingestion.lineage.parser import LineageParser -from metadata.ingestion.lineage.sql_lineage import get_column_fqn, search_table_entities +from metadata.ingestion.lineage.sql_lineage import get_column_fqn from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.dashboard_service import ( @@ -82,6 +82,7 @@ from metadata.ingestion.source.dashboard.tableau.models import ( from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser from metadata.utils import fqn from metadata.utils.filters import filter_by_chart, filter_by_datamodel +from metadata.utils.fqn import build_es_fqn_search_string from metadata.utils.helpers import ( clean_uri, get_database_name_for_lineage, @@ -457,7 +458,7 @@ class TableauSource(DashboardServiceSource): self, upstream_data_model: DataSource, datamodel: DataSource, - db_service_entity: DatabaseService, + db_service_name: Optional[str], upstream_data_model_entity: DashboardDataModel, ) -> Iterable[Either[AddLineageRequest]]: """ @@ -471,7 +472,7 @@ class TableauSource(DashboardServiceSource): if column is not None } for table in datamodel.upstreamTables or []: - om_tables = self._get_database_tables(db_service_entity, table) + om_tables = self._get_database_tables(db_service_name, table) for om_table_and_query in om_tables or []: column_lineage = self._get_column_lineage( table, @@ -570,7 +571,7 @@ class TableauSource(DashboardServiceSource): self, datamodel: DataSource, data_model_entity: DashboardDataModel, - db_service_entity: DatabaseService, + db_service_name: Optional[str], ) -> Iterable[Either[AddLineageRequest]]: """ " Method to create lineage between tables<->published datasource<->embedded datasource @@ -594,7 +595,7 @@ class TableauSource(DashboardServiceSource): yield from self._get_table_datamodel_lineage( upstream_data_model=upstream_data_model, datamodel=datamodel, - db_service_entity=db_service_entity, + db_service_name=db_service_name, upstream_data_model_entity=upstream_data_model_entity, ) except Exception as err: @@ -603,14 +604,16 @@ class TableauSource(DashboardServiceSource): name="Lineage", error=( "Error to yield datamodel table lineage details for DB " - f"service name [{db_service_entity.name}]: {err}" + f"service name [{db_service_name}]: {err}" ), stackTrace=traceback.format_exc(), ) ) def yield_dashboard_lineage_details( - self, dashboard_details: TableauDashboard, db_service_name: str + self, + dashboard_details: TableauDashboard, + db_service_name: Optional[str] = None, ) -> Iterable[Either[AddLineageRequest]]: """ This method creates the lineage between tables and datamodels @@ -622,42 +625,38 @@ class TableauSource(DashboardServiceSource): Returns: Lineage request between Data Models and Database tables """ - db_service_entity = self.metadata.get_by_name( - entity=DatabaseService, fqn=db_service_name - ) - if db_service_entity: - for datamodel in dashboard_details.dataModels or []: - try: - data_model_entity = self._get_datamodel(datamodel=datamodel) - if data_model_entity: - if datamodel.upstreamDatasources: - # if we have upstreamDatasources(Published Datasources), create lineage in below format - # Table<->Published Datasource<->Embedded Datasource - yield from self._get_datamodel_table_lineage( - datamodel=datamodel, - data_model_entity=data_model_entity, - db_service_entity=db_service_entity, - ) - else: - # else we'll create lineage only using Embedded Datasources in below format - # Table<->Embedded Datasource - yield from self._get_table_datamodel_lineage( - upstream_data_model=datamodel, - datamodel=datamodel, - db_service_entity=db_service_entity, - upstream_data_model_entity=data_model_entity, - ) - except Exception as err: - yield Either( - left=StackTraceError( - name="Lineage", - error=( - "Error to yield dashboard lineage details for DB " - f"service name [{db_service_name}]: {err}" - ), - stackTrace=traceback.format_exc(), + for datamodel in dashboard_details.dataModels or []: + try: + data_model_entity = self._get_datamodel(datamodel=datamodel) + if data_model_entity: + if datamodel.upstreamDatasources: + # if we have upstreamDatasources(Published Datasources), create lineage in below format + # Table<->Published Datasource<->Embedded Datasource + yield from self._get_datamodel_table_lineage( + datamodel=datamodel, + data_model_entity=data_model_entity, + db_service_name=db_service_name, ) + else: + # else we'll create lineage only using Embedded Datasources in below format + # Table<->Embedded Datasource + yield from self._get_table_datamodel_lineage( + upstream_data_model=datamodel, + datamodel=datamodel, + db_service_name=db_service_name, + upstream_data_model_entity=data_model_entity, + ) + except Exception as err: + yield Either( + left=StackTraceError( + name="Lineage", + error=( + "Error to yield dashboard lineage details for DB " + f"service name [{db_service_name}]: {err}" + ), + stackTrace=traceback.format_exc(), ) + ) def yield_dashboard_chart( self, dashboard_details: TableauDashboard @@ -722,7 +721,7 @@ class TableauSource(DashboardServiceSource): self.metadata.close() def _get_table_entities_from_api( - self, db_service_entity: DatabaseService, table: UpstreamTable + self, db_service_name: Optional[str], table: UpstreamTable ) -> Optional[List[TableAndQuery]]: """ In case we get the table details from the Graphql APIs we process them @@ -734,39 +733,40 @@ class TableauSource(DashboardServiceSource): if table.database and table.database.name else database_schema_table.get("database") ) - if isinstance(db_service_entity.connection.config, BigQueryConnection): - database_name = None - database_name = get_database_name_for_lineage( - db_service_entity, database_name - ) + if db_service_name: + db_service_entity = self.metadata.get_by_name( + entity=DatabaseService, fqn=db_service_name + ) + if isinstance(db_service_entity.connection.config, BigQueryConnection): + database_name = None + database_name = get_database_name_for_lineage( + db_service_entity, database_name + ) schema_name = ( table.schema_ if table.schema_ else database_schema_table.get("database_schema") ) table_name = database_schema_table.get("table") - table_fqn = fqn.build( - self.metadata, - entity_type=Table, - service_name=db_service_entity.name.root, - schema_name=schema_name, - table_name=table_name, + fqn_search_string = build_es_fqn_search_string( database_name=database_name, + schema_name=schema_name, + service_name=db_service_name or "*", + table_name=table_name, ) - if table_fqn: - table_entity = self.metadata.get_by_name( - entity=Table, - fqn=table_fqn, - ) - if table_entity: - return [TableAndQuery(table=table_entity)] + table_entity = self.metadata.search_in_any_service( + entity_type=Table, + fqn_search_string=fqn_search_string, + ) + if table_entity: + return [TableAndQuery(table=table_entity)] except Exception as exc: logger.debug(traceback.format_exc()) logger.warning(f"Error to get tables for lineage using GraphQL Apis: {exc}") return None def _get_table_entities_from_query( - self, db_service_entity: DatabaseService, table: UpstreamTable + self, db_service_name: Optional[str], table: UpstreamTable ) -> Optional[List[TableAndQuery]]: """ In case we get the table details from the Graphql APIs we process them @@ -774,34 +774,44 @@ class TableauSource(DashboardServiceSource): tables_list = [] try: for custom_sql_table in table.referencedByQueries or []: + db_service_entity = None + if db_service_name: + db_service_entity = self.metadata.get_by_name( + entity=DatabaseService, fqn=db_service_name + ) lineage_parser = LineageParser( custom_sql_table.query, ConnectionTypeDialectMapper.dialect_of( db_service_entity.serviceType.value ) if db_service_entity - else None, + else Dialect.ANSI, ) for source_table in lineage_parser.source_tables or []: database_schema_table = fqn.split_table_name(str(source_table)) database_name = database_schema_table.get("database") - if isinstance( - db_service_entity.connection.config, BigQueryConnection - ): - database_name = None - database_name = get_database_name_for_lineage( - db_service_entity, database_name - ) + if db_service_entity: + if isinstance( + db_service_entity.connection.config, BigQueryConnection + ): + database_name = None + database_name = get_database_name_for_lineage( + db_service_entity, database_name + ) schema_name = self.check_database_schema_name( database_schema_table.get("database_schema") ) table_name = database_schema_table.get("table") - from_entities = search_table_entities( - metadata=self.metadata, - database=database_name, - service_name=db_service_entity.fullyQualifiedName.root, - database_schema=schema_name, - table=table_name, + fqn_search_string = build_es_fqn_search_string( + database_name=database_name, + schema_name=schema_name, + service_name=db_service_name or "*", + table_name=table_name, + ) + from_entities = self.metadata.search_in_any_service( + entity_type=Table, + fqn_search_string=fqn_search_string, + fetch_multiple_entities=True, ) tables_list.extend( [ @@ -816,7 +826,7 @@ class TableauSource(DashboardServiceSource): return tables_list or [] def _get_database_tables( - self, db_service_entity: DatabaseService, table: UpstreamTable + self, db_service_name: Optional[str], table: UpstreamTable ) -> Optional[List[TableAndQuery]]: """ Get the table entities for lineage @@ -824,12 +834,12 @@ class TableauSource(DashboardServiceSource): # If we get the table details from the Graphql APIs we process them directly if table.name: return self._get_table_entities_from_api( - db_service_entity=db_service_entity, table=table + db_service_name=db_service_name, table=table ) # Else we get the table details from the SQL queries and process them using SQL lineage parser if table.referencedByQueries: return self._get_table_entities_from_query( - db_service_entity=db_service_entity, table=table + db_service_name=db_service_name, table=table ) return None diff --git a/ingestion/tests/unit/topology/dashboard/test_metabase.py b/ingestion/tests/unit/topology/dashboard/test_metabase.py index 5f66992a8ef..2e76a4b9241 100644 --- a/ingestion/tests/unit/topology/dashboard/test_metabase.py +++ b/ingestion/tests/unit/topology/dashboard/test_metabase.py @@ -44,7 +44,6 @@ from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.ingestion.source.dashboard.metabase import metadata as MetabaseMetadata from metadata.ingestion.source.dashboard.metabase.metadata import MetabaseSource from metadata.ingestion.source.dashboard.metabase.models import ( DatasetQuery, @@ -277,7 +276,7 @@ class MetabaseUnitTest(TestCase): @patch.object(fqn, "build", return_value=None) @patch.object(OpenMetadata, "get_by_name", return_value=EXAMPLE_DASHBOARD) - @patch.object(MetabaseMetadata, "search_table_entities", return_value=EXAMPLE_TABLE) + @patch.object(OpenMetadata, "search_in_any_service", return_value=EXAMPLE_TABLE) @patch.object( MetabaseSource, "_get_database_service", return_value=MOCK_DATABASE_SERVICE ) @@ -294,7 +293,7 @@ class MetabaseUnitTest(TestCase): result = self.metabase.yield_dashboard_lineage_details( dashboard_details=MOCK_DASHBOARD_DETAILS, db_service_name=None ) - self.assertEqual(list(result), []) + self.assertEqual(next(result).right, EXPECTED_LINEAGE) # test out _yield_lineage_from_api mock_dashboard = deepcopy(MOCK_DASHBOARD_DETAILS) diff --git a/ingestion/tests/unit/topology/dashboard/test_superset.py b/ingestion/tests/unit/topology/dashboard/test_superset.py index b8ea2c77c82..bc7af4014a3 100644 --- a/ingestion/tests/unit/topology/dashboard/test_superset.py +++ b/ingestion/tests/unit/topology/dashboard/test_superset.py @@ -16,6 +16,7 @@ import json import uuid from pathlib import Path from unittest import TestCase +from unittest.mock import patch import sqlalchemy from collate_sqllineage.core.models import Column, Schema, SubQuery, Table @@ -224,7 +225,7 @@ MOCK_DATASOURCE = [ EXPECTED_ALL_CHARTS_DB = {1: MOCK_CHART_DB_2} NOT_FOUND_RESP = {"message": "Not found"} -EXPECTED_API_DATASET_FQN = None +EXPECTED_API_DATASET_FQN = "test_postgres.*.main.wb_health_population" EXPECTED_DATASET_FQN = "test_postgres.examples.main.wb_health_population" @@ -577,19 +578,25 @@ class SupersetUnitTest(TestCase): self.assertEqual(dashboard_charts, EXPECTED_CHART) def test_api_get_datasource_fqn(self): - """ - Test generated datasource fqn for api source - """ - fqn = self.superset_api._get_datasource_fqn( # pylint: disable=protected-access - 1, MOCK_DB_POSTGRES_SERVICE - ) - self.assertEqual(fqn, EXPECTED_API_DATASET_FQN) + with patch.object( + OpenMetadata, "get_by_name", return_value=MOCK_DB_POSTGRES_SERVICE + ): + """ + Test generated datasource fqn for api source + """ + fqn = self.superset_api._get_datasource_fqn( # pylint: disable=protected-access + 1, MOCK_DB_POSTGRES_SERVICE.name.root + ) + self.assertEqual(fqn, EXPECTED_API_DATASET_FQN) def test_db_get_datasource_fqn_for_lineage(self): - fqn = self.superset_db._get_datasource_fqn_for_lineage( # pylint: disable=protected-access - MOCK_CHART_DB, MOCK_DB_POSTGRES_SERVICE - ) - self.assertEqual(fqn, EXPECTED_DATASET_FQN) + with patch.object( + OpenMetadata, "get_by_name", return_value=MOCK_DB_POSTGRES_SERVICE + ): + fqn = self.superset_db._get_datasource_fqn_for_lineage( # pylint: disable=protected-access + MOCK_CHART_DB, MOCK_DB_POSTGRES_SERVICE.name.root + ) + self.assertEqual(fqn, EXPECTED_DATASET_FQN) def test_db_get_database_name(self): sqa_str1 = "postgres://user:pass@localhost:8888/database"