diff --git a/metadata-ingestion/src/datahub/sdk/search_filters.py b/metadata-ingestion/src/datahub/sdk/search_filters.py
index 35694717d5..1a8eb2bdc9 100644
--- a/metadata-ingestion/src/datahub/sdk/search_filters.py
+++ b/metadata-ingestion/src/datahub/sdk/search_filters.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import abc
+import json
 from typing import (
     TYPE_CHECKING,
     Annotated,
@@ -406,26 +407,45 @@ if TYPE_CHECKING or not PYDANTIC_SUPPORTS_CALLABLE_DISCRIMINATOR:
 else:
     from pydantic import Discriminator, Tag
 
+    def _parse_json_from_string(value: Any) -> Any:
+        if isinstance(value, str):
+            try:
+                return json.loads(value)
+            except json.JSONDecodeError:
+                return value
+        else:
+            return value
+
     # TODO: Once we're fully on pydantic 2, we can use a RootModel here.
     # That way we'd be able to attach methods to the Filter type.
     # e.g. replace load_filters(...) with Filter.load(...)
     Filter = Annotated[
-        Union[
-            Annotated[_And, Tag(_And._field_discriminator())],
-            Annotated[_Or, Tag(_Or._field_discriminator())],
-            Annotated[_Not, Tag(_Not._field_discriminator())],
-            Annotated[_EntityTypeFilter, Tag(_EntityTypeFilter._field_discriminator())],
-            Annotated[
-                _EntitySubtypeFilter, Tag(_EntitySubtypeFilter._field_discriminator())
+        Annotated[
+            Union[
+                Annotated[_And, Tag(_And._field_discriminator())],
+                Annotated[_Or, Tag(_Or._field_discriminator())],
+                Annotated[_Not, Tag(_Not._field_discriminator())],
+                Annotated[
+                    _EntityTypeFilter, Tag(_EntityTypeFilter._field_discriminator())
+                ],
+                Annotated[
+                    _EntitySubtypeFilter,
+                    Tag(_EntitySubtypeFilter._field_discriminator()),
+                ],
+                Annotated[_StatusFilter, Tag(_StatusFilter._field_discriminator())],
+                Annotated[_PlatformFilter, Tag(_PlatformFilter._field_discriminator())],
+                Annotated[_DomainFilter, Tag(_DomainFilter._field_discriminator())],
+                Annotated[
+                    _ContainerFilter, Tag(_ContainerFilter._field_discriminator())
+                ],
+                Annotated[_EnvFilter, Tag(_EnvFilter._field_discriminator())],
+                Annotated[
+                    _CustomCondition, Tag(_CustomCondition._field_discriminator())
+                ],
             ],
-            Annotated[_StatusFilter, Tag(_StatusFilter._field_discriminator())],
-            Annotated[_PlatformFilter, Tag(_PlatformFilter._field_discriminator())],
-            Annotated[_DomainFilter, Tag(_DomainFilter._field_discriminator())],
-            Annotated[_ContainerFilter, Tag(_ContainerFilter._field_discriminator())],
-            Annotated[_EnvFilter, Tag(_EnvFilter._field_discriminator())],
-            Annotated[_CustomCondition, Tag(_CustomCondition._field_discriminator())],
+            Discriminator(_filter_discriminator),
         ],
-        Discriminator(_filter_discriminator),
+        pydantic.BeforeValidator(_parse_json_from_string),
     ]
 
 # Required to resolve forward references to "Filter"
diff --git a/metadata-ingestion/tests/unit/sdk_v2/test_search_client.py b/metadata-ingestion/tests/unit/sdk_v2/test_search_client.py
index f7f27c66b4..808fe84dce 100644
--- a/metadata-ingestion/tests/unit/sdk_v2/test_search_client.py
+++ b/metadata-ingestion/tests/unit/sdk_v2/test_search_client.py
@@ -334,6 +334,22 @@ def test_tagged_union_error_messages() -> None:
     ):
         load_filters({"and": [{"unknown_field": 6}]})
 
+    # Test that we can load a filter from a string.
+    # Sometimes we get filters encoded as JSON, and we want to handle those gracefully.
+    filter_str = '{\n "and": [\n {"entity_type": ["dataset"]},\n {"entity_subtype": ["Table"]},\n {"platform": ["snowflake"]}\n ]\n}'
+    assert load_filters(filter_str) == F.and_(
+        F.entity_type("dataset"),
+        F.entity_subtype("Table"),
+        F.platform("snowflake"),
+    )
+    with pytest.raises(
+        ValidationError,
+        match=re.compile(
+            r"1 validation error.+Unable to extract tag using discriminator", re.DOTALL
+        ),
+    ):
+        load_filters("this is invalid json but should not raise a json error")
+
 
 def test_invalid_filter() -> None:
     with pytest.raises(InvalidUrnError):
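For reference, a minimal standalone sketch (not part of the diff) of the pattern this change relies on: a pydantic v2 BeforeValidator layered on top of a callable-discriminated union, so that a JSON-encoded string is decoded into a dict before the discriminator runs. The Cat/Dog/Pet models and the _maybe_parse_json/_discriminator helpers below are illustrative stand-ins, not names from the DataHub codebase; the sketch assumes pydantic >= 2.5 (where Tag and Discriminator are available).

import json
from typing import Annotated, Any, Literal, Optional, Union

import pydantic
from pydantic import BaseModel, Discriminator, Tag, TypeAdapter


class Cat(BaseModel):
    kind: Literal["cat"] = "cat"


class Dog(BaseModel):
    kind: Literal["dog"] = "dog"


def _maybe_parse_json(value: Any) -> Any:
    # Same idea as _parse_json_from_string in the diff: decode JSON strings, and pass
    # everything else (including non-JSON strings) through unchanged so the normal
    # "Unable to extract tag" validation error surfaces instead of a JSON error.
    if isinstance(value, str):
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            return value
    return value


def _discriminator(value: Any) -> Optional[str]:
    # Illustrative callable discriminator: pick the tag from the input, or return None
    # so pydantic reports that it could not extract a tag.
    if isinstance(value, dict):
        return value.get("kind")
    return getattr(value, "kind", None)


Pet = Annotated[
    Annotated[
        Union[
            Annotated[Cat, Tag("cat")],
            Annotated[Dog, Tag("dog")],
        ],
        Discriminator(_discriminator),
    ],
    pydantic.BeforeValidator(_maybe_parse_json),
]

adapter = TypeAdapter(Pet)
assert adapter.validate_python({"kind": "cat"}) == Cat()  # plain dicts still validate
assert adapter.validate_python('{"kind": "dog"}') == Dog()  # JSON strings are decoded first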