Validate service sink ack (#6895)

* Validate service sink ack

* naming
This commit is contained in:
Pere Miquel Brull 2022-08-25 08:41:22 +02:00 committed by GitHub
parent 04bcc99098
commit 75dcbe612a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 23 additions and 0 deletions

View File

@ -35,6 +35,13 @@ logger = ingestion_logger()
C = TypeVar("C", bound=BaseModel) C = TypeVar("C", bound=BaseModel)
class MissingExpectedEntityAckException(Exception):
    """
    Raised when a sink acknowledgement that is required to
    return an Entity comes back empty.
    """
class TopologyRunnerMixin(Generic[C]): class TopologyRunnerMixin(Generic[C]):
""" """
Prepares the next_record function Prepares the next_record function
@ -185,6 +192,16 @@ class TopologyRunnerMixin(Generic[C]):
fields=["*"], # Get all the available data from the Entity fields=["*"], # Get all the available data from the Entity
) )
tries -= 1 tries -= 1
# We have acked the sink, waiting for a response, but got nothing back
if stage.must_return and entity is None:
# Safe access to Entity Request name
raise MissingExpectedEntityAckException(
f"Missing ack back from [{stage.type_.__name__}: {getattr(entity_request, 'name', None)}] - "
"Possible causes are changes in the server Fernet key or mismatched JSON Schemas "
"for the service connection."
)
else: else:
yield entity yield entity

View File

@ -35,6 +35,7 @@ class NodeStage(BaseModel, Generic[T]):
context: Optional[str] = None # context key storing stage state, if needed context: Optional[str] = None # context key storing stage state, if needed
ack_sink: bool = True # Validate that the request is present in OM and update the context with the results ack_sink: bool = True # Validate that the request is present in OM and update the context with the results
nullable: bool = False # The yielded value can be null nullable: bool = False # The yielded value can be null
must_return: bool = False # The sink MUST return a value back after ack. Useful to validate services are correct.
cache_all: bool = ( cache_all: bool = (
False # If we need to cache all values being yielded in the context False # If we need to cache all values being yielded in the context
) )

View File

@ -80,6 +80,7 @@ class DashboardServiceTopology(ServiceTopology):
context="dashboard_service", context="dashboard_service",
processor="yield_create_request_dashboard_service", processor="yield_create_request_dashboard_service",
overwrite=False, overwrite=False,
must_return=True,
), ),
NodeStage( NodeStage(
type_=OMetaTagAndCategory, type_=OMetaTagAndCategory,

View File

@ -111,6 +111,7 @@ class DatabaseServiceTopology(ServiceTopology):
context="database_service", context="database_service",
processor="yield_create_request_database_service", processor="yield_create_request_database_service",
overwrite=False, overwrite=False,
must_return=True,
), ),
NodeStage( NodeStage(
type_=StorageService, type_=StorageService,

View File

@ -60,6 +60,7 @@ class MessagingServiceTopology(ServiceTopology):
context="messaging_service", context="messaging_service",
processor="yield_create_request_messaging_service", processor="yield_create_request_messaging_service",
overwrite=False, overwrite=False,
must_return=True,
) )
], ],
children=["topic"], children=["topic"],

View File

@ -66,6 +66,7 @@ class MlModelServiceTopology(ServiceTopology):
context="mlmodel_service", context="mlmodel_service",
processor="yield_create_request_mlmodel_service", processor="yield_create_request_mlmodel_service",
overwrite=False, overwrite=False,
must_return=True,
), ),
], ],
children=["mlmodel"], children=["mlmodel"],

View File

@ -63,6 +63,7 @@ class PipelineServiceTopology(ServiceTopology):
context="pipeline_service", context="pipeline_service",
processor="yield_create_request_pipeline_service", processor="yield_create_request_pipeline_service",
overwrite=False, overwrite=False,
must_return=True,
), ),
], ],
children=["pipeline"], children=["pipeline"],