diff --git a/ingestion/src/metadata/ingestion/connections/test_connections.py b/ingestion/src/metadata/ingestion/connections/test_connections.py index b2368a6116b..30c39e7e5e8 100644 --- a/ingestion/src/metadata/ingestion/connections/test_connections.py +++ b/ingestion/src/metadata/ingestion/connections/test_connections.py @@ -105,7 +105,7 @@ def _test_connection_steps( def _test_connection_steps_automation_workflow( metadata: OpenMetadata, steps: List[TestConnectionStep], - automation_workflow: Optional[AutomationWorkflow], + automation_workflow: AutomationWorkflow, ) -> None: """ Run the test connection as part of the automation workflow @@ -137,15 +137,9 @@ def _test_connection_steps_automation_workflow( ) test_connection_result.lastUpdatedAt = datetime.now().timestamp() - updated_workflow = CreateWorkflowRequest( - name=automation_workflow.name, - description=automation_workflow.description, - workflowType=automation_workflow.workflowType, - request=automation_workflow.request, - response=test_connection_result, - status=WorkflowStatus.Running, + metadata.patch_automation_workflow_response( + automation_workflow, test_connection_result, WorkflowStatus.Running ) - metadata.create_or_update(updated_workflow) test_connection_result.lastUpdatedAt = datetime.now().timestamp() @@ -155,15 +149,8 @@ def _test_connection_steps_automation_workflow( else StatusType.Successful ) - metadata.create_or_update( - CreateWorkflowRequest( - name=automation_workflow.name, - description=automation_workflow.description, - workflowType=automation_workflow.workflowType, - request=automation_workflow.request, - response=test_connection_result, - status=WorkflowStatus.Successful, - ) + metadata.patch_automation_workflow_response( + automation_workflow, test_connection_result, WorkflowStatus.Successful ) except Exception as err: diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py index bda64a7281f..9faceb7bd67 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py @@ -19,7 +19,14 @@ from typing import Dict, List, Optional, Type, TypeVar, Union from pydantic import BaseModel +from metadata.generated.schema.entity.automations.workflow import ( + Workflow as AutomationWorkflow, +) +from metadata.generated.schema.entity.automations.workflow import WorkflowStatus from metadata.generated.schema.entity.data.table import Table, TableConstraint +from metadata.generated.schema.entity.services.connections.testConnectionResult import ( + TestConnectionResult, +) from metadata.generated.schema.type import basic from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.tagLabel import LabelType, State, TagSource @@ -462,3 +469,40 @@ class OMetaPatchMixin(OMetaPatchMixinBase): ) return None + + def patch_automation_workflow_response( + self, + automation_workflow: AutomationWorkflow, + test_connection_result: TestConnectionResult, + workflow_status: WorkflowStatus, + ) -> None: + """ + Given an AutomationWorkflow, JSON PATCH the status and response. + """ + result_data: Dict = { + PatchField.PATH: PatchPath.RESPONSE, + PatchField.VALUE: test_connection_result.dict(), + PatchField.OPERATION: PatchOperation.ADD, + } + + # for deserializing into json convert enum object to string + result_data[PatchField.VALUE]["status"] = result_data[PatchField.VALUE][ + "status" + ].value + + status_data: Dict = { + PatchField.PATH: PatchPath.STATUS, + PatchField.OPERATION: PatchOperation.ADD, + PatchField.VALUE: workflow_status.value, + } + + try: + self.client.patch( + path=f"{self.get_suffix(AutomationWorkflow)}/{model_str(automation_workflow.id)}", + data=json.dumps([result_data, status_data]), + ) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.error( + f"Error trying to PATCH status for automation workflow [{model_str(automation_workflow)}]: {exc}" + ) diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin_utils.py b/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin_utils.py index 3c521f98c8e..5c944a36a54 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin_utils.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin_utils.py @@ -80,6 +80,8 @@ class PatchPath(str, Enum): SYNONYMS = "/synonyms/{index}" TABLE_CONSTRAINTS = "/tableConstraints" TAGS = "/tags/{tag_index}" + RESPONSE = "/response" + STATUS = "/status" class PatchValue(str, Enum): diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowRepository.java index 0f1ac91999a..cf03ff0860c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowRepository.java @@ -15,7 +15,7 @@ import org.openmetadata.service.util.EntityUtil; public class WorkflowRepository extends EntityRepository { private static final String UPDATE_FIELDS = "owner"; - private static final String PATCH_FIELDS = "owner"; + private static final String PATCH_FIELDS = "owner,status,response"; public WorkflowRepository(CollectionDAO dao) { super(