From fe2caf7a5d0503a0040a0af6c7fdaee7290e5e85 Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Mon, 14 Jul 2025 12:33:17 +0530 Subject: [PATCH] MINOR: Enhance patch request handling by adding 'skip_on_failure' parameter (#22142) * Enhance patch request handling by adding 'skip_on_failure' parameter * Introduced 'skip_on_failure' option in build_patch and OMetaPatchMixin methods to control behavior on patch operation failures. * Updated documentation to reflect the new parameter and its default value. * Improved error handling to log warnings instead of raising exceptions when 'skip_on_failure' is set to True. * fix: add tests for patch request with skip on failure * refactor: streamline mock patching and improve test readability in patch request tests * Consolidated import statements for unittest mock. * Enhanced readability by reducing line breaks and simplifying mock patching syntax. * Ensured consistent use of commas in function calls for clarity. * Updated tests to maintain functionality while improving code style. * fix: improve error handling in patch operations * Enhanced logging for patch operation failures in both build_patch and OMetaPatchMixin methods. * Added detailed entity information in warning and error messages to aid in debugging. * Ensured consistent behavior when 'skip_on_failure' is set, providing clearer feedback on operation outcomes. * fix: clean up whitespace in patch request error handling * Removed unnecessary whitespace in the build_patch function to improve code readability. * Ensured consistent formatting in warning and error messages for better clarity during logging. * fix: enhance error handling and improve test assertions in patch request * Updated the condition for checking 'changeDescription' in the _remove_change_description function for better clarity. * Modified exception handling in tests to raise RuntimeError instead of a generic Exception, providing more specific error feedback. * Improved assertions in tests to check for the presence of error messages, enhancing the robustness of error handling verification. * Adjusted test cases to reflect changes in expected patch operation counts and ensure accurate validation of patch operations. * fix: enhance patch operation with skip_on_failure handling * Added 'skip_on_failure' parameter to OMetaPatchMixin methods to control behavior on patch failures. * Improved error handling to log warnings and provide detailed feedback when patch operations are skipped. * Updated tests to verify the new behavior of skipping failures and improved assertions for clarity. --- .../ingestion/models/patch_request.py | 40 ++++- .../ingestion/ometa/mixins/patch_mixin.py | 142 ++++++++++++----- .../integration/ometa/test_ometa_patch.py | 134 +++++++++++++++- .../ingestion/models/test_patch_request.py | 149 +++++++++++++++++- 4 files changed, 417 insertions(+), 48 deletions(-) diff --git a/ingestion/src/metadata/ingestion/models/patch_request.py b/ingestion/src/metadata/ingestion/models/patch_request.py index f654179889c..34bca7d2d90 100644 --- a/ingestion/src/metadata/ingestion/models/patch_request.py +++ b/ingestion/src/metadata/ingestion/models/patch_request.py @@ -342,6 +342,7 @@ def build_patch( array_entity_fields: Optional[List] = None, remove_change_description: bool = True, override_metadata: Optional[bool] = False, + skip_on_failure: Optional[bool] = True, ) -> Optional[jsonpatch.JsonPatch]: """ Given an Entity type and Source entity and Destination entity, @@ -352,6 +353,10 @@ def build_patch( destination: payload with changes applied to the source. allowed_fields: List of field names to filter from source and destination models restrict_update_fields: List of field names which will only support add operation + array_entity_fields: List of array fields to sort for consistent patching + remove_change_description: Whether to remove change description from entities + override_metadata: Whether to override existing metadata fields + skip_on_failure: Whether to skip the patch operation on failure (default: True) Returns Updated Entity @@ -421,10 +426,37 @@ def build_patch( patch.patch = updated_operations return patch - except Exception: + except Exception as exc: logger.debug(traceback.format_exc()) - logger.warning("Couldn't build patch for Entity.") - return None + if skip_on_failure: + entity_info = "" + try: + if hasattr(source, "fullyQualifiedName"): + entity_info = f" for '{source.fullyQualifiedName.root}'" + elif hasattr(source, "name"): + entity_info = f" for '{source.name.root}'" + except Exception: + pass + + logger.warning( + f"Failed to build patch{entity_info}. The patch generation was skipped. " + f"Reason: {exc}" + ) + return None + else: + entity_info = "" + try: + if hasattr(source, "fullyQualifiedName"): + entity_info = f" for '{source.fullyQualifiedName.root}'" + elif hasattr(source, "name"): + entity_info = f" for '{source.name.root}'" + except Exception: + pass + + raise RuntimeError( + f"Failed to build patch{entity_info}. The patch generation failed. " + f"Set 'skip_on_failure=True' to skip failed patch operations. Error: {exc}" + ) from exc def _get_attribute_name(attr: T) -> str: @@ -534,7 +566,7 @@ def _remove_change_description(entity: T) -> T: We never want to patch that, and we won't have that information from the source. It's fully handled in the server. """ - if getattr(entity, "changeDescription"): + if hasattr(entity, "changeDescription") and getattr(entity, "changeDescription"): entity.changeDescription = None return entity diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py index 642e1cbe909..adcce2129b8 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py @@ -124,6 +124,7 @@ class OMetaPatchMixin(OMetaPatchMixinBase): restrict_update_fields: Optional[List] = None, array_entity_fields: Optional[List] = None, override_metadata: Optional[bool] = False, + skip_on_failure: Optional[bool] = True, ) -> Optional[T]: """ Given an Entity type and Source entity and Destination entity, @@ -135,6 +136,9 @@ class OMetaPatchMixin(OMetaPatchMixinBase): destination: payload with changes applied to the source. allowed_fields: List of field names to filter from source and destination models restrict_update_fields: List of field names which will only support add operation + array_entity_fields: List of array fields to sort for consistent patching + override_metadata: Whether to override existing metadata fields + skip_on_failure: Whether to skip the patch operation on failure (default: True) Returns Updated Entity @@ -147,6 +151,7 @@ class OMetaPatchMixin(OMetaPatchMixinBase): restrict_update_fields=restrict_update_fields, array_entity_fields=array_entity_fields, override_metadata=override_metadata, + skip_on_failure=skip_on_failure, ) if not patch: @@ -160,9 +165,19 @@ class OMetaPatchMixin(OMetaPatchMixinBase): except Exception as exc: logger.debug(traceback.format_exc()) - logger.warning(f"Error trying to PATCH {get_log_name(source)}: {exc}") - - return None + if skip_on_failure: + entity_name = get_log_name(source) + logger.warning( + f"Failed to update {entity_name}. The patch operation was skipped. " + f"Reason: {exc}" + ) + return None + else: + entity_name = get_log_name(source) + raise RuntimeError( + f"Failed to update {entity_name}. The patch operation failed. " + f"Set 'skip_on_failure=True' to skip failed patches. Error: {exc}" + ) from exc def patch_description( self, @@ -170,6 +185,7 @@ class OMetaPatchMixin(OMetaPatchMixinBase): source: T, description: str, force: bool = False, + skip_on_failure: bool = True, ) -> Optional[T]: """ Given an Entity type and ID, JSON PATCH the description. @@ -180,33 +196,51 @@ class OMetaPatchMixin(OMetaPatchMixinBase): description: new description to add force: if True, we will patch any existing description. Otherwise, we will maintain the existing data. + skip_on_failure: if True, return None on failure instead of raising exception Returns Updated Entity """ - if isinstance(source, TestCase): - instance: Optional[T] = self._fetch_entity_if_exists( + try: + if isinstance(source, TestCase): + instance: Optional[T] = self._fetch_entity_if_exists( + entity=entity, + entity_id=source.id, + fields=["testDefinition", "testSuite"], + ) + else: + instance: Optional[T] = self._fetch_entity_if_exists( + entity=entity, entity_id=source.id + ) + + if not instance: + return None + + if instance.description and not force: + # If the description is already present and force is not passed, + # description will not be overridden + return None + + # https://docs.pydantic.dev/latest/usage/exporting_models/#modelcopy + destination = source.model_copy(deep=True) + destination.description = Markdown(description) + + return self.patch( entity=entity, - entity_id=source.id, - fields=["testDefinition", "testSuite"], + source=source, + destination=destination, + skip_on_failure=skip_on_failure, ) - else: - instance: Optional[T] = self._fetch_entity_if_exists( - entity=entity, entity_id=source.id - ) - - if not instance: - return None - - if instance.description and not force: - # If the description is already present and force is not passed, - # description will not be overridden - return None - - # https://docs.pydantic.dev/latest/usage/exporting_models/#modelcopy - destination = source.model_copy(deep=True) - destination.description = Markdown(description) - - return self.patch(entity=entity, source=source, destination=destination) + except Exception as exc: + if skip_on_failure: + logger.debug(traceback.format_exc()) + entity_name = get_log_name(source) + logger.warning( + f"Failed to patch description for {entity_name}. The patch operation was skipped. " + f"Reason: {exc}" + ) + return None + else: + raise def patch_table_constraints( self, @@ -275,6 +309,7 @@ class OMetaPatchMixin(OMetaPatchMixinBase): operation: Union[ PatchOperation.ADD, PatchOperation.REMOVE ] = PatchOperation.ADD, + skip_on_failure: bool = True, ) -> Optional[T]: """ Given an Entity type and ID, JSON PATCH the tag. @@ -284,29 +319,47 @@ class OMetaPatchMixin(OMetaPatchMixinBase): source: Source entity object tag_label: TagLabel to add or remove operation: Patch Operation to add or remove the tag. + skip_on_failure: if True, return None on failure instead of raising exception Returns Updated Entity """ - instance: Optional[T] = self._fetch_entity_if_exists( - entity=entity, entity_id=source.id, fields=["tags"] - ) - if not instance: - return None + try: + instance: Optional[T] = self._fetch_entity_if_exists( + entity=entity, entity_id=source.id, fields=["tags"] + ) + if not instance: + return None - # Initialize empty tag list or the last updated tags - source.tags = instance.tags or [] - destination = source.model_copy(deep=True) + # Initialize empty tag list or the last updated tags + source.tags = instance.tags or [] + destination = source.model_copy(deep=True) - tag_fqns = {label.tagFQN.root for label in tag_labels} + tag_fqns = {label.tagFQN.root for label in tag_labels} - if operation == PatchOperation.REMOVE: - for tag in destination.tags: - if tag.tagFQN.root in tag_fqns: - destination.tags.remove(tag) - else: - destination.tags.extend(tag_labels) + if operation == PatchOperation.REMOVE: + for tag in destination.tags: + if tag.tagFQN.root in tag_fqns: + destination.tags.remove(tag) + else: + destination.tags.extend(tag_labels) - return self.patch(entity=entity, source=source, destination=destination) + return self.patch( + entity=entity, + source=source, + destination=destination, + skip_on_failure=skip_on_failure, + ) + except Exception as exc: + if skip_on_failure: + logger.debug(traceback.format_exc()) + entity_name = get_log_name(source) + logger.warning( + f"Failed to patch tags for {entity_name}. The patch operation was skipped. " + f"Reason: {exc}" + ) + return None + else: + raise def patch_tag( self, @@ -316,11 +369,16 @@ class OMetaPatchMixin(OMetaPatchMixinBase): operation: Union[ PatchOperation.ADD, PatchOperation.REMOVE ] = PatchOperation.ADD, + skip_on_failure: bool = True, ) -> Optional[T]: """Will be deprecated in 1.3""" logger.warning("patch_tag will be deprecated in 1.3. Use `patch_tags` instead.") return self.patch_tags( - entity=entity, source=source, tag_labels=[tag_label], operation=operation + entity=entity, + source=source, + tag_labels=[tag_label], + operation=operation, + skip_on_failure=skip_on_failure, ) def patch_owner( diff --git a/ingestion/tests/integration/ometa/test_ometa_patch.py b/ingestion/tests/integration/ometa/test_ometa_patch.py index abf17b70654..4a88f045643 100644 --- a/ingestion/tests/integration/ometa/test_ometa_patch.py +++ b/ingestion/tests/integration/ometa/test_ometa_patch.py @@ -15,7 +15,7 @@ OpenMetadata high-level API Table test import logging import time from datetime import datetime -from unittest import TestCase +from unittest import TestCase, mock from _openmetadata_testutils.ometa import int_admin_ometa from metadata.generated.schema.entity.data.database import Database @@ -680,3 +680,135 @@ class OMetaTableTest(TestCase): assert patched_table.description.root == "potato" assert patched_table.owners.root assert patched_table.owners.root[0].inherited + + def test_patch_skip_on_failure_true(self): + """Test that patch operation skips failures when skip_on_failure=True.""" + # Create a destination with a change to trigger a patch + corrupted_destination = self.table.model_copy(deep=True) + corrupted_destination.description = Markdown("Modified description") + + # Mock the client.patch to raise an exception + with mock.patch.object(self.metadata.client, "patch") as mock_patch_client: + mock_patch_client.side_effect = Exception("API error") + + # Test with skip_on_failure=True (should return None) + result = self.metadata.patch( + entity=Table, + source=self.table, + destination=corrupted_destination, + skip_on_failure=True, + ) + + assert result is None + mock_patch_client.assert_called_once() + + def test_patch_skip_on_failure_false(self): + """Test that patch operation raises exception when skip_on_failure=False.""" + # Create a destination with a change to trigger a patch + corrupted_destination = self.table.model_copy(deep=True) + corrupted_destination.description = Markdown("Modified description") + + # Mock the client.patch to raise an exception + with mock.patch.object(self.metadata.client, "patch") as mock_patch_client: + mock_patch_client.side_effect = Exception("API error") + + # Test with skip_on_failure=False (should raise exception) + with self.assertRaises(RuntimeError) as context: + self.metadata.patch( + entity=Table, + source=self.table, + destination=corrupted_destination, + skip_on_failure=False, + ) + + assert "API error" in str(context.exception) + assert "Failed to update" in str(context.exception) + mock_patch_client.assert_called_once() + + def test_patch_skip_on_failure_default_behavior(self): + """Test that patch operation defaults to skip_on_failure=True.""" + # Create a destination with a change to trigger a patch + corrupted_destination = self.table.model_copy(deep=True) + corrupted_destination.description = Markdown("Modified description") + + # Mock the client.patch to raise an exception + with mock.patch.object(self.metadata.client, "patch") as mock_patch_client: + mock_patch_client.side_effect = Exception("API error") + + # Test without explicitly setting skip_on_failure (should default to True) + result = self.metadata.patch( + entity=Table, source=self.table, destination=corrupted_destination + ) + + assert result is None + mock_patch_client.assert_called_once() + + def test_patch_description_skip_on_failure_true(self): + """Test that patch_description skips failures when skip_on_failure=True.""" + # Mock _fetch_entity_if_exists to raise an exception + with mock.patch.object(self.metadata, "_fetch_entity_if_exists") as mock_fetch: + mock_fetch.side_effect = Exception("Database error") + + # Test with skip_on_failure=True + result = self.metadata.patch_description( + entity=Table, + source=self.table, + description="New description", + skip_on_failure=True, + ) + + assert result is None + mock_fetch.assert_called_once() + + def test_patch_description_skip_on_failure_false(self): + """Test that patch_description raises exception when skip_on_failure=False.""" + # Mock _fetch_entity_if_exists to raise an exception + with mock.patch.object(self.metadata, "_fetch_entity_if_exists") as mock_fetch: + mock_fetch.side_effect = Exception("Database error") + + # Test with skip_on_failure=False + with self.assertRaises(Exception) as context: + self.metadata.patch_description( + entity=Table, + source=self.table, + description="New description", + skip_on_failure=False, + ) + + assert str(context.exception) == "Database error" + mock_fetch.assert_called_once() + + def test_patch_tags_skip_on_failure_true(self): + """Test that patch_tags skips failures when skip_on_failure=True.""" + # Mock _fetch_entity_if_exists to raise an exception + with mock.patch.object(self.metadata, "_fetch_entity_if_exists") as mock_fetch: + mock_fetch.side_effect = Exception("Database error") + + # Test with skip_on_failure=True + result = self.metadata.patch_tags( + entity=Table, + source=self.table, + tag_labels=[PII_TAG_LABEL], + skip_on_failure=True, + ) + + assert result is None + mock_fetch.assert_called_once() + + def test_patch_tags_skip_on_failure_false(self): + """Test that patch_tags raises exception when skip_on_failure=False.""" + # Mock _fetch_entity_if_exists to raise an exception + with mock.patch.object(self.metadata, "_fetch_entity_if_exists") as mock_fetch: + mock_fetch.side_effect = Exception("Database error") + + # Test with skip_on_failure=False + with self.assertRaises(Exception) as context: + self.metadata.patch_tags( + entity=Table, + source=self.table, + tag_labels=[PII_TAG_LABEL], + skip_on_failure=False, + ) + + assert str(context.exception) == "Database error" + mock_fetch.assert_called_once() diff --git a/ingestion/tests/unit/metadata/ingestion/models/test_patch_request.py b/ingestion/tests/unit/metadata/ingestion/models/test_patch_request.py index 414477bfef8..4cbb9e2ffa3 100644 --- a/ingestion/tests/unit/metadata/ingestion/models/test_patch_request.py +++ b/ingestion/tests/unit/metadata/ingestion/models/test_patch_request.py @@ -13,10 +13,12 @@ Check the JSONPatch operations work as expected """ from unittest import TestCase +from unittest.mock import Mock, patch import jsonpatch +from pydantic import BaseModel -from metadata.ingestion.models.patch_request import JsonPatchUpdater +from metadata.ingestion.models.patch_request import JsonPatchUpdater, build_patch class JsonPatchUpdaterTest(TestCase): @@ -104,3 +106,148 @@ class JsonPatchUpdaterTest(TestCase): updated_operations = json_patch_updater.update(json_patch) self.assertEqual(expected, updated_operations) + + +class BuildPatchTest(TestCase): + """Validate build_patch function operations with skip_on_failure parameter.""" + + def setUp(self): + """Set up test fixtures.""" + + class TestModel(BaseModel): + name: str + value: int + description: str = None + + self.TestModel = TestModel + + self.source = TestModel(name="test", value=1, description="source") + self.destination = TestModel(name="test", value=2, description="destination") + + def test_build_patch_skip_on_failure_true_with_exception(self): + """Test that build_patch returns None when skip_on_failure=True and exception occurs.""" + + # Mock jsonpatch.make_patch to raise an exception + with patch( + "metadata.ingestion.models.patch_request.jsonpatch.make_patch" + ) as mock_make_patch: + mock_make_patch.side_effect = Exception("Test exception") + + # Test with skip_on_failure=True (default) + result = build_patch( + source=self.source, destination=self.destination, skip_on_failure=True + ) + + self.assertIsNone(result) + mock_make_patch.assert_called_once() + + def test_build_patch_skip_on_failure_false_with_exception(self): + """Test that build_patch raises exception when skip_on_failure=False and exception occurs.""" + + # Mock jsonpatch.make_patch to raise an exception + with patch( + "metadata.ingestion.models.patch_request.jsonpatch.make_patch" + ) as mock_make_patch: + mock_make_patch.side_effect = Exception("Test exception") + + # Test with skip_on_failure=False + with self.assertRaises(RuntimeError) as context: + build_patch( + source=self.source, + destination=self.destination, + skip_on_failure=False, + ) + + self.assertIn("Test exception", str(context.exception)) + self.assertIn("Failed to build patch", str(context.exception)) + mock_make_patch.assert_called_once() + + def test_build_patch_skip_on_failure_default_behavior(self): + """Test that build_patch defaults to skip_on_failure=True.""" + + # Mock jsonpatch.make_patch to raise an exception + with patch( + "metadata.ingestion.models.patch_request.jsonpatch.make_patch" + ) as mock_make_patch: + mock_make_patch.side_effect = Exception("Test exception") + + # Test without explicitly setting skip_on_failure (should default to True) + result = build_patch(source=self.source, destination=self.destination) + + self.assertIsNone(result) + mock_make_patch.assert_called_once() + + def test_build_patch_success_with_skip_on_failure_false(self): + """Test that build_patch works normally when skip_on_failure=False and no exception occurs.""" + + # Create a real patch to test successful operation + result = build_patch( + source=self.source, destination=self.destination, skip_on_failure=False + ) + + self.assertIsNotNone(result) + self.assertIsInstance(result, jsonpatch.JsonPatch) + + # Verify the patch contains the expected operations + patch_operations = result.patch + self.assertEqual(len(patch_operations), 2) + + # Find the value operation + value_op = next((op for op in patch_operations if op["path"] == "/value"), None) + self.assertIsNotNone(value_op) + self.assertEqual(value_op["op"], "replace") + self.assertEqual(value_op["value"], 2) + + def test_build_patch_success_with_skip_on_failure_true(self): + """Test that build_patch works normally when skip_on_failure=True and no exception occurs.""" + + # Create a real patch to test successful operation + result = build_patch( + source=self.source, destination=self.destination, skip_on_failure=True + ) + + self.assertIsNotNone(result) + self.assertIsInstance(result, jsonpatch.JsonPatch) + + # Verify the patch contains the expected operations + patch_operations = result.patch + self.assertEqual(len(patch_operations), 2) + + # Find the value operation + value_op = next((op for op in patch_operations if op["path"] == "/value"), None) + self.assertIsNotNone(value_op) + self.assertEqual(value_op["op"], "replace") + self.assertEqual(value_op["value"], 2) + + def test_build_patch_with_json_patch_updater_exception(self): + """Test skip_on_failure behavior when JsonPatchUpdater.update raises an exception.""" + + # Mock JsonPatchUpdater.update to raise an exception + with patch( + "metadata.ingestion.models.patch_request.JsonPatchUpdater.from_restrict_update_fields" + ) as mock_updater_factory: + mock_updater = Mock() + mock_updater.update.side_effect = Exception("JsonPatchUpdater exception") + mock_updater_factory.return_value = mock_updater + + # Test with skip_on_failure=True + result = build_patch( + source=self.source, + destination=self.destination, + restrict_update_fields=["description"], + skip_on_failure=True, + ) + + self.assertIsNone(result) + + # Test with skip_on_failure=False + with self.assertRaises(RuntimeError) as context: + build_patch( + source=self.source, + destination=self.destination, + restrict_update_fields=["description"], + skip_on_failure=False, + ) + + self.assertIn("JsonPatchUpdater exception", str(context.exception)) + self.assertIn("Failed to build patch", str(context.exception))