mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-24 08:58:06 +00:00
MINOR - Fix SP topology context & Looker usage context (#14816)
* MINOR - Fix SP topology context & Looker usage context * MINOR - Fix SP topology context & Looker usage context * Fix tests
This commit is contained in:
parent
492bac32c0
commit
337796d612
@ -15,7 +15,7 @@ generate the _run based on their topology.
|
|||||||
import traceback
|
import traceback
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from functools import singledispatchmethod
|
from functools import singledispatchmethod
|
||||||
from typing import Any, Generic, Iterable, List, Type, TypeVar, Union
|
from typing import Any, Generic, Iterable, List, Type, TypeVar
|
||||||
|
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
@ -267,20 +267,46 @@ class TopologyRunnerMixin(Generic[C]):
|
|||||||
*context_names, entity_name
|
*context_names, entity_name
|
||||||
)
|
)
|
||||||
|
|
||||||
def update_context(
|
def update_context(self, stage: NodeStage, right: C):
|
||||||
self, stage: NodeStage, context: Union[str, OMetaTagAndClassification]
|
|
||||||
):
|
|
||||||
"""
|
"""
|
||||||
Append or update context
|
Append or update context
|
||||||
|
|
||||||
We'll store the entity_name in the topology context instead of the entity_fqn
|
We'll store the entity name or FQN in the topology context.
|
||||||
and build the FQN on-the-fly wherever required.
|
If we store the name, the FQN will be built in the source itself when needed.
|
||||||
This is mainly because we need the context in other places
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
if stage.store_fqn:
|
||||||
|
new_context = self._build_new_context_fqn(right)
|
||||||
|
else:
|
||||||
|
new_context = model_str(right.name)
|
||||||
|
|
||||||
if stage.context and not stage.store_all_in_context:
|
if stage.context and not stage.store_all_in_context:
|
||||||
self._replace_context(key=stage.context, value=context)
|
self._replace_context(key=stage.context, value=new_context)
|
||||||
if stage.context and stage.store_all_in_context:
|
if stage.context and stage.store_all_in_context:
|
||||||
self._append_context(key=stage.context, value=context)
|
self._append_context(key=stage.context, value=new_context)
|
||||||
|
|
||||||
|
@singledispatchmethod
|
||||||
|
def _build_new_context_fqn(self, right: C) -> str:
|
||||||
|
"""Build context fqn string"""
|
||||||
|
raise NotImplementedError(f"Missing implementation for [{type(C)}]")
|
||||||
|
|
||||||
|
@_build_new_context_fqn.register
|
||||||
|
def _(self, right: CreateStoredProcedureRequest) -> str:
|
||||||
|
"""
|
||||||
|
Implement FQN context building for Stored Procedures.
|
||||||
|
|
||||||
|
We process the Stored Procedures lineage at the very end of the service. If we
|
||||||
|
just store the SP name, we lose the information of which db/schema the SP belongs to.
|
||||||
|
"""
|
||||||
|
|
||||||
|
return fqn.build(
|
||||||
|
metadata=self.metadata,
|
||||||
|
entity_type=StoredProcedure,
|
||||||
|
service_name=self.context.database_service,
|
||||||
|
database_name=self.context.database,
|
||||||
|
schema_name=self.context.database_schema,
|
||||||
|
procedure_name=right.name.__root__,
|
||||||
|
)
|
||||||
|
|
||||||
def create_patch_request(
|
def create_patch_request(
|
||||||
self, original_entity: Entity, create_request: C
|
self, original_entity: Entity, create_request: C
|
||||||
@ -379,7 +405,7 @@ class TopologyRunnerMixin(Generic[C]):
|
|||||||
"for the service connection."
|
"for the service connection."
|
||||||
)
|
)
|
||||||
|
|
||||||
self.update_context(stage=stage, context=entity_name)
|
self.update_context(stage=stage, right=right)
|
||||||
|
|
||||||
@yield_and_update_context.register
|
@yield_and_update_context.register
|
||||||
def _(
|
def _(
|
||||||
@ -395,7 +421,7 @@ class TopologyRunnerMixin(Generic[C]):
|
|||||||
lineage has been properly drawn. We'll skip the process for now.
|
lineage has been properly drawn. We'll skip the process for now.
|
||||||
"""
|
"""
|
||||||
yield entity_request
|
yield entity_request
|
||||||
self.update_context(stage=stage, context=right.edge.fromEntity.name.__root__)
|
self.update_context(stage=stage, right=right.edge.fromEntity.name.__root__)
|
||||||
|
|
||||||
@yield_and_update_context.register
|
@yield_and_update_context.register
|
||||||
def _(
|
def _(
|
||||||
@ -408,7 +434,7 @@ class TopologyRunnerMixin(Generic[C]):
|
|||||||
yield entity_request
|
yield entity_request
|
||||||
|
|
||||||
# We'll keep the tag fqn in the context and use if required
|
# We'll keep the tag fqn in the context and use if required
|
||||||
self.update_context(stage=stage, context=right)
|
self.update_context(stage=stage, right=right)
|
||||||
|
|
||||||
@yield_and_update_context.register
|
@yield_and_update_context.register
|
||||||
def _(
|
def _(
|
||||||
@ -421,29 +447,7 @@ class TopologyRunnerMixin(Generic[C]):
|
|||||||
yield entity_request
|
yield entity_request
|
||||||
|
|
||||||
# We'll keep the tag fqn in the context and use if required
|
# We'll keep the tag fqn in the context and use if required
|
||||||
self.update_context(stage=stage, context=right)
|
self.update_context(stage=stage, right=right)
|
||||||
|
|
||||||
@yield_and_update_context.register
|
|
||||||
def _(
|
|
||||||
self,
|
|
||||||
right: CreateStoredProcedureRequest,
|
|
||||||
stage: NodeStage,
|
|
||||||
entity_request: Either[C],
|
|
||||||
) -> Iterable[Either[Entity]]:
|
|
||||||
"""Tag implementation for the context information"""
|
|
||||||
yield entity_request
|
|
||||||
|
|
||||||
procedure_fqn = fqn.build(
|
|
||||||
metadata=self.metadata,
|
|
||||||
entity_type=StoredProcedure,
|
|
||||||
service_name=self.context.database_service,
|
|
||||||
database_name=self.context.database,
|
|
||||||
schema_name=self.context.database_schema,
|
|
||||||
procedure_name=right.name.__root__,
|
|
||||||
)
|
|
||||||
|
|
||||||
# We'll keep the proc fqn in the context and use if required
|
|
||||||
self.update_context(stage=stage, context=procedure_fqn)
|
|
||||||
|
|
||||||
def sink_request(
|
def sink_request(
|
||||||
self, stage: NodeStage, entity_request: Either[C]
|
self, stage: NodeStage, entity_request: Either[C]
|
||||||
|
@ -65,6 +65,10 @@ class NodeStage(BaseModel, Generic[T]):
|
|||||||
False,
|
False,
|
||||||
description="If we need to clean the values in the context for each produced element",
|
description="If we need to clean the values in the context for each produced element",
|
||||||
)
|
)
|
||||||
|
store_fqn: bool = Field(
|
||||||
|
False,
|
||||||
|
description="If true, store the entity FQN in the context instead of just the name",
|
||||||
|
)
|
||||||
|
|
||||||
# Used to compute the fingerprint
|
# Used to compute the fingerprint
|
||||||
cache_entities: bool = Field(
|
cache_entities: bool = Field(
|
||||||
|
@ -47,9 +47,7 @@ from metadata.generated.schema.api.data.createDashboardDataModel import (
|
|||||||
)
|
)
|
||||||
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||||
from metadata.generated.schema.entity.data.chart import Chart
|
from metadata.generated.schema.entity.data.chart import Chart
|
||||||
from metadata.generated.schema.entity.data.dashboard import (
|
from metadata.generated.schema.entity.data.dashboard import Dashboard
|
||||||
Dashboard as MetadataDashboard,
|
|
||||||
)
|
|
||||||
from metadata.generated.schema.entity.data.dashboardDataModel import (
|
from metadata.generated.schema.entity.data.dashboardDataModel import (
|
||||||
DashboardDataModel,
|
DashboardDataModel,
|
||||||
DataModelType,
|
DataModelType,
|
||||||
@ -758,12 +756,12 @@ class LookerSource(DashboardServiceSource):
|
|||||||
if cached_explore:
|
if cached_explore:
|
||||||
dashboard_fqn = fqn.build(
|
dashboard_fqn = fqn.build(
|
||||||
self.metadata,
|
self.metadata,
|
||||||
entity_type=MetadataDashboard,
|
entity_type=Dashboard,
|
||||||
service_name=self.context.dashboard_service,
|
service_name=self.context.dashboard_service,
|
||||||
dashboard_name=self.context.dashboard,
|
dashboard_name=self.context.dashboard,
|
||||||
)
|
)
|
||||||
dashboard_entity = self.metadata.get_by_name(
|
dashboard_entity = self.metadata.get_by_name(
|
||||||
entity=MetadataDashboard, fqn=dashboard_fqn
|
entity=Dashboard, fqn=dashboard_fqn
|
||||||
)
|
)
|
||||||
yield Either(
|
yield Either(
|
||||||
right=AddLineageRequest(
|
right=AddLineageRequest(
|
||||||
@ -796,7 +794,7 @@ class LookerSource(DashboardServiceSource):
|
|||||||
self,
|
self,
|
||||||
source: str,
|
source: str,
|
||||||
db_service_name: str,
|
db_service_name: str,
|
||||||
to_entity: Union[MetadataDashboard, DashboardDataModel],
|
to_entity: Union[Dashboard, DashboardDataModel],
|
||||||
) -> Optional[Either[AddLineageRequest]]:
|
) -> Optional[Either[AddLineageRequest]]:
|
||||||
"""
|
"""
|
||||||
Once we have a list of origin data sources, check their components
|
Once we have a list of origin data sources, check their components
|
||||||
@ -941,9 +939,23 @@ class LookerSource(DashboardServiceSource):
|
|||||||
:return: UsageRequest, if not computed
|
:return: UsageRequest, if not computed
|
||||||
"""
|
"""
|
||||||
|
|
||||||
dashboard: MetadataDashboard = self.context.dashboard
|
dashboard_name = self.context.dashboard
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
||||||
|
dashboard_fqn = fqn.build(
|
||||||
|
metadata=self.metadata,
|
||||||
|
entity_type=Dashboard,
|
||||||
|
service_name=self.context.dashboard_service,
|
||||||
|
dashboard_name=dashboard_name,
|
||||||
|
)
|
||||||
|
|
||||||
|
dashboard: Dashboard = self.metadata.get_by_name(
|
||||||
|
entity=Dashboard,
|
||||||
|
fqn=dashboard_fqn,
|
||||||
|
fields=["usageSummary"],
|
||||||
|
)
|
||||||
|
|
||||||
current_views = dashboard_details.view_count
|
current_views = dashboard_details.view_count
|
||||||
|
|
||||||
if not current_views:
|
if not current_views:
|
||||||
@ -995,8 +1007,8 @@ class LookerSource(DashboardServiceSource):
|
|||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
yield Either(
|
yield Either(
|
||||||
left=StackTraceError(
|
left=StackTraceError(
|
||||||
name=f"{dashboard.name} Usage",
|
name=f"{dashboard_name} Usage",
|
||||||
error=f"Exception computing dashboard usage for {dashboard.fullyQualifiedName.__root__}: {exc}",
|
error=f"Exception computing dashboard usage for {dashboard_name}: {exc}",
|
||||||
stackTrace=traceback.format_exc(),
|
stackTrace=traceback.format_exc(),
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
@ -186,6 +186,7 @@ class DatabaseServiceTopology(ServiceTopology):
|
|||||||
processor="yield_stored_procedure",
|
processor="yield_stored_procedure",
|
||||||
consumer=["database_service", "database", "database_schema"],
|
consumer=["database_service", "database", "database_schema"],
|
||||||
store_all_in_context=True,
|
store_all_in_context=True,
|
||||||
|
store_fqn=True,
|
||||||
use_cache=True,
|
use_cache=True,
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
|
@ -397,26 +397,28 @@ class LookerUnitTest(TestCase):
|
|||||||
Validate the logic for existing or new usage
|
Validate the logic for existing or new usage
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
self.looker.context.__dict__["dashboard"] = "dashboard_name"
|
||||||
|
MOCK_LOOKER_DASHBOARD.view_count = 10
|
||||||
|
|
||||||
# Start checking dashboard without usage
|
# Start checking dashboard without usage
|
||||||
# and a view count
|
# and a view count
|
||||||
self.looker.context.__dict__["dashboard"] = Dashboard(
|
return_value = Dashboard(
|
||||||
id=uuid.uuid4(),
|
id=uuid.uuid4(),
|
||||||
name="dashboard_name",
|
name="dashboard_name",
|
||||||
fullyQualifiedName="dashboard_service.dashboard_name",
|
fullyQualifiedName="dashboard_service.dashboard_name",
|
||||||
service=EntityReference(id=uuid.uuid4(), type="dashboardService"),
|
service=EntityReference(id=uuid.uuid4(), type="dashboardService"),
|
||||||
)
|
)
|
||||||
MOCK_LOOKER_DASHBOARD.view_count = 10
|
with patch.object(OpenMetadata, "get_by_name", return_value=return_value):
|
||||||
|
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
next(self.looker.yield_dashboard_usage(MOCK_LOOKER_DASHBOARD)).right,
|
next(self.looker.yield_dashboard_usage(MOCK_LOOKER_DASHBOARD)).right,
|
||||||
DashboardUsage(
|
DashboardUsage(
|
||||||
dashboard=self.looker.context.dashboard,
|
dashboard=return_value,
|
||||||
usage=UsageRequest(date=self.looker.today, count=10),
|
usage=UsageRequest(date=self.looker.today, count=10),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
# Now check what happens if we already have some summary data for today
|
# Now check what happens if we already have some summary data for today
|
||||||
self.looker.context.__dict__["dashboard"] = Dashboard(
|
return_value = Dashboard(
|
||||||
id=uuid.uuid4(),
|
id=uuid.uuid4(),
|
||||||
name="dashboard_name",
|
name="dashboard_name",
|
||||||
fullyQualifiedName="dashboard_service.dashboard_name",
|
fullyQualifiedName="dashboard_service.dashboard_name",
|
||||||
@ -425,14 +427,14 @@ class LookerUnitTest(TestCase):
|
|||||||
dailyStats=UsageStats(count=10), date=self.looker.today
|
dailyStats=UsageStats(count=10), date=self.looker.today
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
with patch.object(OpenMetadata, "get_by_name", return_value=return_value):
|
||||||
# Nothing is returned
|
# Nothing is returned
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
len(list(self.looker.yield_dashboard_usage(MOCK_LOOKER_DASHBOARD))), 0
|
len(list(self.looker.yield_dashboard_usage(MOCK_LOOKER_DASHBOARD))), 0
|
||||||
)
|
)
|
||||||
|
|
||||||
# But if we have usage for today but the count is 0, we'll return the details
|
# But if we have usage for today but the count is 0, we'll return the details
|
||||||
self.looker.context.__dict__["dashboard"] = Dashboard(
|
return_value = Dashboard(
|
||||||
id=uuid.uuid4(),
|
id=uuid.uuid4(),
|
||||||
name="dashboard_name",
|
name="dashboard_name",
|
||||||
fullyQualifiedName="dashboard_service.dashboard_name",
|
fullyQualifiedName="dashboard_service.dashboard_name",
|
||||||
@ -441,16 +443,17 @@ class LookerUnitTest(TestCase):
|
|||||||
dailyStats=UsageStats(count=0), date=self.looker.today
|
dailyStats=UsageStats(count=0), date=self.looker.today
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
with patch.object(OpenMetadata, "get_by_name", return_value=return_value):
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
next(self.looker.yield_dashboard_usage(MOCK_LOOKER_DASHBOARD)).right,
|
next(self.looker.yield_dashboard_usage(MOCK_LOOKER_DASHBOARD)).right,
|
||||||
DashboardUsage(
|
DashboardUsage(
|
||||||
dashboard=self.looker.context.dashboard,
|
dashboard=return_value,
|
||||||
usage=UsageRequest(date=self.looker.today, count=10),
|
usage=UsageRequest(date=self.looker.today, count=10),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
# But if we have usage for another day, then we do the difference
|
# But if we have usage for another day, then we do the difference
|
||||||
self.looker.context.__dict__["dashboard"] = Dashboard(
|
return_value = Dashboard(
|
||||||
id=uuid.uuid4(),
|
id=uuid.uuid4(),
|
||||||
name="dashboard_name",
|
name="dashboard_name",
|
||||||
fullyQualifiedName="dashboard_service.dashboard_name",
|
fullyQualifiedName="dashboard_service.dashboard_name",
|
||||||
@ -460,17 +463,18 @@ class LookerUnitTest(TestCase):
|
|||||||
date=datetime.strftime(datetime.now() - timedelta(1), "%Y-%m-%d"),
|
date=datetime.strftime(datetime.now() - timedelta(1), "%Y-%m-%d"),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
with patch.object(OpenMetadata, "get_by_name", return_value=return_value):
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
next(self.looker.yield_dashboard_usage(MOCK_LOOKER_DASHBOARD)).right,
|
next(self.looker.yield_dashboard_usage(MOCK_LOOKER_DASHBOARD)).right,
|
||||||
DashboardUsage(
|
DashboardUsage(
|
||||||
dashboard=self.looker.context.dashboard,
|
dashboard=return_value,
|
||||||
usage=UsageRequest(date=self.looker.today, count=5),
|
usage=UsageRequest(date=self.looker.today, count=5),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
# If the past usage is higher than what we have today, something weird is going on
|
# If the past usage is higher than what we have today, something weird is going on
|
||||||
# we don't return usage but don't explode
|
# we don't return usage but don't explode
|
||||||
self.looker.context.__dict__["dashboard"] = Dashboard(
|
return_value = Dashboard(
|
||||||
id=uuid.uuid4(),
|
id=uuid.uuid4(),
|
||||||
name="dashboard_name",
|
name="dashboard_name",
|
||||||
fullyQualifiedName="dashboard_service.dashboard_name",
|
fullyQualifiedName="dashboard_service.dashboard_name",
|
||||||
@ -480,7 +484,7 @@ class LookerUnitTest(TestCase):
|
|||||||
date=datetime.strftime(datetime.now() - timedelta(1), "%Y-%m-%d"),
|
date=datetime.strftime(datetime.now() - timedelta(1), "%Y-%m-%d"),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
with patch.object(OpenMetadata, "get_by_name", return_value=return_value):
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
len(list(self.looker.yield_dashboard_usage(MOCK_LOOKER_DASHBOARD))), 1
|
len(list(self.looker.yield_dashboard_usage(MOCK_LOOKER_DASHBOARD))), 1
|
||||||
)
|
)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user