feat: ConnectionModel and DataHubGraph:get_urns_by_filter and StructuredProperties from saas (#14912)

This commit is contained in:
Sergio Gómez Villamor 2025-10-02 17:23:06 +02:00 committed by GitHub
parent 5da54bf14d
commit 28b866a721
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 75 additions and 18 deletions

View File

@ -169,6 +169,17 @@ class PermissiveConfigModel(ConfigModel):
extra = Extra.allow extra = Extra.allow
class ConnectionModel(BaseModel):
"""Represents the config associated with a connection"""
class Config:
if PYDANTIC_VERSION_2: # noqa: SIM108
extra = "allow"
else:
extra = Extra.allow
underscore_attrs_are_private = True
class TransformerSemantics(ConfigEnum): class TransformerSemantics(ConfigEnum):
"""Describes semantics for aspect changes""" """Describes semantics for aspect changes"""

View File

@ -207,7 +207,7 @@ class DataHubGraph(DatahubRestEmitter, EntityVersioningAPI):
Note: Only supported with DataHub Cloud. Note: Only supported with DataHub Cloud.
""" """
if not self.server_config: if not hasattr(self, "server_config") or not self.server_config:
self.test_connection() self.test_connection()
base_url = self.server_config.raw_config.get("baseUrl") base_url = self.server_config.raw_config.get("baseUrl")
@ -838,11 +838,11 @@ class DataHubGraph(DatahubRestEmitter, EntityVersioningAPI):
def _bulk_fetch_schema_info_by_filter( def _bulk_fetch_schema_info_by_filter(
self, self,
*, *,
platform: Optional[str] = None, platform: Union[None, str, List[str]] = None,
platform_instance: Optional[str] = None, platform_instance: Optional[str] = None,
env: Optional[str] = None, env: Optional[str] = None,
query: Optional[str] = None, query: Optional[str] = None,
container: Optional[str] = None, container: Union[None, str, List[str]] = None,
status: RemovedStatusFilter = RemovedStatusFilter.NOT_SOFT_DELETED, status: RemovedStatusFilter = RemovedStatusFilter.NOT_SOFT_DELETED,
batch_size: int = 100, batch_size: int = 100,
extraFilters: Optional[List[RawSearchFilterRule]] = None, extraFilters: Optional[List[RawSearchFilterRule]] = None,
@ -914,11 +914,11 @@ class DataHubGraph(DatahubRestEmitter, EntityVersioningAPI):
self, self,
*, *,
entity_types: Optional[Sequence[str]] = None, entity_types: Optional[Sequence[str]] = None,
platform: Optional[str] = None, platform: Union[None, str, List[str]] = None,
platform_instance: Optional[str] = None, platform_instance: Optional[str] = None,
env: Optional[str] = None, env: Optional[str] = None,
query: Optional[str] = None, query: Optional[str] = None,
container: Optional[str] = None, container: Union[None, str, List[str]] = None,
status: Optional[RemovedStatusFilter] = RemovedStatusFilter.NOT_SOFT_DELETED, status: Optional[RemovedStatusFilter] = RemovedStatusFilter.NOT_SOFT_DELETED,
batch_size: int = 5000, batch_size: int = 5000,
extraFilters: Optional[List[RawSearchFilterRule]] = None, extraFilters: Optional[List[RawSearchFilterRule]] = None,
@ -1018,11 +1018,11 @@ class DataHubGraph(DatahubRestEmitter, EntityVersioningAPI):
self, self,
*, *,
entity_types: Optional[List[str]] = None, entity_types: Optional[List[str]] = None,
platform: Optional[str] = None, platform: Union[None, str, List[str]] = None,
platform_instance: Optional[str] = None, platform_instance: Optional[str] = None,
env: Optional[str] = None, env: Optional[str] = None,
query: Optional[str] = None, query: Optional[str] = None,
container: Optional[str] = None, container: Union[None, str, List[str]] = None,
status: RemovedStatusFilter = RemovedStatusFilter.NOT_SOFT_DELETED, status: RemovedStatusFilter = RemovedStatusFilter.NOT_SOFT_DELETED,
batch_size: int = 5000, batch_size: int = 5000,
extra_and_filters: Optional[List[RawSearchFilterRule]] = None, extra_and_filters: Optional[List[RawSearchFilterRule]] = None,

View File

@ -76,11 +76,21 @@ class RemovedStatusFilter(enum.Enum):
"""Search only soft-deleted entities.""" """Search only soft-deleted entities."""
def _validate_or_filter_structure(
or_filters: List[Dict[str, List[SearchFilterRule]]],
) -> None:
for filter_list in or_filters:
if "and" not in filter_list:
raise ValueError(f"Invalid or filter: {filter_list}")
if not isinstance(filter_list["and"], list):
raise ValueError(f"Invalid or filter: {filter_list}")
def generate_filter( def generate_filter(
platform: Optional[str], platform: Union[None, str, List[str]],
platform_instance: Optional[str], platform_instance: Optional[str],
env: Optional[str], env: Optional[str],
container: Optional[str], container: Union[None, str, List[str]],
status: Optional[RemovedStatusFilter], status: Optional[RemovedStatusFilter],
extra_filters: Optional[List[RawSearchFilterRule]], extra_filters: Optional[List[RawSearchFilterRule]],
extra_or_filters: Optional[RawSearchFilter] = None, extra_or_filters: Optional[RawSearchFilter] = None,
@ -93,8 +103,7 @@ def generate_filter(
:param container: The container to filter by. :param container: The container to filter by.
:param status: The status to filter by. :param status: The status to filter by.
:param extra_filters: Extra AND filters to apply. :param extra_filters: Extra AND filters to apply.
:param extra_or_filters: Extra OR filters to apply. These are combined with :param extra_or_filters: Extra OR filters to apply. These are combined with the AND filters using an OR at the top level.
the AND filters using an OR at the top level.
""" """
and_filters: List[RawSearchFilterRule] = [] and_filters: List[RawSearchFilterRule] = []
@ -218,23 +227,31 @@ def _get_status_filter(status: RemovedStatusFilter) -> Optional[SearchFilterRule
raise ValueError(f"Invalid status filter: {status}") raise ValueError(f"Invalid status filter: {status}")
def _get_container_filter(container: str) -> SearchFilterRule: def _get_container_filter(container: Union[str, List[str]]) -> SearchFilterRule:
if not isinstance(container, list):
container = [container]
# Warn if container is not a fully qualified urn. # Warn if container is not a fully qualified urn.
# TODO: Change this once we have a first-class container urn type. # TODO: Change this once we have a first-class container urn type.
if guess_entity_type(container) != "container": for cont in container:
raise ValueError(f"Invalid container urn: {container}") if guess_entity_type(cont) != "container":
raise ValueError(f"Invalid container urn: {cont}")
return SearchFilterRule( return SearchFilterRule(
field="browsePathV2", field="browsePathV2",
values=[container], values=container,
condition="CONTAIN", condition="CONTAIN",
) )
def _get_platform_instance_filter( def _get_platform_instance_filter(
platform: Optional[str], platform_instance: str platform: Union[None, str, List[str]], platform_instance: str
) -> SearchFilterRule: ) -> SearchFilterRule:
if platform: if platform:
if isinstance(platform, list):
raise ValueError(
"Platform instance filter cannot be combined with a multi-value platform filter."
)
# Massage the platform instance into a fully qualified urn, if necessary. # Massage the platform instance into a fully qualified urn, if necessary.
platform_instance = make_dataplatform_instance_urn(platform, platform_instance) platform_instance = make_dataplatform_instance_urn(platform, platform_instance)
@ -250,9 +267,11 @@ def _get_platform_instance_filter(
) )
def _get_platform_filter(platform: str) -> SearchFilterRule: def _get_platform_filter(platform: Union[str, List[str]]) -> SearchFilterRule:
if not isinstance(platform, list):
platform = [platform]
return SearchFilterRule( return SearchFilterRule(
field="platform.keyword", field="platform.keyword",
condition="EQUAL", condition="EQUAL",
values=[make_data_platform_urn(platform)], values=[make_data_platform_urn(plt) for plt in platform],
) )

View File

@ -70,3 +70,30 @@ class HasStructuredPropertiesPatch(MetadataPatchProposal):
), ),
) )
return self return self
def set_structured_property_manual(
self, property: StructuredPropertyValueAssignmentClass
) -> Self:
"""Add or update a structured property, using a StructuredPropertyValueAssignmentClass object."""
self.remove_structured_property(property.propertyUrn)
self._add_patch(
StructuredPropertiesClass.ASPECT_NAME,
"add",
path=("properties", property.propertyUrn),
value=property,
)
return self
def add_structured_property_manual(
self, property: StructuredPropertyValueAssignmentClass
) -> Self:
"""Add a structured property, using a StructuredPropertyValueAssignmentClass object."""
self._add_patch(
StructuredPropertiesClass.ASPECT_NAME,
"add",
path=("properties", property.propertyUrn),
value=property,
)
return self