feat(sdk/search): add owner filter (#14649)

Co-authored-by: Mayuri N <mayuri.nehate@datahub.com>
This commit is contained in:
Mayuri Nehate 2025-09-29 10:05:29 +05:30 committed by GitHub
parent 0b75674ab3
commit 8d13b03e85
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 85 additions and 1 deletions

View File

@ -0,0 +1,6 @@
from datahub.sdk import DataHubClient
from datahub.sdk.search_filters import FilterDsl as F
# search for all assets having user urn:li:corpuser:jdoe as owner
client = DataHubClient(server="<your_server>", token="<your_token>")
results = client.search.get_urns(filter=F.owner("urn:li:corpuser:jdoe"))

View File

@ -30,7 +30,14 @@ from datahub.ingestion.graph.filters import (
_get_status_filter,
)
from datahub.metadata.schema_classes import EntityTypeName
from datahub.metadata.urns import ContainerUrn, DataPlatformUrn, DomainUrn
from datahub.metadata.urns import (
ContainerUrn,
CorpGroupUrn,
CorpUserUrn,
DataPlatformUrn,
DomainUrn,
)
from datahub.utilities.urns.urn import guess_entity_type
_AndSearchFilterRule = TypedDict(
"_AndSearchFilterRule", {"and": List[SearchFilterRule]}
@ -235,6 +242,36 @@ class _EnvFilter(_BaseFilter):
]
class _OwnerFilter(_BaseFilter):
"""Filter for entities owned by specific users or groups."""
owner: List[str] = pydantic.Field(
description="The owner to filter on. Should be user or group URNs.",
)
@pydantic.validator("owner", each_item=True)
def validate_owner(cls, v: str) -> str:
if not v.startswith("urn:li:"):
raise ValueError(f"Owner must be a valid User or Group URN, got: {v}")
_type = guess_entity_type(v)
if _type == CorpUserUrn.ENTITY_TYPE:
return str(CorpUserUrn.from_string(v))
elif _type == CorpGroupUrn.ENTITY_TYPE:
return str(CorpGroupUrn.from_string(v))
else:
raise ValueError(f"Owner must be a valid User or Group URN, got: {v}")
def _build_rule(self) -> SearchFilterRule:
return SearchFilterRule(
field="owners",
condition="EQUAL",
values=self.owner,
)
def compile(self) -> _OrFilters:
return [{"and": [self._build_rule()]}]
class _CustomCondition(_BaseFilter):
"""Represents a single field condition."""
@ -407,6 +444,7 @@ if TYPE_CHECKING or not PYDANTIC_SUPPORTS_CALLABLE_DISCRIMINATOR:
_DomainFilter,
_ContainerFilter,
_EnvFilter,
_OwnerFilter,
_CustomCondition,
]
@ -448,6 +486,7 @@ else:
_ContainerFilter, Tag(_ContainerFilter._field_discriminator())
],
Annotated[_EnvFilter, Tag(_EnvFilter._field_discriminator())],
Annotated[_OwnerFilter, Tag(_OwnerFilter._field_discriminator())],
Annotated[
_CustomCondition, Tag(_CustomCondition._field_discriminator())
],
@ -551,6 +590,10 @@ class FilterDsl:
def env(env: Union[str, Sequence[str]], /) -> _EnvFilter:
return _EnvFilter(env=[env] if isinstance(env, str) else env)
@staticmethod
def owner(owner: Union[str, Sequence[str]], /) -> _OwnerFilter:
return _OwnerFilter(owner=[owner] if isinstance(owner, str) else owner)
@staticmethod
def has_custom_property(key: str, value: str) -> _CustomCondition:
return _CustomCondition(

View File

@ -382,6 +382,41 @@ def test_filter_before_validators() -> None:
load_filters(filter_str)
def test_owner_filter() -> None:
"""Test basic owner filter functionality."""
filter_obj: Filter = load_filters({"owner": ["urn:li:corpuser:john"]})
assert filter_obj == F.owner("urn:li:corpuser:john")
assert filter_obj.compile() == [
{
"and": [
SearchFilterRule(
field="owners",
condition="EQUAL",
values=["urn:li:corpuser:john"],
)
]
}
]
def test_owner_filter_mixed_types() -> None:
"""Test owner filter with both user and group URNs."""
filter_obj: Filter = load_filters(
{"owner": ["urn:li:corpuser:john", "urn:li:corpGroup:engineering"]}
)
assert filter_obj == F.owner(
["urn:li:corpuser:john", "urn:li:corpGroup:engineering"]
)
def test_invalid_owner_filter() -> None:
"""Test validation error for invalid owner URN."""
with pytest.raises(
ValidationError, match="Owner must be a valid User or Group URN"
):
F.owner("invalid-owner")
def test_invalid_filter() -> None:
with pytest.raises(InvalidUrnError):
F.domain("marketing")