MINOR - Fix & Organize topology context (#14838)

* MINOR - Fix & Organize topology context

* Handle missing context charts
This commit is contained in:
Pere Miquel Brull 2024-01-25 08:22:07 +01:00 committed by GitHub
parent ac4dc7ffc3
commit 85e2058979
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 341 additions and 158 deletions

View File

@ -19,9 +19,6 @@ from typing import Any, Generic, Iterable, List, Type, TypeVar
from pydantic import BaseModel
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
@ -35,13 +32,11 @@ from metadata.ingestion.models.topology import (
ServiceTopology,
TopologyContext,
TopologyNode,
get_ctx_default,
get_topology_node,
get_topology_root,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.utils import model_str
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
from metadata.utils.source_hash import generate_source_hash
@ -112,7 +107,7 @@ class TopologyRunnerMixin(Generic[C]):
# Once we are done processing all the stages,
for stage in node.stages:
if stage.clear_context:
self.clear_context(stage=stage)
self.context.clear_stage(stage=stage)
# process all children from the node being run
yield from self.process_nodes(child_nodes)
@ -182,7 +177,7 @@ class TopologyRunnerMixin(Generic[C]):
for child_node in child_nodes or []:
for child_stage in child_node.stages or []:
if child_stage.use_cache:
entity_fqn = self.fqn_from_context(
entity_fqn = self.context.fqn_from_stage(
stage=stage,
entity_name=self.context.__dict__[stage.context],
)
@ -229,85 +224,6 @@ class TopologyRunnerMixin(Generic[C]):
"""
yield from self.process_nodes(get_topology_root(self.topology))
def _replace_context(self, key: str, value: Any) -> None:
"""
Update the key of the context with the given value
:param key: element to update from the source context
:param value: value to use for the update
"""
self.context.__dict__[key] = value
def _append_context(self, key: str, value: Any) -> None:
"""
Update the key of the context with the given value
:param key: element to update from the source context
:param value: value to use for the update
"""
self.context.__dict__[key].append(value)
def clear_context(self, stage: NodeStage) -> None:
"""
Clear the available context
:param stage: Update stage context to the default values
"""
self.context.__dict__[stage.context] = get_ctx_default(stage)
def fqn_from_context(self, stage: NodeStage, entity_name: str) -> str:
"""
Read the context
:param stage: Topology node being processed
:param entity_name: name being stored
:return: Entity FQN derived from context
"""
context_names = [
self.context.__dict__[dependency]
for dependency in stage.consumer or [] # root nodes do not have consumers
]
return fqn._build( # pylint: disable=protected-access
*context_names, entity_name
)
def update_context(self, stage: NodeStage, right: C):
"""
Append or update context
We'll store the entity name or FQN in the topology context.
If we store the name, the FQN will be built in the source itself when needed.
"""
if stage.store_fqn:
new_context = self._build_new_context_fqn(right)
else:
new_context = model_str(right.name)
if stage.context and not stage.store_all_in_context:
self._replace_context(key=stage.context, value=new_context)
if stage.context and stage.store_all_in_context:
self._append_context(key=stage.context, value=new_context)
@singledispatchmethod
def _build_new_context_fqn(self, right: C) -> str:
"""Build context fqn string"""
raise NotImplementedError(f"Missing implementation for [{type(C)}]")
@_build_new_context_fqn.register
def _(self, right: CreateStoredProcedureRequest) -> str:
"""
Implement FQN context building for Stored Procedures.
We process the Stored Procedures lineage at the very end of the service. If we
just store the SP name, we lose the information of which db/schema the SP belongs to.
"""
return fqn.build(
metadata=self.metadata,
entity_type=StoredProcedure,
service_name=self.context.database_service,
database_name=self.context.database,
schema_name=self.context.database_schema,
procedure_name=right.name.__root__,
)
def create_patch_request(
self, original_entity: Entity, create_request: C
) -> PatchRequest:
@ -335,7 +251,7 @@ class TopologyRunnerMixin(Generic[C]):
"""
entity = None
entity_name = model_str(right.name)
entity_fqn = self.fqn_from_context(stage=stage, entity_name=entity_name)
entity_fqn = self.context.fqn_from_stage(stage=stage, entity_name=entity_name)
# If we don't want to write data in OM, we'll return what we fetch from the API.
# This will be applicable for service entities since we do not want to overwrite the data
@ -405,7 +321,7 @@ class TopologyRunnerMixin(Generic[C]):
"for the service connection."
)
self.update_context(stage=stage, right=right)
self.context.update_context_name(stage=stage, right=right)
@yield_and_update_context.register
def _(
@ -421,7 +337,7 @@ class TopologyRunnerMixin(Generic[C]):
lineage has been properly drawn. We'll skip the process for now.
"""
yield entity_request
self.update_context(stage=stage, right=right.edge.fromEntity.name.__root__)
self.context.update_context_name(stage=stage, right=right.edge.fromEntity)
@yield_and_update_context.register
def _(
@ -430,11 +346,16 @@ class TopologyRunnerMixin(Generic[C]):
stage: NodeStage,
entity_request: Either[C],
) -> Iterable[Either[Entity]]:
"""Tag implementation for the context information"""
"""
Tag implementation for the context information.
We need the full OMetaTagAndClassification in the context
to build the TagLabels during the ingestion. We need to bundle
both CreateClassificationRequest and CreateTagRequest.
"""
yield entity_request
# We'll keep the tag fqn in the context and use if required
self.update_context(stage=stage, right=right)
self.context.update_context_value(stage=stage, value=right)
@yield_and_update_context.register
def _(
@ -446,8 +367,7 @@ class TopologyRunnerMixin(Generic[C]):
"""Custom Property implementation for the context information"""
yield entity_request
# We'll keep the tag fqn in the context and use if required
self.update_context(stage=stage, right=right)
self.context.update_context_value(stage=stage, value=right)
def sink_request(
self, stage: NodeStage, entity_request: Either[C]

View File

@ -11,12 +11,20 @@
"""
Defines the topology for ingesting sources
"""
from functools import singledispatchmethod
from typing import Any, Generic, List, Optional, Type, TypeVar
from pydantic import BaseModel, Extra, Field, create_model
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure
from metadata.ingestion.ometa.utils import model_str
from metadata.utils import fqn
T = TypeVar("T", bound=BaseModel)
C = TypeVar("C", bound=BaseModel)
class NodeStage(BaseModel, Generic[T]):
@ -130,6 +138,112 @@ class TopologyContext(BaseModel):
ctx = {key: value.name.__root__ for key, value in self.__dict__.items()}
return f"TopologyContext({ctx})"
@classmethod
def create(cls, topology: ServiceTopology) -> "TopologyContext":
"""
Dynamically build a context based on the topology nodes.
Builds a Pydantic BaseModel class.
:param topology: ServiceTopology
:return: TopologyContext
"""
nodes = get_topology_nodes(topology)
ctx_fields = {
stage.context: (Optional[stage.type_], None)
for node in nodes
for stage in node.stages
if stage.context
}
return create_model(
"GeneratedContext", **ctx_fields, __base__=TopologyContext
)()
def upsert(self, key: str, value: Any) -> None:
"""
Update the key of the context with the given value
:param key: element to update from the source context
:param value: value to use for the update
"""
self.__dict__[key] = value
def append(self, key: str, value: Any) -> None:
"""
Update the key of the context with the given value
:param key: element to update from the source context
:param value: value to use for the update
"""
if self.__dict__.get(key):
self.__dict__[key].append(value)
else:
self.__dict__[key] = [value]
def clear_stage(self, stage: NodeStage) -> None:
"""
Clear the available context
:param stage: Update stage context to the default values
"""
self.__dict__[stage.context] = None
def fqn_from_stage(self, stage: NodeStage, entity_name: str) -> str:
"""
Read the context
:param stage: Topology node being processed
:param entity_name: name being stored
:return: Entity FQN derived from context
"""
context_names = [
self.__dict__[dependency]
for dependency in stage.consumer or [] # root nodes do not have consumers
]
return fqn._build( # pylint: disable=protected-access
*context_names, entity_name
)
def update_context_name(self, stage: NodeStage, right: C) -> None:
"""
Append or update context
We'll store the entity name or FQN in the topology context.
If we store the name, the FQN will be built in the source itself when needed.
"""
if stage.store_fqn:
new_context = self._build_new_context_fqn(right)
else:
new_context = model_str(right.name)
self.update_context_value(stage=stage, value=new_context)
def update_context_value(self, stage: NodeStage, value: Any) -> None:
if stage.context and not stage.store_all_in_context:
self.upsert(key=stage.context, value=value)
if stage.context and stage.store_all_in_context:
self.append(key=stage.context, value=value)
@singledispatchmethod
def _build_new_context_fqn(self, right: C) -> str:
"""Build context fqn string"""
raise NotImplementedError(f"Missing implementation for [{type(C)}]")
@_build_new_context_fqn.register
def _(self, right: CreateStoredProcedureRequest) -> str:
"""
Implement FQN context building for Stored Procedures.
We process the Stored Procedures lineage at the very end of the service. If we
just store the SP name, we lose the information of which db/schema the SP belongs to.
"""
return fqn.build(
metadata=None,
entity_type=StoredProcedure,
service_name=self.__dict__["database_service"],
database_name=self.__dict__["database"],
schema_name=self.__dict__["database_schema"],
procedure_name=right.name.__root__,
)
def get_topology_nodes(topology: ServiceTopology) -> List[TopologyNode]:
"""
@ -163,34 +277,6 @@ def get_topology_root(topology: ServiceTopology) -> List[TopologyNode]:
return [node for node in nodes if node_has_no_consumers(node)]
def get_ctx_default(stage: NodeStage) -> Optional[List[Any]]:
"""
If we cache all, default value is an empty list
:param stage: Node Stage
:return: None or []
"""
return [] if stage.store_all_in_context else None
def create_source_context(topology: ServiceTopology) -> TopologyContext:
"""
Dynamically build a context based on the topology nodes.
Builds a Pydantic BaseModel class.
:param topology: ServiceTopology
:return: TopologyContext
"""
nodes = get_topology_nodes(topology)
ctx_fields = {
stage.context: (Optional[stage.type_], get_ctx_default(stage))
for node in nodes
for stage in node.stages
if stage.context
}
return create_model("GeneratedContext", **ctx_fields, __base__=TopologyContext)()
def get_topology_node(name: str, topology: ServiceTopology) -> TopologyNode:
"""
Fetch a topology node by name

View File

@ -54,8 +54,8 @@ from metadata.ingestion.models.patch_request import PatchRequest
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
TopologyContext,
TopologyNode,
create_source_context,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
@ -194,7 +194,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
service_connection: DashboardConnection.__fields__["config"].type_
topology = DashboardServiceTopology()
context = create_source_context(topology)
context = TopologyContext.create(topology)
dashboard_source_state: Set = set()
datamodel_source_state: Set = set()

View File

@ -127,7 +127,7 @@ class DomodashboardSource(DashboardServiceSource):
service_name=self.context.dashboard_service,
chart_name=chart,
)
for chart in self.context.charts
for chart in self.context.charts or []
],
service=self.context.dashboard_service,
owner=self.get_owner_ref(dashboard_details=dashboard_details),

View File

@ -114,7 +114,7 @@ class LightdashSource(DashboardServiceSource):
service_name=self.context.dashboard_service,
chart_name=chart,
)
for chart in self.context.charts
for chart in self.context.charts or []
],
service=self.context.dashboard_service,
owner=self.get_owner_ref(dashboard_details=dashboard_details),

View File

@ -657,7 +657,7 @@ class LookerSource(DashboardServiceSource):
service_name=self.context.dashboard_service,
chart_name=chart,
)
for chart in self.context.charts
for chart in self.context.charts or []
],
# Dashboards are created from the UI directly. They are not linked to a project
# like LookML assets, but rather just organised in folders.

View File

@ -152,7 +152,7 @@ class MetabaseSource(DashboardServiceSource):
service_name=self.context.dashboard_service,
chart_name=chart,
)
for chart in self.context.charts
for chart in self.context.charts or []
],
service=self.context.dashboard_service,
owner=self.get_owner_ref(dashboard_details=dashboard_details),

View File

@ -106,7 +106,7 @@ class ModeSource(DashboardServiceSource):
service_name=self.context.dashboard_service,
chart_name=chart,
)
for chart in self.context.charts
for chart in self.context.charts or []
],
service=self.context.dashboard_service,
owner=self.get_owner_ref(dashboard_details=dashboard_details),

View File

@ -113,7 +113,7 @@ class MstrSource(DashboardServiceSource):
service_name=self.context.dashboard_service,
chart_name=chart,
)
for chart in self.context.charts
for chart in self.context.charts or []
],
service=self.context.dashboard_service,
owner=self.get_owner_ref(dashboard_details=dashboard_details),

View File

@ -408,7 +408,7 @@ class PowerbiSource(DashboardServiceSource):
service_name=self.context.dashboard_service,
chart_name=chart,
)
for chart in self.context.charts
for chart in self.context.charts or []
],
service=self.context.dashboard_service,
owner=self.get_owner_ref(dashboard_details=dashboard_details),

View File

@ -129,7 +129,7 @@ class QliksenseSource(DashboardServiceSource):
service_name=self.context.dashboard_service,
chart_name=chart,
)
for chart in self.context.charts
for chart in self.context.charts or []
],
service=self.context.dashboard_service,
owner=self.get_owner_ref(dashboard_details=dashboard_details),

View File

@ -144,7 +144,7 @@ class QuicksightSource(DashboardServiceSource):
service_name=self.context.dashboard_service,
chart_name=chart,
)
for chart in self.context.charts
for chart in self.context.charts or []
],
service=self.context.dashboard_service,
owner=self.get_owner_ref(dashboard_details=dashboard_details),

View File

@ -157,7 +157,7 @@ class RedashSource(DashboardServiceSource):
service_name=self.context.dashboard_service,
chart_name=chart,
)
for chart in self.context.charts
for chart in self.context.charts or []
],
service=self.context.dashboard_service,
sourceUrl=self.get_dashboard_url(dashboard_details),

View File

@ -97,7 +97,7 @@ class SupersetAPISource(SupersetSourceMixin):
service_name=self.context.dashboard_service,
chart_name=chart,
)
for chart in self.context.charts
for chart in self.context.charts or []
],
service=self.context.dashboard_service,
owner=self.get_owner_ref(dashboard_details=dashboard_details),

View File

@ -121,7 +121,7 @@ class SupersetDBSource(SupersetSourceMixin):
service_name=self.context.dashboard_service,
chart_name=chart,
)
for chart in self.context.charts
for chart in self.context.charts or []
],
service=self.context.dashboard_service,
owner=self.get_owner_ref(dashboard_details=dashboard_details),

View File

@ -213,7 +213,7 @@ class TableauSource(DashboardServiceSource):
service_name=self.context.dashboard_service,
chart_name=chart,
)
for chart in self.context.charts
for chart in self.context.charts or []
],
dataModels=[
fqn.build(

View File

@ -62,8 +62,8 @@ from metadata.ingestion.models.ometa_classification import OMetaTagAndClassifica
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
TopologyContext,
TopologyNode,
create_source_context,
)
from metadata.ingestion.source.connections import get_test_connection_fn
from metadata.utils import fqn
@ -205,7 +205,7 @@ class DatabaseServiceSource(
inspector: Inspector
topology = DatabaseServiceTopology()
context = create_source_context(topology)
context = TopologyContext.create(topology)
def prepare(self):
"""By default, there is no preparation needed"""

View File

@ -31,8 +31,8 @@ from metadata.ingestion.models.ometa_classification import OMetaTagAndClassifica
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
TopologyContext,
TopologyNode,
create_source_context,
)
from metadata.ingestion.source.database.database_service import DataModelLink
from metadata.ingestion.source.database.dbt.dbt_config import get_dbt_details
@ -135,7 +135,7 @@ class DbtServiceSource(TopologyRunnerMixin, Source, ABC):
"""
topology = DbtServiceTopology()
context = create_source_context(topology)
context = TopologyContext.create(topology)
source_config: DbtPipeline
def remove_manifest_non_required_keys(self, manifest_dict: dict):

View File

@ -37,8 +37,8 @@ from metadata.ingestion.models.delete_entity import DeleteEntity
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
TopologyContext,
TopologyNode,
create_source_context,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
@ -114,7 +114,7 @@ class MessagingServiceSource(TopologyRunnerMixin, Source, ABC):
service_connection: MessagingConnection.__fields__["config"].type_
topology = MessagingServiceTopology()
context = create_source_context(topology)
context = TopologyContext.create(topology)
topic_source_state: Set = set()
def __init__(

View File

@ -39,8 +39,8 @@ from metadata.ingestion.models.delete_entity import DeleteEntity
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
TopologyContext,
TopologyNode,
create_source_context,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
@ -100,7 +100,7 @@ class MlModelServiceSource(TopologyRunnerMixin, Source, ABC):
service_connection: MlModelConnection.__fields__["config"].type_
topology = MlModelServiceTopology()
context = create_source_context(topology)
context = TopologyContext.create(topology)
mlmodel_source_state: Set = set()
def __init__(

View File

@ -133,7 +133,7 @@ class DatabrickspipelineSource(PipelineServiceSource):
def get_tasks(self, pipeline_details: dict) -> List[Task]:
task_list = []
self._append_context(key="job_id_list", value=pipeline_details["job_id"])
self.context.append(key="job_id_list", value=pipeline_details["job_id"])
downstream_tasks = self.get_downstream_tasks(
pipeline_details["settings"].get("tasks")

View File

@ -37,8 +37,8 @@ from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
TopologyContext,
TopologyNode,
create_source_context,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
@ -116,7 +116,7 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC):
service_connection: PipelineConnection.__fields__["config"].type_
topology = PipelineServiceTopology()
context = create_source_context(topology)
context = TopologyContext.create(topology)
pipeline_source_state: Set = set()
def __init__(

View File

@ -43,8 +43,8 @@ from metadata.ingestion.models.search_index_data import OMetaIndexSampleData
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
TopologyContext,
TopologyNode,
create_source_context,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
@ -110,7 +110,7 @@ class SearchServiceSource(TopologyRunnerMixin, Source, ABC):
service_connection: SearchConnection.__fields__["config"].type_
topology = SearchServiceTopology()
context = create_source_context(topology)
context = TopologyContext.create(topology)
index_source_state: Set = set()
def __init__(

View File

@ -43,8 +43,8 @@ from metadata.ingestion.models.delete_entity import DeleteEntity
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
TopologyContext,
TopologyNode,
create_source_context,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
@ -112,7 +112,7 @@ class StorageServiceSource(TopologyRunnerMixin, Source, ABC):
service_connection: StorageConnection.__fields__["config"].type_
topology = StorageServiceTopology()
context = create_source_context(topology)
context = TopologyContext.create(topology)
container_source_state: Set = set()
global_manifest: Optional[ManifestMetadataConfig]

View File

@ -91,7 +91,7 @@ EXAMPLE_TABLE = [
columns=[],
)
]
mock_tableau_config = {
mock_config = {
"source": {
"type": "metabase",
"serviceName": "mock_metabase",
@ -234,9 +234,9 @@ class MetabaseUnitTest(TestCase):
super().__init__(methodName)
get_connection.return_value = False
test_connection.return_value = False
self.config = OpenMetadataWorkflowConfig.parse_obj(mock_tableau_config)
self.config = OpenMetadataWorkflowConfig.parse_obj(mock_config)
self.metabase = MetabaseSource.create(
mock_tableau_config["source"],
mock_config["source"],
OpenMetadata(self.config.workflowConfig.openMetadataServerConfig),
)
self.metabase.client = SimpleNamespace()

View File

@ -0,0 +1,177 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Check context operations
"""
from unittest import TestCase
from metadata.generated.schema.api.classification.createClassification import (
CreateClassificationRequest,
)
from metadata.generated.schema.api.classification.createTag import CreateTagRequest
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.entity.classification.tag import TagName
from metadata.generated.schema.entity.data.storedProcedure import (
Language,
StoredProcedure,
StoredProcedureCode,
)
from metadata.generated.schema.entity.data.table import (
Column,
ColumnName,
DataType,
Table,
)
from metadata.generated.schema.entity.type import EntityName
from metadata.generated.schema.type.basic import FullyQualifiedEntityName, Markdown
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.topology import NodeStage, TopologyContext
from metadata.ingestion.source.database.database_service import DatabaseServiceTopology
TABLE_STAGE = NodeStage(
type_=Table,
context="table",
processor="yield_table",
consumer=["database_service", "database", "database_schema"],
use_cache=True,
)
TAGS_STAGE = NodeStage(
type_=OMetaTagAndClassification,
context="tags",
processor="yield_table_tag_details",
nullable=True,
store_all_in_context=True,
)
PROCEDURES_STAGE = NodeStage(
type_=StoredProcedure,
context="stored_procedures",
processor="yield_stored_procedure",
consumer=["database_service", "database", "database_schema"],
store_all_in_context=True,
store_fqn=True,
use_cache=True,
)
class TopologyContextTest(TestCase):
"""Validate context ops"""
# Randomly picked up to test
db_service_topology = DatabaseServiceTopology()
def test_upsert(self):
"""We can add a new key and update its value"""
context = TopologyContext.create(self.db_service_topology)
context.upsert(key="new_key", value="something")
self.assertEqual(context.new_key, "something")
context.upsert(key="new_key", value="new")
self.assertEqual(context.new_key, "new")
def test_replace(self):
"""We can append results to a new key. If it does not exist, it'll instantiate a list"""
context = TopologyContext.create(self.db_service_topology)
context.append(key="new_key", value=1)
self.assertEqual(context.new_key, [1])
context.append(key="new_key", value=2)
self.assertEqual(context.new_key, [1, 2])
def test_clear(self):
"""We can clan up the context values"""
context = TopologyContext.create(self.db_service_topology)
my_stage = NodeStage(
type_=Table,
context="new_key",
processor="random",
)
context.append(key="new_key", value=1)
self.assertEqual(context.new_key, [1])
context.clear_stage(stage=my_stage)
self.assertIsNone(context.new_key)
def test_fqn_from_stage(self):
"""We build the right fqn at each stage"""
context = TopologyContext.create(self.db_service_topology)
context.upsert(key="database_service", value="service")
context.upsert(key="database", value="database")
context.upsert(key="database_schema", value="schema")
table_fqn = context.fqn_from_stage(stage=TABLE_STAGE, entity_name="table")
self.assertEqual(table_fqn, "service.database.schema.table")
def test_update_context_value(self):
"""We can update values directly"""
context = TopologyContext.create(self.db_service_topology)
classification_and_tag = OMetaTagAndClassification(
fqn=None,
classification_request=CreateClassificationRequest(
name=TagName(__root__="my_classification"),
description=Markdown(__root__="something"),
),
tag_request=CreateTagRequest(
name=TagName(__root__="my_tag"),
description=Markdown(__root__="something"),
),
)
context.update_context_value(stage=TAGS_STAGE, value=classification_and_tag)
self.assertEqual(context.tags, [classification_and_tag])
def test_update_context_name(self):
"""Check context updates for EntityName and FQN"""
context = TopologyContext.create(self.db_service_topology)
context.update_context_name(
stage=TABLE_STAGE,
right=CreateTableRequest(
name=EntityName(__root__="table"),
databaseSchema=FullyQualifiedEntityName(__root__="schema"),
columns=[
Column(name=ColumnName(__root__="id"), dataType=DataType.BIGINT)
],
),
)
self.assertEqual(context.table, "table")
context.upsert(key="database_service", value="service")
context.upsert(key="database", value="database")
context.upsert(key="database_schema", value="schema")
context.update_context_name(
stage=PROCEDURES_STAGE,
right=CreateStoredProcedureRequest(
name=EntityName(__root__="stored_proc"),
databaseSchema=FullyQualifiedEntityName(__root__="schema"),
storedProcedureCode=StoredProcedureCode(
language=Language.SQL,
code="SELECT * FROM AWESOME",
),
),
)
self.assertEqual(
context.stored_procedures, ["service.database.schema.stored_proc"]
)

View File

@ -23,8 +23,8 @@ from metadata.ingestion.api.topology_runner import TopologyRunnerMixin
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
TopologyContext,
TopologyNode,
create_source_context,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.source_hash import generate_source_hash
@ -73,7 +73,7 @@ class MockTopology(ServiceTopology):
class MockSource(TopologyRunnerMixin):
topology = MockTopology()
context = create_source_context(topology)
context = TopologyContext.create(topology)
@staticmethod
def get_schemas():