diff --git a/ingestion/src/metadata/ingestion/api/topology_runner.py b/ingestion/src/metadata/ingestion/api/topology_runner.py
index 70636b4294c..e643becd991 100644
--- a/ingestion/src/metadata/ingestion/api/topology_runner.py
+++ b/ingestion/src/metadata/ingestion/api/topology_runner.py
@@ -15,7 +15,7 @@ generate the _run based on their topology.
 import traceback
 from collections import defaultdict
 from functools import singledispatchmethod
-from typing import Any, Generic, Iterable, List, Type, TypeVar, Union
+from typing import Any, Generic, Iterable, List, Type, TypeVar
 
 from pydantic import BaseModel
 
@@ -267,20 +267,46 @@ class TopologyRunnerMixin(Generic[C]):
             *context_names, entity_name
         )
 
-    def update_context(
-        self, stage: NodeStage, context: Union[str, OMetaTagAndClassification]
-    ):
+    def update_context(self, stage: NodeStage, right: C):
         """
         Append or update context
 
-        We'll store the entity_name in the topology context instead of the entity_fqn
-        and build the FQN on-the-fly wherever required.
-        This is mainly because we need the context in other places
+        We'll store the entity name or FQN in the topology context.
+        If we store the name, the FQN will be built in the source itself when needed.
         """
+
+        if stage.store_fqn:
+            new_context = self._build_new_context_fqn(right)
+        else:
+            new_context = model_str(right.name)
+
         if stage.context and not stage.store_all_in_context:
-            self._replace_context(key=stage.context, value=context)
+            self._replace_context(key=stage.context, value=new_context)
         if stage.context and stage.store_all_in_context:
-            self._append_context(key=stage.context, value=context)
+            self._append_context(key=stage.context, value=new_context)
+
+    @singledispatchmethod
+    def _build_new_context_fqn(self, right: C) -> str:
+        """Build context fqn string"""
+        raise NotImplementedError(f"Missing implementation for [{type(C)}]")
+
+    @_build_new_context_fqn.register
+    def _(self, right: CreateStoredProcedureRequest) -> str:
+        """
+        Implement FQN context building for Stored Procedures.
+
+        We process the Stored Procedures lineage at the very end of the service. If we
+        just store the SP name, we lose the information of which db/schema the SP belongs to.
+        """
+
+        return fqn.build(
+            metadata=self.metadata,
+            entity_type=StoredProcedure,
+            service_name=self.context.database_service,
+            database_name=self.context.database,
+            schema_name=self.context.database_schema,
+            procedure_name=right.name.__root__,
+        )
 
     def create_patch_request(
         self, original_entity: Entity, create_request: C
@@ -379,7 +405,7 @@ class TopologyRunnerMixin(Generic[C]):
                     "for the service connection."
                 )
 
-        self.update_context(stage=stage, context=entity_name)
+        self.update_context(stage=stage, right=right)
 
     @yield_and_update_context.register
     def _(
@@ -395,7 +421,7 @@ class TopologyRunnerMixin(Generic[C]):
         lineage has been properly drawn. We'll skip the process for now.
         """
         yield entity_request
-        self.update_context(stage=stage, context=right.edge.fromEntity.name.__root__)
+        self.update_context(stage=stage, right=right.edge.fromEntity.name.__root__)
 
     @yield_and_update_context.register
     def _(
@@ -408,7 +434,7 @@ class TopologyRunnerMixin(Generic[C]):
         yield entity_request
 
         # We'll keep the tag fqn in the context and use if required
-        self.update_context(stage=stage, context=right)
+        self.update_context(stage=stage, right=right)
 
     @yield_and_update_context.register
     def _(
@@ -421,29 +447,7 @@ class TopologyRunnerMixin(Generic[C]):
         yield entity_request
 
         # We'll keep the tag fqn in the context and use if required
-        self.update_context(stage=stage, context=right)
-
-    @yield_and_update_context.register
-    def _(
-        self,
-        right: CreateStoredProcedureRequest,
-        stage: NodeStage,
-        entity_request: Either[C],
-    ) -> Iterable[Either[Entity]]:
-        """Tag implementation for the context information"""
-        yield entity_request
-
-        procedure_fqn = fqn.build(
-            metadata=self.metadata,
-            entity_type=StoredProcedure,
-            service_name=self.context.database_service,
-            database_name=self.context.database,
-            schema_name=self.context.database_schema,
-            procedure_name=right.name.__root__,
-        )
-
-        # We'll keep the proc fqn in the context and use if required
-        self.update_context(stage=stage, context=procedure_fqn)
+        self.update_context(stage=stage, right=right)
 
     def sink_request(
         self, stage: NodeStage, entity_request: Either[C]
diff --git a/ingestion/src/metadata/ingestion/models/topology.py b/ingestion/src/metadata/ingestion/models/topology.py
index 3dfec152a39..18aef03f3ae 100644
--- a/ingestion/src/metadata/ingestion/models/topology.py
+++ b/ingestion/src/metadata/ingestion/models/topology.py
@@ -65,6 +65,10 @@ class NodeStage(BaseModel, Generic[T]):
         False,
         description="If we need to clean the values in the context for each produced element",
     )
+    store_fqn: bool = Field(
+        False,
+        description="If true, store the entity FQN in the context instead of just the name",
+    )
 
     # Used to compute the fingerprint
     cache_entities: bool = Field(
diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py
index 1aa867fa290..6bc74c12558 100644
--- a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py
@@ -47,9 +47,7 @@ from metadata.generated.schema.api.data.createDashboardDataModel import (
 )
 from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
 from metadata.generated.schema.entity.data.chart import Chart
-from metadata.generated.schema.entity.data.dashboard import (
-    Dashboard as MetadataDashboard,
-)
+from metadata.generated.schema.entity.data.dashboard import Dashboard
 from metadata.generated.schema.entity.data.dashboardDataModel import (
     DashboardDataModel,
     DataModelType,
@@ -758,12 +756,12 @@ class LookerSource(DashboardServiceSource):
         if cached_explore:
             dashboard_fqn = fqn.build(
                 self.metadata,
-                entity_type=MetadataDashboard,
+                entity_type=Dashboard,
                 service_name=self.context.dashboard_service,
                 dashboard_name=self.context.dashboard,
             )
             dashboard_entity = self.metadata.get_by_name(
-                entity=MetadataDashboard, fqn=dashboard_fqn
+                entity=Dashboard, fqn=dashboard_fqn
             )
             yield Either(
                 right=AddLineageRequest(
@@ -796,7 +794,7 @@ class LookerSource(DashboardServiceSource):
         self,
         source: str,
         db_service_name: str,
-        to_entity: Union[MetadataDashboard, DashboardDataModel],
+        to_entity: Union[Dashboard, DashboardDataModel],
     ) -> Optional[Either[AddLineageRequest]]:
         """
         Once we have a list of origin data sources, check their components
@@ -941,9 +939,23 @@ class LookerSource(DashboardServiceSource):
 
         :return: UsageRequest, if not computed
         """
-        dashboard: MetadataDashboard = self.context.dashboard
+        dashboard_name = self.context.dashboard
 
         try:
+
+            dashboard_fqn = fqn.build(
+                metadata=self.metadata,
+                entity_type=Dashboard,
+                service_name=self.context.dashboard_service,
+                dashboard_name=dashboard_name,
+            )
+
+            dashboard: Dashboard = self.metadata.get_by_name(
+                entity=Dashboard,
+                fqn=dashboard_fqn,
+                fields=["usageSummary"],
+            )
+
             current_views = dashboard_details.view_count
 
             if not current_views:
@@ -995,8 +1007,8 @@ class LookerSource(DashboardServiceSource):
         except Exception as exc:
             yield Either(
                 left=StackTraceError(
-                    name=f"{dashboard.name} Usage",
-                    error=f"Exception computing dashboard usage for {dashboard.fullyQualifiedName.__root__}: {exc}",
+                    name=f"{dashboard_name} Usage",
+                    error=f"Exception computing dashboard usage for {dashboard_name}: {exc}",
                     stackTrace=traceback.format_exc(),
                 )
             )
diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py
index 9c57ff94c48..8d91ad2765b 100644
--- a/ingestion/src/metadata/ingestion/source/database/database_service.py
+++ b/ingestion/src/metadata/ingestion/source/database/database_service.py
@@ -186,6 +186,7 @@ class DatabaseServiceTopology(ServiceTopology):
             processor="yield_stored_procedure",
             consumer=["database_service", "database", "database_schema"],
             store_all_in_context=True,
+            store_fqn=True,
             use_cache=True,
         ),
     ],
diff --git a/ingestion/tests/unit/topology/dashboard/test_looker.py b/ingestion/tests/unit/topology/dashboard/test_looker.py
index 70dab3451a4..cb4c2d3d50b 100644
--- a/ingestion/tests/unit/topology/dashboard/test_looker.py
+++ b/ingestion/tests/unit/topology/dashboard/test_looker.py
@@ -397,26 +397,28 @@ class LookerUnitTest(TestCase):
         Validate the logic for existing or new usage
         """
 
+        self.looker.context.__dict__["dashboard"] = "dashboard_name"
+        MOCK_LOOKER_DASHBOARD.view_count = 10
+
         # Start checking dashboard without usage
         # and a view count
-        self.looker.context.__dict__["dashboard"] = Dashboard(
+        return_value = Dashboard(
             id=uuid.uuid4(),
             name="dashboard_name",
             fullyQualifiedName="dashboard_service.dashboard_name",
             service=EntityReference(id=uuid.uuid4(), type="dashboardService"),
         )
-        MOCK_LOOKER_DASHBOARD.view_count = 10
-
-        self.assertEqual(
-            next(self.looker.yield_dashboard_usage(MOCK_LOOKER_DASHBOARD)).right,
-            DashboardUsage(
-                dashboard=self.looker.context.dashboard,
-                usage=UsageRequest(date=self.looker.today, count=10),
-            ),
-        )
+        with patch.object(OpenMetadata, "get_by_name", return_value=return_value):
+            self.assertEqual(
+                next(self.looker.yield_dashboard_usage(MOCK_LOOKER_DASHBOARD)).right,
+                DashboardUsage(
+                    dashboard=return_value,
+                    usage=UsageRequest(date=self.looker.today, count=10),
+                ),
+            )
 
         # Now check what happens if we already have some summary data for today
-        self.looker.context.__dict__["dashboard"] = Dashboard(
+        return_value = Dashboard(
             id=uuid.uuid4(),
             name="dashboard_name",
             fullyQualifiedName="dashboard_service.dashboard_name",
@@ -425,14 +427,14 @@ class LookerUnitTest(TestCase):
                 dailyStats=UsageStats(count=10), date=self.looker.today
             ),
         )
-
-        # Nothing is returned
-        self.assertEqual(
-            len(list(self.looker.yield_dashboard_usage(MOCK_LOOKER_DASHBOARD))), 0
-        )
+        with patch.object(OpenMetadata, "get_by_name", return_value=return_value):
+            # Nothing is returned
+            self.assertEqual(
+                len(list(self.looker.yield_dashboard_usage(MOCK_LOOKER_DASHBOARD))), 0
+            )
 
         # But if we have usage for today but the count is 0, we'll return the details
-        self.looker.context.__dict__["dashboard"] = Dashboard(
+        return_value = Dashboard(
             id=uuid.uuid4(),
             name="dashboard_name",
             fullyQualifiedName="dashboard_service.dashboard_name",
@@ -441,16 +443,17 @@ class LookerUnitTest(TestCase):
                 dailyStats=UsageStats(count=0), date=self.looker.today
             ),
         )
-        self.assertEqual(
-            next(self.looker.yield_dashboard_usage(MOCK_LOOKER_DASHBOARD)).right,
-            DashboardUsage(
-                dashboard=self.looker.context.dashboard,
-                usage=UsageRequest(date=self.looker.today, count=10),
-            ),
-        )
+        with patch.object(OpenMetadata, "get_by_name", return_value=return_value):
+            self.assertEqual(
+                next(self.looker.yield_dashboard_usage(MOCK_LOOKER_DASHBOARD)).right,
+                DashboardUsage(
+                    dashboard=return_value,
+                    usage=UsageRequest(date=self.looker.today, count=10),
+                ),
+            )
 
         # But if we have usage for another day, then we do the difference
-        self.looker.context.__dict__["dashboard"] = Dashboard(
+        return_value = Dashboard(
             id=uuid.uuid4(),
             name="dashboard_name",
             fullyQualifiedName="dashboard_service.dashboard_name",
@@ -460,17 +463,18 @@ class LookerUnitTest(TestCase):
                 date=datetime.strftime(datetime.now() - timedelta(1), "%Y-%m-%d"),
             ),
         )
-        self.assertEqual(
-            next(self.looker.yield_dashboard_usage(MOCK_LOOKER_DASHBOARD)).right,
-            DashboardUsage(
-                dashboard=self.looker.context.dashboard,
-                usage=UsageRequest(date=self.looker.today, count=5),
-            ),
-        )
+        with patch.object(OpenMetadata, "get_by_name", return_value=return_value):
+            self.assertEqual(
+                next(self.looker.yield_dashboard_usage(MOCK_LOOKER_DASHBOARD)).right,
+                DashboardUsage(
+                    dashboard=return_value,
+                    usage=UsageRequest(date=self.looker.today, count=5),
+                ),
+            )
 
         # If the past usage is higher than what we have today, something weird is going on
         # we don't return usage but don't explode
-        self.looker.context.__dict__["dashboard"] = Dashboard(
+        return_value = Dashboard(
             id=uuid.uuid4(),
             name="dashboard_name",
             fullyQualifiedName="dashboard_service.dashboard_name",
@@ -480,11 +484,11 @@ class LookerUnitTest(TestCase):
                 date=datetime.strftime(datetime.now() - timedelta(1), "%Y-%m-%d"),
             ),
         )
+        with patch.object(OpenMetadata, "get_by_name", return_value=return_value):
+            self.assertEqual(
+                len(list(self.looker.yield_dashboard_usage(MOCK_LOOKER_DASHBOARD))), 1
+            )
 
-        self.assertEqual(
-            len(list(self.looker.yield_dashboard_usage(MOCK_LOOKER_DASHBOARD))), 1
-        )
-
-        self.assertIsNotNone(
-            list(self.looker.yield_dashboard_usage(MOCK_LOOKER_DASHBOARD))[0].left
-        )
+            self.assertIsNotNone(
+                list(self.looker.yield_dashboard_usage(MOCK_LOOKER_DASHBOARD))[0].left
+            )
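
Note on the dispatch pattern used by _build_new_context_fqn above: it relies on functools.singledispatchmethod to pick, per create-request type, whether the context stores just the name or the full FQN. The sketch below is a minimal, self-contained illustration of that pattern only; the class names and the FQN string are made-up stand-ins, not OpenMetadata code.

from functools import singledispatchmethod


class CreateTableRequest:
    """Illustrative stand-in for a request whose context only needs the name."""

    name = "customers"


class CreateStoredProcedureRequest:
    """Illustrative stand-in for the stored procedure create request."""

    name = "calculate_revenue"


class ContextBuilder:
    """Mimics how the runner picks the context value per request type."""

    @singledispatchmethod
    def build_context(self, right) -> str:
        # Default behaviour: store only the entity name
        return right.name

    @build_context.register
    def _(self, right: CreateStoredProcedureRequest) -> str:
        # Stored procedures keep their full FQN so lineage can still be
        # resolved after the db/schema context has moved on
        return f"my_service.my_db.my_schema.{right.name}"


builder = ContextBuilder()
print(builder.build_context(CreateTableRequest()))            # customers
print(builder.build_context(CreateStoredProcedureRequest()))  # my_service.my_db.my_schema.calculate_revenue

With this in place, only stages that set store_fqn=True (such as the stored procedure node in database_service.py) pay the cost of building the full FQN; every other stage keeps storing just the name.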