From 75dcbe612ae5ec4de9fb87db39f48b96f584c4e3 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Thu, 25 Aug 2022 08:41:22 +0200 Subject: [PATCH] Validate service sink ack (#6895) * Validate service sink ack * naming --- .../metadata/ingestion/api/topology_runner.py | 17 +++++++++++++++++ .../src/metadata/ingestion/models/topology.py | 1 + .../source/dashboard/dashboard_service.py | 1 + .../source/database/database_service.py | 1 + .../source/messaging/messaging_service.py | 1 + .../ingestion/source/mlmodel/mlmodel_service.py | 1 + .../source/pipeline/pipeline_service.py | 1 + 7 files changed, 23 insertions(+) diff --git a/ingestion/src/metadata/ingestion/api/topology_runner.py b/ingestion/src/metadata/ingestion/api/topology_runner.py index 3850bcb679e..5d076ae8f65 100644 --- a/ingestion/src/metadata/ingestion/api/topology_runner.py +++ b/ingestion/src/metadata/ingestion/api/topology_runner.py @@ -35,6 +35,13 @@ logger = ingestion_logger() C = TypeVar("C", bound=BaseModel) +class MissingExpectedEntityAckException(Exception): + """ + After running the ack to the sink, we got no + Entity back + """ + + class TopologyRunnerMixin(Generic[C]): """ Prepares the next_record function @@ -185,6 +192,16 @@ class TopologyRunnerMixin(Generic[C]): fields=["*"], # Get all the available data from the Entity ) tries -= 1 + + # We have ack the sink waiting for a response, but got nothing back + if stage.must_return and entity is None: + # Safe access to Entity Request name + raise MissingExpectedEntityAckException( + f"Missing ack back from [{stage.type_.__name__}: {getattr(entity_request, 'name', None)}] - " + "Possible causes are changes in the server Fernet key or mismatched JSON Schemas " + "for the service connection." 
+ ) + else: yield entity diff --git a/ingestion/src/metadata/ingestion/models/topology.py b/ingestion/src/metadata/ingestion/models/topology.py index da9c1c7cbe5..bc74edaa52d 100644 --- a/ingestion/src/metadata/ingestion/models/topology.py +++ b/ingestion/src/metadata/ingestion/models/topology.py @@ -35,6 +35,7 @@ class NodeStage(BaseModel, Generic[T]): context: Optional[str] = None # context key storing stage state, if needed ack_sink: bool = True # Validate that the request is present in OM and update the context with the results nullable: bool = False # The yielded value can be null + must_return: bool = False # The sink MUST return a value back after ack. Useful to validate services are correct. cache_all: bool = ( False # If we need to cache all values being yielded in the context ) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index f3bebf8b7bc..1e0cf1e4fa7 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -80,6 +80,7 @@ class DashboardServiceTopology(ServiceTopology): context="dashboard_service", processor="yield_create_request_dashboard_service", overwrite=False, + must_return=True, ), NodeStage( type_=OMetaTagAndCategory, diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 01ffd3edf68..1a2141533b7 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -111,6 +111,7 @@ class DatabaseServiceTopology(ServiceTopology): context="database_service", processor="yield_create_request_database_service", overwrite=False, + must_return=True, ), NodeStage( type_=StorageService, diff --git 
a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py index fe232424814..208b5d723dc 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py +++ b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py @@ -60,6 +60,7 @@ class MessagingServiceTopology(ServiceTopology): context="messaging_service", processor="yield_create_request_messaging_service", overwrite=False, + must_return=True, ) ], children=["topic"], diff --git a/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py b/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py index 85f034659fd..aa4f78124f3 100644 --- a/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py +++ b/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py @@ -66,6 +66,7 @@ class MlModelServiceTopology(ServiceTopology): context="mlmodel_service", processor="yield_create_request_mlmodel_service", overwrite=False, + must_return=True, ), ], children=["mlmodel"], diff --git a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py index 9ee051acc73..c12911a00e2 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py @@ -63,6 +63,7 @@ class PipelineServiceTopology(ServiceTopology): context="pipeline_service", processor="yield_create_request_pipeline_service", overwrite=False, + must_return=True, ), ], children=["pipeline"],