diff --git a/ingestion/src/metadata/ingestion/api/topology_runner.py b/ingestion/src/metadata/ingestion/api/topology_runner.py index 7b24930e032..c053170665e 100644 --- a/ingestion/src/metadata/ingestion/api/topology_runner.py +++ b/ingestion/src/metadata/ingestion/api/topology_runner.py @@ -80,6 +80,11 @@ class TopologyRunnerMixin(Generic[C]): except ValueError: logger.error("Value unexpectedly None") + # processing for all stages completed now cleaning the cache if applicable + for stage in node.stages: + if stage.clear_cache: + self.clear_context(stage=stage) + # process all children from the node being run yield from self.process_nodes(child_nodes) @@ -114,6 +119,13 @@ class TopologyRunnerMixin(Generic[C]): """ self.context.__dict__[key].append(value) + def clear_context(self, stage: NodeStage) -> None: + """ + Clear the available context + :param key: element to update from the source context + """ + self.context.__dict__[stage.context] = get_ctx_default(stage) + def fqn_from_context(self, stage: NodeStage, entity_request: C) -> str: """ Read the context diff --git a/ingestion/src/metadata/ingestion/models/topology.py b/ingestion/src/metadata/ingestion/models/topology.py index cd889ed5b2c..95eac5b2d0f 100644 --- a/ingestion/src/metadata/ingestion/models/topology.py +++ b/ingestion/src/metadata/ingestion/models/topology.py @@ -38,6 +38,7 @@ class NodeStage(BaseModel, Generic[T]): cache_all: bool = ( False # If we need to cache all values being yielded in the context ) + clear_cache: bool = False # If we need to clean cache values in the context for each produced element consumer: Optional[ List[str] ] = None # keys in the source context to fetch state from the parent's context diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index 49f98f5d66c..6a1039689eb 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -88,6 +88,7 @@ class DashboardServiceTopology(ServiceTopology): consumer=["dashboard_service"], nullable=True, cache_all=True, + clear_cache=True, ), NodeStage( type_=CreateUserRequest,