diff --git a/ingestion/src/metadata/ingestion/api/topology_runner.py b/ingestion/src/metadata/ingestion/api/topology_runner.py index e643becd991..adf83308a7b 100644 --- a/ingestion/src/metadata/ingestion/api/topology_runner.py +++ b/ingestion/src/metadata/ingestion/api/topology_runner.py @@ -19,9 +19,6 @@ from typing import Any, Generic, Iterable, List, Type, TypeVar from pydantic import BaseModel -from metadata.generated.schema.api.data.createStoredProcedure import ( - CreateStoredProcedureRequest, -) from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema @@ -35,13 +32,11 @@ from metadata.ingestion.models.topology import ( ServiceTopology, TopologyContext, TopologyNode, - get_ctx_default, get_topology_node, get_topology_root, ) from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.utils import model_str -from metadata.utils import fqn from metadata.utils.logger import ingestion_logger from metadata.utils.source_hash import generate_source_hash @@ -112,7 +107,7 @@ class TopologyRunnerMixin(Generic[C]): # Once we are done processing all the stages, for stage in node.stages: if stage.clear_context: - self.clear_context(stage=stage) + self.context.clear_stage(stage=stage) # process all children from the node being run yield from self.process_nodes(child_nodes) @@ -182,7 +177,7 @@ class TopologyRunnerMixin(Generic[C]): for child_node in child_nodes or []: for child_stage in child_node.stages or []: if child_stage.use_cache: - entity_fqn = self.fqn_from_context( + entity_fqn = self.context.fqn_from_stage( stage=stage, entity_name=self.context.__dict__[stage.context], ) @@ -229,85 +224,6 @@ class TopologyRunnerMixin(Generic[C]): """ yield from self.process_nodes(get_topology_root(self.topology)) - def _replace_context(self, key: str, value: Any) -> None: - """ - Update the key of the context with the given value - :param key: element to update from the source context - :param value: value to use for the update - """ - self.context.__dict__[key] = value - - def _append_context(self, key: str, value: Any) -> None: - """ - Update the key of the context with the given value - :param key: element to update from the source context - :param value: value to use for the update - """ - self.context.__dict__[key].append(value) - - def clear_context(self, stage: NodeStage) -> None: - """ - Clear the available context - :param stage: Update stage context to the default values - """ - self.context.__dict__[stage.context] = get_ctx_default(stage) - - def fqn_from_context(self, stage: NodeStage, entity_name: str) -> str: - """ - Read the context - :param stage: Topology node being processed - :param entity_name: name being stored - :return: Entity FQN derived from context - """ - context_names = [ - self.context.__dict__[dependency] - for dependency in stage.consumer or [] # root nodes do not have consumers - ] - return fqn._build( # pylint: disable=protected-access - *context_names, entity_name - ) - - def update_context(self, stage: NodeStage, right: C): - """ - Append or update context - - We'll store the entity name or FQN in the topology context. - If we store the name, the FQN will be built in the source itself when needed. 
- """ - - if stage.store_fqn: - new_context = self._build_new_context_fqn(right) - else: - new_context = model_str(right.name) - - if stage.context and not stage.store_all_in_context: - self._replace_context(key=stage.context, value=new_context) - if stage.context and stage.store_all_in_context: - self._append_context(key=stage.context, value=new_context) - - @singledispatchmethod - def _build_new_context_fqn(self, right: C) -> str: - """Build context fqn string""" - raise NotImplementedError(f"Missing implementation for [{type(C)}]") - - @_build_new_context_fqn.register - def _(self, right: CreateStoredProcedureRequest) -> str: - """ - Implement FQN context building for Stored Procedures. - - We process the Stored Procedures lineage at the very end of the service. If we - just store the SP name, we lose the information of which db/schema the SP belongs to. - """ - - return fqn.build( - metadata=self.metadata, - entity_type=StoredProcedure, - service_name=self.context.database_service, - database_name=self.context.database, - schema_name=self.context.database_schema, - procedure_name=right.name.__root__, - ) - def create_patch_request( self, original_entity: Entity, create_request: C ) -> PatchRequest: @@ -335,7 +251,7 @@ class TopologyRunnerMixin(Generic[C]): """ entity = None entity_name = model_str(right.name) - entity_fqn = self.fqn_from_context(stage=stage, entity_name=entity_name) + entity_fqn = self.context.fqn_from_stage(stage=stage, entity_name=entity_name) # If we don't want to write data in OM, we'll return what we fetch from the API. # This will be applicable for service entities since we do not want to overwrite the data @@ -405,7 +321,7 @@ class TopologyRunnerMixin(Generic[C]): "for the service connection." ) - self.update_context(stage=stage, right=right) + self.context.update_context_name(stage=stage, right=right) @yield_and_update_context.register def _( @@ -421,7 +337,7 @@ class TopologyRunnerMixin(Generic[C]): lineage has been properly drawn. We'll skip the process for now. """ yield entity_request - self.update_context(stage=stage, right=right.edge.fromEntity.name.__root__) + self.context.update_context_name(stage=stage, right=right.edge.fromEntity) @yield_and_update_context.register def _( @@ -430,11 +346,16 @@ class TopologyRunnerMixin(Generic[C]): stage: NodeStage, entity_request: Either[C], ) -> Iterable[Either[Entity]]: - """Tag implementation for the context information""" + """ + Tag implementation for the context information. + + We need the full OMetaTagAndClassification in the context + to build the TagLabels during the ingestion. We need to bundle + both CreateClassificationRequest and CreateTagRequest. 
+ """ yield entity_request - # We'll keep the tag fqn in the context and use if required - self.update_context(stage=stage, right=right) + self.context.update_context_value(stage=stage, value=right) @yield_and_update_context.register def _( @@ -446,8 +367,7 @@ class TopologyRunnerMixin(Generic[C]): """Custom Property implementation for the context information""" yield entity_request - # We'll keep the tag fqn in the context and use if required - self.update_context(stage=stage, right=right) + self.context.update_context_value(stage=stage, value=right) def sink_request( self, stage: NodeStage, entity_request: Either[C] diff --git a/ingestion/src/metadata/ingestion/models/topology.py b/ingestion/src/metadata/ingestion/models/topology.py index 18aef03f3ae..3b4d04afe9a 100644 --- a/ingestion/src/metadata/ingestion/models/topology.py +++ b/ingestion/src/metadata/ingestion/models/topology.py @@ -11,12 +11,20 @@ """ Defines the topology for ingesting sources """ - +from functools import singledispatchmethod from typing import Any, Generic, List, Optional, Type, TypeVar from pydantic import BaseModel, Extra, Field, create_model +from metadata.generated.schema.api.data.createStoredProcedure import ( + CreateStoredProcedureRequest, +) +from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure +from metadata.ingestion.ometa.utils import model_str +from metadata.utils import fqn + T = TypeVar("T", bound=BaseModel) +C = TypeVar("C", bound=BaseModel) class NodeStage(BaseModel, Generic[T]): @@ -130,6 +138,112 @@ class TopologyContext(BaseModel): ctx = {key: value.name.__root__ for key, value in self.__dict__.items()} return f"TopologyContext({ctx})" + @classmethod + def create(cls, topology: ServiceTopology) -> "TopologyContext": + """ + Dynamically build a context based on the topology nodes. + + Builds a Pydantic BaseModel class. 
+ + :param topology: ServiceTopology + :return: TopologyContext + """ + nodes = get_topology_nodes(topology) + ctx_fields = { + stage.context: (Optional[stage.type_], None) + for node in nodes + for stage in node.stages + if stage.context + } + return create_model( + "GeneratedContext", **ctx_fields, __base__=TopologyContext + )() + + def upsert(self, key: str, value: Any) -> None: + """ + Update the key of the context with the given value + :param key: element to update from the source context + :param value: value to use for the update + """ + self.__dict__[key] = value + + def append(self, key: str, value: Any) -> None: + """ + Update the key of the context with the given value + :param key: element to update from the source context + :param value: value to use for the update + """ + if self.__dict__.get(key): + self.__dict__[key].append(value) + else: + self.__dict__[key] = [value] + + def clear_stage(self, stage: NodeStage) -> None: + """ + Clear the available context + :param stage: Update stage context to the default values + """ + self.__dict__[stage.context] = None + + def fqn_from_stage(self, stage: NodeStage, entity_name: str) -> str: + """ + Read the context + :param stage: Topology node being processed + :param entity_name: name being stored + :return: Entity FQN derived from context + """ + context_names = [ + self.__dict__[dependency] + for dependency in stage.consumer or [] # root nodes do not have consumers + ] + return fqn._build( # pylint: disable=protected-access + *context_names, entity_name + ) + + def update_context_name(self, stage: NodeStage, right: C) -> None: + """ + Append or update context + + We'll store the entity name or FQN in the topology context. + If we store the name, the FQN will be built in the source itself when needed. + """ + + if stage.store_fqn: + new_context = self._build_new_context_fqn(right) + else: + new_context = model_str(right.name) + + self.update_context_value(stage=stage, value=new_context) + + def update_context_value(self, stage: NodeStage, value: Any) -> None: + if stage.context and not stage.store_all_in_context: + self.upsert(key=stage.context, value=value) + if stage.context and stage.store_all_in_context: + self.append(key=stage.context, value=value) + + @singledispatchmethod + def _build_new_context_fqn(self, right: C) -> str: + """Build context fqn string""" + raise NotImplementedError(f"Missing implementation for [{type(C)}]") + + @_build_new_context_fqn.register + def _(self, right: CreateStoredProcedureRequest) -> str: + """ + Implement FQN context building for Stored Procedures. + + We process the Stored Procedures lineage at the very end of the service. If we + just store the SP name, we lose the information of which db/schema the SP belongs to. 
+ """ + + return fqn.build( + metadata=None, + entity_type=StoredProcedure, + service_name=self.__dict__["database_service"], + database_name=self.__dict__["database"], + schema_name=self.__dict__["database_schema"], + procedure_name=right.name.__root__, + ) + def get_topology_nodes(topology: ServiceTopology) -> List[TopologyNode]: """ @@ -163,34 +277,6 @@ def get_topology_root(topology: ServiceTopology) -> List[TopologyNode]: return [node for node in nodes if node_has_no_consumers(node)] -def get_ctx_default(stage: NodeStage) -> Optional[List[Any]]: - """ - If we cache all, default value is an empty list - :param stage: Node Stage - :return: None or [] - """ - return [] if stage.store_all_in_context else None - - -def create_source_context(topology: ServiceTopology) -> TopologyContext: - """ - Dynamically build a context based on the topology nodes. - - Builds a Pydantic BaseModel class. - - :param topology: ServiceTopology - :return: TopologyContext - """ - nodes = get_topology_nodes(topology) - ctx_fields = { - stage.context: (Optional[stage.type_], get_ctx_default(stage)) - for node in nodes - for stage in node.stages - if stage.context - } - return create_model("GeneratedContext", **ctx_fields, __base__=TopologyContext)() - - def get_topology_node(name: str, topology: ServiceTopology) -> TopologyNode: """ Fetch a topology node by name diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index 42398353ca4..9c1d76cacdf 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -54,8 +54,8 @@ from metadata.ingestion.models.patch_request import PatchRequest from metadata.ingestion.models.topology import ( NodeStage, ServiceTopology, + TopologyContext, TopologyNode, - create_source_context, ) from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, get_test_connection_fn @@ -194,7 +194,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): service_connection: DashboardConnection.__fields__["config"].type_ topology = DashboardServiceTopology() - context = create_source_context(topology) + context = TopologyContext.create(topology) dashboard_source_state: Set = set() datamodel_source_state: Set = set() diff --git a/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/metadata.py index 1624c35de63..576d37e6728 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/domodashboard/metadata.py @@ -127,7 +127,7 @@ class DomodashboardSource(DashboardServiceSource): service_name=self.context.dashboard_service, chart_name=chart, ) - for chart in self.context.charts + for chart in self.context.charts or [] ], service=self.context.dashboard_service, owner=self.get_owner_ref(dashboard_details=dashboard_details), diff --git a/ingestion/src/metadata/ingestion/source/dashboard/lightdash/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/lightdash/metadata.py index ae5b615ec34..76f2fdf506c 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/lightdash/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/lightdash/metadata.py @@ -114,7 +114,7 @@ class LightdashSource(DashboardServiceSource): 
service_name=self.context.dashboard_service, chart_name=chart, ) - for chart in self.context.charts + for chart in self.context.charts or [] ], service=self.context.dashboard_service, owner=self.get_owner_ref(dashboard_details=dashboard_details), diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py index cfd6dd2aeaa..e40222762a1 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py @@ -657,7 +657,7 @@ class LookerSource(DashboardServiceSource): service_name=self.context.dashboard_service, chart_name=chart, ) - for chart in self.context.charts + for chart in self.context.charts or [] ], # Dashboards are created from the UI directly. They are not linked to a project # like LookML assets, but rather just organised in folders. diff --git a/ingestion/src/metadata/ingestion/source/dashboard/metabase/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/metabase/metadata.py index 14ad2e9efe9..7917f8404f1 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/metabase/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/metabase/metadata.py @@ -152,7 +152,7 @@ class MetabaseSource(DashboardServiceSource): service_name=self.context.dashboard_service, chart_name=chart, ) - for chart in self.context.charts + for chart in self.context.charts or [] ], service=self.context.dashboard_service, owner=self.get_owner_ref(dashboard_details=dashboard_details), diff --git a/ingestion/src/metadata/ingestion/source/dashboard/mode/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/mode/metadata.py index 50116a018ed..ceeb2288b96 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/mode/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/mode/metadata.py @@ -106,7 +106,7 @@ class ModeSource(DashboardServiceSource): service_name=self.context.dashboard_service, chart_name=chart, ) - for chart in self.context.charts + for chart in self.context.charts or [] ], service=self.context.dashboard_service, owner=self.get_owner_ref(dashboard_details=dashboard_details), diff --git a/ingestion/src/metadata/ingestion/source/dashboard/mstr/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/mstr/metadata.py index ab514f96c84..79b3f6bba77 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/mstr/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/mstr/metadata.py @@ -113,7 +113,7 @@ class MstrSource(DashboardServiceSource): service_name=self.context.dashboard_service, chart_name=chart, ) - for chart in self.context.charts + for chart in self.context.charts or [] ], service=self.context.dashboard_service, owner=self.get_owner_ref(dashboard_details=dashboard_details), diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py index 2857c82738b..3618152873e 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py @@ -408,7 +408,7 @@ class PowerbiSource(DashboardServiceSource): service_name=self.context.dashboard_service, chart_name=chart, ) - for chart in self.context.charts + for chart in self.context.charts or [] ], service=self.context.dashboard_service, owner=self.get_owner_ref(dashboard_details=dashboard_details), diff --git 
a/ingestion/src/metadata/ingestion/source/dashboard/qliksense/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/qliksense/metadata.py index 4f3f98bc264..0ca831a7598 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/qliksense/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/qliksense/metadata.py @@ -129,7 +129,7 @@ class QliksenseSource(DashboardServiceSource): service_name=self.context.dashboard_service, chart_name=chart, ) - for chart in self.context.charts + for chart in self.context.charts or [] ], service=self.context.dashboard_service, owner=self.get_owner_ref(dashboard_details=dashboard_details), diff --git a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py index f5f36940b3f..cafb4b0b43c 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py @@ -144,7 +144,7 @@ class QuicksightSource(DashboardServiceSource): service_name=self.context.dashboard_service, chart_name=chart, ) - for chart in self.context.charts + for chart in self.context.charts or [] ], service=self.context.dashboard_service, owner=self.get_owner_ref(dashboard_details=dashboard_details), diff --git a/ingestion/src/metadata/ingestion/source/dashboard/redash/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/redash/metadata.py index eea809349b1..d83bdad7278 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/redash/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/redash/metadata.py @@ -157,7 +157,7 @@ class RedashSource(DashboardServiceSource): service_name=self.context.dashboard_service, chart_name=chart, ) - for chart in self.context.charts + for chart in self.context.charts or [] ], service=self.context.dashboard_service, sourceUrl=self.get_dashboard_url(dashboard_details), diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset/api_source.py b/ingestion/src/metadata/ingestion/source/dashboard/superset/api_source.py index 281458d6921..e7d2f79e6a4 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset/api_source.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset/api_source.py @@ -97,7 +97,7 @@ class SupersetAPISource(SupersetSourceMixin): service_name=self.context.dashboard_service, chart_name=chart, ) - for chart in self.context.charts + for chart in self.context.charts or [] ], service=self.context.dashboard_service, owner=self.get_owner_ref(dashboard_details=dashboard_details), diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py b/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py index 01e76742de9..99977dbd24d 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset/db_source.py @@ -121,7 +121,7 @@ class SupersetDBSource(SupersetSourceMixin): service_name=self.context.dashboard_service, chart_name=chart, ) - for chart in self.context.charts + for chart in self.context.charts or [] ], service=self.context.dashboard_service, owner=self.get_owner_ref(dashboard_details=dashboard_details), diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py index 23f92c7ca3d..ab4d6677d9c 100644 --- 
a/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py @@ -213,7 +213,7 @@ class TableauSource(DashboardServiceSource): service_name=self.context.dashboard_service, chart_name=chart, ) - for chart in self.context.charts + for chart in self.context.charts or [] ], dataModels=[ fqn.build( diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index b3dd564a5c5..79442a4647e 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -62,8 +62,8 @@ from metadata.ingestion.models.ometa_classification import OMetaTagAndClassifica from metadata.ingestion.models.topology import ( NodeStage, ServiceTopology, + TopologyContext, TopologyNode, - create_source_context, ) from metadata.ingestion.source.connections import get_test_connection_fn from metadata.utils import fqn @@ -205,7 +205,7 @@ class DatabaseServiceSource( inspector: Inspector topology = DatabaseServiceTopology() - context = create_source_context(topology) + context = TopologyContext.create(topology) def prepare(self): """By default, there is no preparation needed""" diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py index 64a54fe4963..b6270fc5d46 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py @@ -31,8 +31,8 @@ from metadata.ingestion.models.ometa_classification import OMetaTagAndClassifica from metadata.ingestion.models.topology import ( NodeStage, ServiceTopology, + TopologyContext, TopologyNode, - create_source_context, ) from metadata.ingestion.source.database.database_service import DataModelLink from metadata.ingestion.source.database.dbt.dbt_config import get_dbt_details @@ -135,7 +135,7 @@ class DbtServiceSource(TopologyRunnerMixin, Source, ABC): """ topology = DbtServiceTopology() - context = create_source_context(topology) + context = TopologyContext.create(topology) source_config: DbtPipeline def remove_manifest_non_required_keys(self, manifest_dict: dict): diff --git a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py index bf021776b2e..f377b8e0b70 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py +++ b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py @@ -37,8 +37,8 @@ from metadata.ingestion.models.delete_entity import DeleteEntity from metadata.ingestion.models.topology import ( NodeStage, ServiceTopology, + TopologyContext, TopologyNode, - create_source_context, ) from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, get_test_connection_fn @@ -114,7 +114,7 @@ class MessagingServiceSource(TopologyRunnerMixin, Source, ABC): service_connection: MessagingConnection.__fields__["config"].type_ topology = MessagingServiceTopology() - context = create_source_context(topology) + context = TopologyContext.create(topology) topic_source_state: Set = set() def __init__( diff --git a/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py b/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py index 
1ca9766868f..f8f1486da0c 100644 --- a/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py +++ b/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py @@ -39,8 +39,8 @@ from metadata.ingestion.models.delete_entity import DeleteEntity from metadata.ingestion.models.topology import ( NodeStage, ServiceTopology, + TopologyContext, TopologyNode, - create_source_context, ) from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, get_test_connection_fn @@ -100,7 +100,7 @@ class MlModelServiceSource(TopologyRunnerMixin, Source, ABC): service_connection: MlModelConnection.__fields__["config"].type_ topology = MlModelServiceTopology() - context = create_source_context(topology) + context = TopologyContext.create(topology) mlmodel_source_state: Set = set() def __init__( diff --git a/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py index bda3964b32a..d7d62d3316d 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py @@ -133,7 +133,7 @@ class DatabrickspipelineSource(PipelineServiceSource): def get_tasks(self, pipeline_details: dict) -> List[Task]: task_list = [] - self._append_context(key="job_id_list", value=pipeline_details["job_id"]) + self.context.append(key="job_id_list", value=pipeline_details["job_id"]) downstream_tasks = self.get_downstream_tasks( pipeline_details["settings"].get("tasks") diff --git a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py index 402401e2116..ab79062864a 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py @@ -37,8 +37,8 @@ from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus from metadata.ingestion.models.topology import ( NodeStage, ServiceTopology, + TopologyContext, TopologyNode, - create_source_context, ) from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, get_test_connection_fn @@ -116,7 +116,7 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC): service_connection: PipelineConnection.__fields__["config"].type_ topology = PipelineServiceTopology() - context = create_source_context(topology) + context = TopologyContext.create(topology) pipeline_source_state: Set = set() def __init__( diff --git a/ingestion/src/metadata/ingestion/source/search/search_service.py b/ingestion/src/metadata/ingestion/source/search/search_service.py index 7f592475841..2a6ac240ace 100644 --- a/ingestion/src/metadata/ingestion/source/search/search_service.py +++ b/ingestion/src/metadata/ingestion/source/search/search_service.py @@ -43,8 +43,8 @@ from metadata.ingestion.models.search_index_data import OMetaIndexSampleData from metadata.ingestion.models.topology import ( NodeStage, ServiceTopology, + TopologyContext, TopologyNode, - create_source_context, ) from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, get_test_connection_fn @@ -110,7 +110,7 @@ class SearchServiceSource(TopologyRunnerMixin, Source, ABC): service_connection: SearchConnection.__fields__["config"].type_ topology = 
SearchServiceTopology() - context = create_source_context(topology) + context = TopologyContext.create(topology) index_source_state: Set = set() def __init__( diff --git a/ingestion/src/metadata/ingestion/source/storage/storage_service.py b/ingestion/src/metadata/ingestion/source/storage/storage_service.py index b76a8da3398..ee3a1910bdc 100644 --- a/ingestion/src/metadata/ingestion/source/storage/storage_service.py +++ b/ingestion/src/metadata/ingestion/source/storage/storage_service.py @@ -43,8 +43,8 @@ from metadata.ingestion.models.delete_entity import DeleteEntity from metadata.ingestion.models.topology import ( NodeStage, ServiceTopology, + TopologyContext, TopologyNode, - create_source_context, ) from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, get_test_connection_fn @@ -112,7 +112,7 @@ class StorageServiceSource(TopologyRunnerMixin, Source, ABC): service_connection: StorageConnection.__fields__["config"].type_ topology = StorageServiceTopology() - context = create_source_context(topology) + context = TopologyContext.create(topology) container_source_state: Set = set() global_manifest: Optional[ManifestMetadataConfig] diff --git a/ingestion/tests/unit/topology/dashboard/test_metabase.py b/ingestion/tests/unit/topology/dashboard/test_metabase.py index 51d8e87364f..5e5160d5536 100644 --- a/ingestion/tests/unit/topology/dashboard/test_metabase.py +++ b/ingestion/tests/unit/topology/dashboard/test_metabase.py @@ -91,7 +91,7 @@ EXAMPLE_TABLE = [ columns=[], ) ] -mock_tableau_config = { +mock_config = { "source": { "type": "metabase", "serviceName": "mock_metabase", @@ -234,9 +234,9 @@ class MetabaseUnitTest(TestCase): super().__init__(methodName) get_connection.return_value = False test_connection.return_value = False - self.config = OpenMetadataWorkflowConfig.parse_obj(mock_tableau_config) + self.config = OpenMetadataWorkflowConfig.parse_obj(mock_config) self.metabase = MetabaseSource.create( - mock_tableau_config["source"], + mock_config["source"], OpenMetadata(self.config.workflowConfig.openMetadataServerConfig), ) self.metabase.client = SimpleNamespace() diff --git a/ingestion/tests/unit/topology/test_context.py b/ingestion/tests/unit/topology/test_context.py new file mode 100644 index 00000000000..339ae426707 --- /dev/null +++ b/ingestion/tests/unit/topology/test_context.py @@ -0,0 +1,177 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Check context operations +""" +from unittest import TestCase + +from metadata.generated.schema.api.classification.createClassification import ( + CreateClassificationRequest, +) +from metadata.generated.schema.api.classification.createTag import CreateTagRequest +from metadata.generated.schema.api.data.createStoredProcedure import ( + CreateStoredProcedureRequest, +) +from metadata.generated.schema.api.data.createTable import CreateTableRequest +from metadata.generated.schema.entity.classification.tag import TagName +from metadata.generated.schema.entity.data.storedProcedure import ( + Language, + StoredProcedure, + StoredProcedureCode, +) +from metadata.generated.schema.entity.data.table import ( + Column, + ColumnName, + DataType, + Table, +) +from metadata.generated.schema.entity.type import EntityName +from metadata.generated.schema.type.basic import FullyQualifiedEntityName, Markdown +from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification +from metadata.ingestion.models.topology import NodeStage, TopologyContext +from metadata.ingestion.source.database.database_service import DatabaseServiceTopology + +TABLE_STAGE = NodeStage( + type_=Table, + context="table", + processor="yield_table", + consumer=["database_service", "database", "database_schema"], + use_cache=True, +) + +TAGS_STAGE = NodeStage( + type_=OMetaTagAndClassification, + context="tags", + processor="yield_table_tag_details", + nullable=True, + store_all_in_context=True, +) + +PROCEDURES_STAGE = NodeStage( + type_=StoredProcedure, + context="stored_procedures", + processor="yield_stored_procedure", + consumer=["database_service", "database", "database_schema"], + store_all_in_context=True, + store_fqn=True, + use_cache=True, +) + + +class TopologyContextTest(TestCase): + """Validate context ops""" + + # Randomly picked up to test + db_service_topology = DatabaseServiceTopology() + + def test_upsert(self): + """We can add a new key and update its value""" + context = TopologyContext.create(self.db_service_topology) + + context.upsert(key="new_key", value="something") + self.assertEqual(context.new_key, "something") + + context.upsert(key="new_key", value="new") + self.assertEqual(context.new_key, "new") + + def test_replace(self): + """We can append results to a new key. 
If it does not exist, it'll instantiate a list""" + context = TopologyContext.create(self.db_service_topology) + + context.append(key="new_key", value=1) + self.assertEqual(context.new_key, [1]) + + context.append(key="new_key", value=2) + self.assertEqual(context.new_key, [1, 2]) + + def test_clear(self): + """We can clan up the context values""" + context = TopologyContext.create(self.db_service_topology) + + my_stage = NodeStage( + type_=Table, + context="new_key", + processor="random", + ) + + context.append(key="new_key", value=1) + self.assertEqual(context.new_key, [1]) + + context.clear_stage(stage=my_stage) + self.assertIsNone(context.new_key) + + def test_fqn_from_stage(self): + """We build the right fqn at each stage""" + context = TopologyContext.create(self.db_service_topology) + + context.upsert(key="database_service", value="service") + context.upsert(key="database", value="database") + context.upsert(key="database_schema", value="schema") + + table_fqn = context.fqn_from_stage(stage=TABLE_STAGE, entity_name="table") + self.assertEqual(table_fqn, "service.database.schema.table") + + def test_update_context_value(self): + """We can update values directly""" + context = TopologyContext.create(self.db_service_topology) + + classification_and_tag = OMetaTagAndClassification( + fqn=None, + classification_request=CreateClassificationRequest( + name=TagName(__root__="my_classification"), + description=Markdown(__root__="something"), + ), + tag_request=CreateTagRequest( + name=TagName(__root__="my_tag"), + description=Markdown(__root__="something"), + ), + ) + + context.update_context_value(stage=TAGS_STAGE, value=classification_and_tag) + + self.assertEqual(context.tags, [classification_and_tag]) + + def test_update_context_name(self): + """Check context updates for EntityName and FQN""" + context = TopologyContext.create(self.db_service_topology) + + context.update_context_name( + stage=TABLE_STAGE, + right=CreateTableRequest( + name=EntityName(__root__="table"), + databaseSchema=FullyQualifiedEntityName(__root__="schema"), + columns=[ + Column(name=ColumnName(__root__="id"), dataType=DataType.BIGINT) + ], + ), + ) + + self.assertEqual(context.table, "table") + + context.upsert(key="database_service", value="service") + context.upsert(key="database", value="database") + context.upsert(key="database_schema", value="schema") + context.update_context_name( + stage=PROCEDURES_STAGE, + right=CreateStoredProcedureRequest( + name=EntityName(__root__="stored_proc"), + databaseSchema=FullyQualifiedEntityName(__root__="schema"), + storedProcedureCode=StoredProcedureCode( + language=Language.SQL, + code="SELECT * FROM AWESOME", + ), + ), + ) + + self.assertEqual( + context.stored_procedures, ["service.database.schema.stored_proc"] + ) diff --git a/ingestion/tests/unit/topology/test_runner.py b/ingestion/tests/unit/topology/test_runner.py index 02f85de0452..974b905574a 100644 --- a/ingestion/tests/unit/topology/test_runner.py +++ b/ingestion/tests/unit/topology/test_runner.py @@ -23,8 +23,8 @@ from metadata.ingestion.api.topology_runner import TopologyRunnerMixin from metadata.ingestion.models.topology import ( NodeStage, ServiceTopology, + TopologyContext, TopologyNode, - create_source_context, ) from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.source_hash import generate_source_hash @@ -73,7 +73,7 @@ class MockTopology(ServiceTopology): class MockSource(TopologyRunnerMixin): topology = MockTopology() - context = create_source_context(topology) + context = 
TopologyContext.create(topology)
 
     @staticmethod
     def get_schemas():
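The old module-level create_source_context helper is replaced by the TopologyContext.create classmethod. It still builds a pydantic model dynamically with create_model, one optional field per stage context, but every field now defaults to None (the removed get_ctx_default used to prime store_all_in_context stages with an empty list). A minimal sketch of the same create_model pattern; MiniStage and MiniContext are illustrative stand-ins, not classes from the codebase, and the real implementation uses stage.type_ as the field type:

from typing import Any, Optional

from pydantic import BaseModel, create_model


class MiniStage(BaseModel):
    # Illustrative stand-in for NodeStage: only the attribute create() needs
    context: Optional[str] = None


class MiniContext(BaseModel):
    # Illustrative stand-in for TopologyContext

    @classmethod
    def create(cls, stages) -> "MiniContext":
        # One optional field per stage context, all defaulting to None
        ctx_fields = {
            stage.context: (Optional[Any], None)
            for stage in stages
            if stage.context
        }
        return create_model("GeneratedContext", **ctx_fields, __base__=cls)()


ctx = MiniContext.create([MiniStage(context="database"), MiniStage(context="table")])
print(ctx.database, ctx.table)  # None None until the corresponding stages run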
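The context mutators that used to live on TopologyRunnerMixin (_replace_context, _append_context, clear_context, fqn_from_context) move onto the context itself as upsert, append, clear_stage and fqn_from_stage, which is what the new unit tests in test_context.py exercise. A rough usage sketch against the real classes, mirroring those tests and assuming the openmetadata-ingestion package from this branch is importable:

from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.models.topology import NodeStage, TopologyContext
from metadata.ingestion.source.database.database_service import DatabaseServiceTopology

table_stage = NodeStage(
    type_=Table,
    context="table",
    processor="yield_table",
    consumer=["database_service", "database", "database_schema"],
    use_cache=True,
)

context = TopologyContext.create(DatabaseServiceTopology())

# upsert: set or overwrite a single context slot
context.upsert(key="database_service", value="service")
context.upsert(key="database", value="database")
context.upsert(key="database_schema", value="schema")

# append: accumulate values in a list, creating the list on first use
context.append(key="new_key", value=1)
context.append(key="new_key", value=2)
assert context.new_key == [1, 2]

# fqn_from_stage: join the stage's consumer slots with the entity name
assert (
    context.fqn_from_stage(stage=table_stage, entity_name="table")
    == "service.database.schema.table"
)

# clear_stage: reset the stage's slot back to None (no more [] defaults)
context.clear_stage(stage=table_stage)
assert context.table is None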
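update_context_name keeps storing the plain entity name by default and only builds a full FQN when the stage sets store_fqn, dispatching on the request type through functools.singledispatchmethod; today only CreateStoredProcedureRequest has a registered implementation, because stored-procedure lineage runs at the very end of the service and needs the db/schema baked into the context. A stripped-down illustration of that dispatch pattern; the classes below are stand-ins, not the generated schema models:

from functools import singledispatchmethod


class CreateTableRequest:  # stand-in
    def __init__(self, name: str):
        self.name = name


class CreateStoredProcedureRequest:  # stand-in
    def __init__(self, name: str):
        self.name = name


class Context:
    database_service = "service"
    database = "db"
    database_schema = "schema"

    @singledispatchmethod
    def _build_new_context_fqn(self, right) -> str:
        # No FQN recipe registered for this request type
        raise NotImplementedError(f"Missing implementation for [{type(right)}]")

    @_build_new_context_fqn.register
    def _(self, right: CreateStoredProcedureRequest) -> str:
        # Stored procedures are processed at the end of the service run,
        # so the context keeps the full FQN instead of just the name
        return f"{self.database_service}.{self.database}.{self.database_schema}.{right.name}"


ctx = Context()
print(ctx._build_new_context_fqn(CreateStoredProcedureRequest("my_proc")))  # service.db.schema.my_proc

try:
    ctx._build_new_context_fqn(CreateTableRequest("orders"))
except NotImplementedError as err:
    print(err)  # other request types fall back to the base implementation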
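The repeated "for chart in self.context.charts or []" change across the dashboard sources guards against the charts slot being None: with the new context every slot defaults to None, whereas the removed get_ctx_default primed store_all_in_context stages with an empty list. A tiny illustration of the guard, with made-up values:

charts = None  # context slot before any chart stage has produced anything

# [... for chart in charts] would raise TypeError: 'NoneType' object is not iterable
chart_fqns = [chart.upper() for chart in charts or []]  # -> []

charts = ["chart_1", "chart_2"]
chart_fqns = [chart.upper() for chart in charts or []]  # -> ['CHART_1', 'CHART_2']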