diff --git a/metadata-ingestion/src/datahub/configuration/common.py b/metadata-ingestion/src/datahub/configuration/common.py index d4853e9c17..4e9798e812 100644 --- a/metadata-ingestion/src/datahub/configuration/common.py +++ b/metadata-ingestion/src/datahub/configuration/common.py @@ -169,6 +169,17 @@ class PermissiveConfigModel(ConfigModel): extra = Extra.allow +class ConnectionModel(BaseModel): + """Represents the config associated with a connection""" + + class Config: + if PYDANTIC_VERSION_2: # noqa: SIM108 + extra = "allow" + else: + extra = Extra.allow + underscore_attrs_are_private = True + + class TransformerSemantics(ConfigEnum): """Describes semantics for aspect changes""" diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index 13b76630cb..8905cab707 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -207,7 +207,7 @@ class DataHubGraph(DatahubRestEmitter, EntityVersioningAPI): Note: Only supported with DataHub Cloud. """ - if not self.server_config: + if not hasattr(self, "server_config") or not self.server_config: self.test_connection() base_url = self.server_config.raw_config.get("baseUrl") @@ -838,11 +838,11 @@ class DataHubGraph(DatahubRestEmitter, EntityVersioningAPI): def _bulk_fetch_schema_info_by_filter( self, *, - platform: Optional[str] = None, + platform: Union[None, str, List[str]] = None, platform_instance: Optional[str] = None, env: Optional[str] = None, query: Optional[str] = None, - container: Optional[str] = None, + container: Union[None, str, List[str]] = None, status: RemovedStatusFilter = RemovedStatusFilter.NOT_SOFT_DELETED, batch_size: int = 100, extraFilters: Optional[List[RawSearchFilterRule]] = None, @@ -914,11 +914,11 @@ class DataHubGraph(DatahubRestEmitter, EntityVersioningAPI): self, *, entity_types: Optional[Sequence[str]] = None, - platform: Optional[str] = None, + platform: Union[None, str, List[str]] = None, platform_instance: Optional[str] = None, env: Optional[str] = None, query: Optional[str] = None, - container: Optional[str] = None, + container: Union[None, str, List[str]] = None, status: Optional[RemovedStatusFilter] = RemovedStatusFilter.NOT_SOFT_DELETED, batch_size: int = 5000, extraFilters: Optional[List[RawSearchFilterRule]] = None, @@ -1018,11 +1018,11 @@ class DataHubGraph(DatahubRestEmitter, EntityVersioningAPI): self, *, entity_types: Optional[List[str]] = None, - platform: Optional[str] = None, + platform: Union[None, str, List[str]] = None, platform_instance: Optional[str] = None, env: Optional[str] = None, query: Optional[str] = None, - container: Optional[str] = None, + container: Union[None, str, List[str]] = None, status: RemovedStatusFilter = RemovedStatusFilter.NOT_SOFT_DELETED, batch_size: int = 5000, extra_and_filters: Optional[List[RawSearchFilterRule]] = None, diff --git a/metadata-ingestion/src/datahub/ingestion/graph/filters.py b/metadata-ingestion/src/datahub/ingestion/graph/filters.py index 9bf520218e..ccd2c09c9e 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/filters.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/filters.py @@ -76,11 +76,21 @@ class RemovedStatusFilter(enum.Enum): """Search only soft-deleted entities.""" +def _validate_or_filter_structure( + or_filters: List[Dict[str, List[SearchFilterRule]]], +) -> None: + for filter_list in or_filters: + if "and" not in filter_list: + raise ValueError(f"Invalid or filter: {filter_list}") + if not isinstance(filter_list["and"], list): + raise ValueError(f"Invalid or filter: {filter_list}") + + def generate_filter( - platform: Optional[str], + platform: Union[None, str, List[str]], platform_instance: Optional[str], env: Optional[str], - container: Optional[str], + container: Union[None, str, List[str]], status: Optional[RemovedStatusFilter], extra_filters: Optional[List[RawSearchFilterRule]], extra_or_filters: Optional[RawSearchFilter] = None, @@ -93,8 +103,7 @@ def generate_filter( :param container: The container to filter by. :param status: The status to filter by. :param extra_filters: Extra AND filters to apply. - :param extra_or_filters: Extra OR filters to apply. These are combined with - the AND filters using an OR at the top level. + :param extra_or_filters: Extra OR filters to apply. These are combined with the AND filters using an OR at the top level. """ and_filters: List[RawSearchFilterRule] = [] @@ -218,23 +227,31 @@ def _get_status_filter(status: RemovedStatusFilter) -> Optional[SearchFilterRule raise ValueError(f"Invalid status filter: {status}") -def _get_container_filter(container: str) -> SearchFilterRule: +def _get_container_filter(container: Union[str, List[str]]) -> SearchFilterRule: + if not isinstance(container, list): + container = [container] + # Warn if container is not a fully qualified urn. # TODO: Change this once we have a first-class container urn type. - if guess_entity_type(container) != "container": - raise ValueError(f"Invalid container urn: {container}") + for cont in container: + if guess_entity_type(cont) != "container": + raise ValueError(f"Invalid container urn: {cont}") return SearchFilterRule( field="browsePathV2", - values=[container], + values=container, condition="CONTAIN", ) def _get_platform_instance_filter( - platform: Optional[str], platform_instance: str + platform: Union[None, str, List[str]], platform_instance: str ) -> SearchFilterRule: if platform: + if isinstance(platform, list): + raise ValueError( + "Platform instance filter cannot be combined with a multi-value platform filter." + ) # Massage the platform instance into a fully qualified urn, if necessary. platform_instance = make_dataplatform_instance_urn(platform, platform_instance) @@ -250,9 +267,11 @@ def _get_platform_instance_filter( ) -def _get_platform_filter(platform: str) -> SearchFilterRule: +def _get_platform_filter(platform: Union[str, List[str]]) -> SearchFilterRule: + if not isinstance(platform, list): + platform = [platform] return SearchFilterRule( field="platform.keyword", condition="EQUAL", - values=[make_data_platform_urn(platform)], + values=[make_data_platform_urn(plt) for plt in platform], ) diff --git a/metadata-ingestion/src/datahub/specific/aspect_helpers/structured_properties.py b/metadata-ingestion/src/datahub/specific/aspect_helpers/structured_properties.py index 48050bbad8..dc0f27558f 100644 --- a/metadata-ingestion/src/datahub/specific/aspect_helpers/structured_properties.py +++ b/metadata-ingestion/src/datahub/specific/aspect_helpers/structured_properties.py @@ -70,3 +70,30 @@ class HasStructuredPropertiesPatch(MetadataPatchProposal): ), ) return self + + def set_structured_property_manual( + self, property: StructuredPropertyValueAssignmentClass + ) -> Self: + """Add or update a structured property, using a StructuredPropertyValueAssignmentClass object.""" + + self.remove_structured_property(property.propertyUrn) + self._add_patch( + StructuredPropertiesClass.ASPECT_NAME, + "add", + path=("properties", property.propertyUrn), + value=property, + ) + return self + + def add_structured_property_manual( + self, property: StructuredPropertyValueAssignmentClass + ) -> Self: + """Add a structured property, using a StructuredPropertyValueAssignmentClass object.""" + + self._add_patch( + StructuredPropertiesClass.ASPECT_NAME, + "add", + path=("properties", property.propertyUrn), + value=property, + ) + return self