From 6509a3670a0569cd481ccd1e43583bb9053f14eb Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Tue, 23 May 2023 06:47:11 -0700 Subject: [PATCH] Fix #11664: Refactor patch_mixin to use jsonpatch lib (#11696) * Fix #11664: Refactor patch_mixin to use jsonpatch lib * Migrate to jsonpatch * Fix nested cols * Format * Update patch_description * Table constraints * tag * owner * column tag * column desc * Format * Format * Fix log * Update dbt patch * Update column fqn * Fix test * Fix tests --------- Co-authored-by: Pere Miquel Brull --- ingestion/setup.py | 1 + .../ingestion/models/table_metadata.py | 4 +- .../src/metadata/ingestion/ometa/client.py | 19 +- .../ingestion/ometa/mixins/glossary_mixin.py | 4 +- .../ingestion/ometa/mixins/patch_mixin.py | 528 +++++++----------- .../ometa/mixins/patch_mixin_utils.py | 10 +- .../ometa/mixins/role_policy_mixin.py | 4 +- .../metadata/ingestion/sink/metadata_rest.py | 8 +- .../source/dashboard/dashboard_service.py | 2 +- .../source/database/common_db_source.py | 2 +- .../ingestion/source/database/dbt/metadata.py | 22 +- .../source/metadata/atlas/metadata.py | 24 +- ingestion/src/metadata/pii/processor.py | 23 +- .../test_data_insight_workflow.py | 8 +- .../integration/ometa/test_ometa_patch.py | 169 ++++-- .../unit/topology/metadata/test_atlas.py | 2 +- 16 files changed, 418 insertions(+), 412 deletions(-) diff --git a/ingestion/setup.py b/ingestion/setup.py index 12bf8027730..5d7af3f41c4 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -93,6 +93,7 @@ base_requirements = { "idna<3,>=2.5", "importlib-metadata~=4.12.0", # From airflow constraints "Jinja2>=2.11.3", + "jsonpatch==1.32", "jsonschema", "mypy_extensions>=0.4.3", "pydantic~=1.10", diff --git a/ingestion/src/metadata/ingestion/models/table_metadata.py b/ingestion/src/metadata/ingestion/models/table_metadata.py index 69df429b373..8521b8ec784 100644 --- a/ingestion/src/metadata/ingestion/models/table_metadata.py +++ 
b/ingestion/src/metadata/ingestion/models/table_metadata.py @@ -15,7 +15,7 @@ from typing import Dict, List, Optional from pydantic import BaseModel -from metadata.generated.schema.entity.data.table import TableConstraint +from metadata.generated.schema.entity.data.table import Table, TableConstraint class OMetaTableConstraints(BaseModel): @@ -23,6 +23,6 @@ class OMetaTableConstraints(BaseModel): Model to club table with its constraints """ - table_id: str + table: Table foreign_constraints: Optional[List[Dict]] constraints: Optional[List[TableConstraint]] diff --git a/ingestion/src/metadata/ingestion/ometa/client.py b/ingestion/src/metadata/ingestion/ometa/client.py index 53b109ffb52..7c0ecdf353e 100644 --- a/ingestion/src/metadata/ingestion/ometa/client.py +++ b/ingestion/src/metadata/ingestion/ometa/client.py @@ -219,6 +219,16 @@ class REST: try: resp = self._session.request(method, url, **opts) resp.raise_for_status() + + if resp.text != "": + try: + return resp.json() + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + f"Unexpected error while returning response {resp} in json format - {exc}" + ) + except HTTPError as http_error: # retry if we hit Rate Limit if resp.status_code in retry_codes and retry > 0: @@ -244,14 +254,7 @@ class REST: logger.warning( f"Unexpected error calling [{url}] with method [{method}]: {exc}" ) - if resp.text != "": - try: - return resp.json() - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning( - f"Unexpected error while returning response {resp} in json format - {exc}" - ) + return None def get(self, path, data=None): diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/glossary_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/glossary_mixin.py index 520427e7de2..8dd47f50a86 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/glossary_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/glossary_mixin.py @@ -149,7 +149,7 @@ class 
GlossaryMixin(OMetaPatchMixinBase): the updated GlossaryTerm """ instance: GlossaryTerm = self._fetch_entity_if_exists( - entity=GlossaryTerm, entity_id=entity_id + entity=GlossaryTerm, entity_id=entity_id, fields=["relatedTerms"] ) if not instance: return None @@ -238,7 +238,7 @@ class GlossaryMixin(OMetaPatchMixinBase): The updated entity """ instance: Union[Glossary, GlossaryTerm] = self._fetch_entity_if_exists( - entity=entity, entity_id=entity_id + entity=entity, entity_id=entity_id, fields=["reviewers"] ) if not instance: return None diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py index 9faceb7bd67..eea6d78441c 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py @@ -17,29 +17,27 @@ import json import traceback from typing import Dict, List, Optional, Type, TypeVar, Union +import jsonpatch from pydantic import BaseModel from metadata.generated.schema.entity.automations.workflow import ( Workflow as AutomationWorkflow, ) from metadata.generated.schema.entity.automations.workflow import WorkflowStatus -from metadata.generated.schema.entity.data.table import Table, TableConstraint +from metadata.generated.schema.entity.data.table import Column, Table, TableConstraint from metadata.generated.schema.entity.services.connections.testConnectionResult import ( TestConnectionResult, ) -from metadata.generated.schema.type import basic from metadata.generated.schema.type.entityReference import EntityReference -from metadata.generated.schema.type.tagLabel import LabelType, State, TagSource +from metadata.generated.schema.type.tagLabel import TagLabel from metadata.ingestion.ometa.client import REST from metadata.ingestion.ometa.mixins.patch_mixin_utils import ( OMetaPatchMixinBase, PatchField, PatchOperation, PatchPath, - PatchValue, ) from metadata.ingestion.ometa.utils import model_str -from 
metadata.utils.helpers import find_column_in_table_with_index from metadata.utils.logger import ometa_logger logger = ometa_logger() @@ -49,6 +47,51 @@ T = TypeVar("T", bound=BaseModel) OWNER_TYPES: List[str] = ["user", "team"] +def update_column_tags( + columns: List[Column], + column_fqn: str, + tag_label: TagLabel, + operation: PatchOperation, +) -> None: + """ + Inplace update for the incoming column list + """ + for col in columns: + if str(col.fullyQualifiedName.__root__).lower() == column_fqn.lower(): + if operation == PatchOperation.REMOVE: + for tag in col.tags: + if tag.tagFQN == tag_label.tagFQN: + col.tags.remove(tag) + else: + col.tags.append(tag_label) + break + + if col.children: + update_column_tags(col.children, column_fqn, tag_label, operation) + + +def update_column_description( + columns: List[Column], column_fqn: str, description: str, force: bool = False +) -> None: + """ + Inplace update for the incoming column list + """ + for col in columns: + if str(col.fullyQualifiedName.__root__).lower() == column_fqn.lower(): + if col.description and not force: + logger.warning( + f"The entity with id [{model_str(column_fqn)}] already has a description." + " To overwrite it, set `force` to True." + ) + break + + col.description = description + break + + if col.children: + update_column_description(col.children, column_fqn, description, force) + + class OMetaPatchMixin(OMetaPatchMixinBase): """ OpenMetadata API methods related to Tables. @@ -58,10 +101,50 @@ class OMetaPatchMixin(OMetaPatchMixinBase): client: REST + def patch(self, entity: Type[T], source: T, destination: T) -> Optional[T]: + """ + Given an Entity type and Source entity and Destination entity, + generate a JSON Patch and apply it. + + Args + entity (T): Entity Type + source: Source payload which is current state of the source in OpenMetadata + destination: payload with changes applied to the source. 
+ + Returns + Updated Entity + """ + try: + # Get the difference between source and destination + patch = jsonpatch.make_patch( + json.loads(source.json(exclude_unset=True, exclude_none=True)), + json.loads(destination.json(exclude_unset=True, exclude_none=True)), + ) + + if not patch: + logger.debug( + "Nothing to update when running the patch. Are you passing `force=True`?" + ) + return None + + res = self.client.patch( + path=f"{self.get_suffix(entity)}/{model_str(source.id)}", + data=str(patch), + ) + return entity(**res) + + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.error( + f"Error trying to PATCH description for {entity.__class__.__name__} [{source.id}]: {exc}" + ) + + return None + def patch_description( self, entity: Type[T], - entity_id: Union[str, basic.Uuid], + source: T, description: str, force: bool = False, ) -> Optional[T]: @@ -70,182 +153,66 @@ class OMetaPatchMixin(OMetaPatchMixinBase): Args entity (T): Entity Type - entity_id: ID + source: source entity object description: new description to add force: if True, we will patch any existing description. Otherwise, we will maintain the existing data. Returns Updated Entity """ - instance = self._fetch_entity_if_exists(entity=entity, entity_id=entity_id) + instance: Optional[T] = self._fetch_entity_if_exists( + entity=entity, entity_id=source.id + ) if not instance: return None if instance.description and not force: logger.warning( - f"The entity with id [{model_str(entity_id)}] already has a description." + f"The entity with id [{model_str(source.id)}] already has a description." " To overwrite it, set `force` to True." 
) return None - try: - res = self.client.patch( - path=f"{self.get_suffix(entity)}/{model_str(entity_id)}", - data=json.dumps( - [ - { - PatchField.OPERATION: PatchOperation.ADD - if not instance.description - else PatchOperation.REPLACE, - PatchField.PATH: PatchPath.DESCRIPTION, - PatchField.VALUE: description, - } - ] - ), - ) - return entity(**res) + # https://docs.pydantic.dev/latest/usage/exporting_models/#modelcopy + destination = source.copy(deep=True) + destination.description = description - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.error( - f"Error trying to PATCH description for {entity.__class__.__name__} [{entity_id}]: {exc}" - ) - - return None - - def patch_column_description( - self, - entity_id: Union[str, basic.Uuid], - column_name: str, - description: str, - force: bool = False, - ) -> Optional[T]: - """Given an Entity ID, JSON PATCH the description of the column - - Args - entity_id: ID - description: new description to add - column_name: column to update - force: if True, we will patch any existing description. Otherwise, we will maintain - the existing data. - Returns - Updated Entity - """ - table: Table = self._fetch_entity_if_exists( - entity=Table, - entity_id=entity_id, - ) - if not table: - return None - - if not table.columns: - return None - - col_index, col = find_column_in_table_with_index( - column_name=column_name, table=table - ) - - if col_index is None: - logger.warning(f"Cannot find column {column_name} in Table.") - return None - - if col.description and not force: - logger.warning( - f"The column '{column_name}' in '{table.fullyQualifiedName.__root__}' already has a description." - " To overwrite it, set `force` to True." 
- ) - return None - - try: - res = self.client.patch( - path=f"{self.get_suffix(Table)}/{model_str(entity_id)}", - data=json.dumps( - [ - { - PatchField.OPERATION: PatchOperation.ADD - if not col.description - else PatchOperation.REPLACE, - PatchField.PATH: PatchPath.COLUMNS_DESCRIPTION.format( - index=col_index - ), - PatchField.VALUE: description, - } - ] - ), - ) - return Table(**res) - - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning( - f"Error trying to PATCH description for Table Column: {entity_id}, {column_name}: {exc}" - ) - - return None + return self.patch(entity=entity, source=source, destination=destination) def patch_table_constraints( self, - entity_id: Union[str, basic.Uuid], - table_constraints: List[TableConstraint], + table: Table, + constraints: List[TableConstraint], ) -> Optional[T]: """Given an Entity ID, JSON PATCH the table constraints of table Args - entity_id: ID + source_table: Origin table description: new description to add table_constraints: table constraints to add Returns Updated Entity """ - table: Table = self._fetch_entity_if_exists( - entity=Table, - entity_id=entity_id, + instance: Table = self._fetch_entity_if_exists( + entity=Table, entity_id=table.id, fields=["tableConstraints"] ) - if not table: + + if not instance: return None - try: - res = self.client.patch( - path=f"{self.get_suffix(Table)}/{model_str(entity_id)}", - data=json.dumps( - [ - { - PatchField.OPERATION: PatchOperation.ADD - if not table.tableConstraints - else PatchOperation.REPLACE, - PatchField.PATH: PatchPath.TABLE_CONSTRAINTS, - PatchField.VALUE: [ - { - PatchValue.CONSTRAINT_TYPE: constraint.constraintType.value, - PatchValue.COLUMNS: constraint.columns, - PatchValue.REFERRED_COLUMNS: [ - col.__root__ - for col in constraint.referredColumns or [] - ], - } - for constraint in table_constraints - ], - } - ] - ), - ) - return Table(**res) + table.tableConstraints = instance.tableConstraints - except Exception as exc: - 
logger.debug(traceback.format_exc()) - logger.warning( - f"Error trying to PATCH description for Table Constraint: {entity_id}: {exc}" - ) + destination = table.copy(deep=True) + destination.tableConstraints = constraints - return None + return self.patch(entity=Table, source=table, destination=destination) def patch_tag( self, entity: Type[T], - entity_id: Union[str, basic.Uuid], - tag_fqn: str, - from_glossary: bool = False, + source: T, + tag_label: TagLabel, operation: Union[ PatchOperation.ADD, PatchOperation.REMOVE ] = PatchOperation.ADD, @@ -255,155 +222,35 @@ class OMetaPatchMixin(OMetaPatchMixinBase): Args entity (T): Entity Type - entity_id: ID - description: new description to add - force: if True, we will patch any existing description. Otherwise, we will maintain - the existing data. + source: Source entity object + tag_label: TagLabel to add or remove + operation: Patch Operation to add or remove the tag. Returns Updated Entity """ - instance = self._fetch_entity_if_exists(entity=entity, entity_id=entity_id) + instance: Optional[T] = self._fetch_entity_if_exists( + entity=entity, entity_id=source.id, fields=["tags"] + ) if not instance: return None - tag_index = len(instance.tags) - 1 if instance.tags else 0 + # Initialize empty tag list or the last updated tags + source.tags = instance.tags or [] + destination = source.copy(deep=True) - try: - res = None - if operation == PatchOperation.ADD: - res = self.client.patch( - path=f"{self.get_suffix(entity)}/{model_str(entity_id)}", - data=json.dumps( - [ - { - PatchField.OPERATION: PatchOperation.ADD, - PatchField.PATH: PatchPath.TAGS.format( - tag_index=tag_index - ), - PatchField.VALUE: { - "labelType": LabelType.Automated.value, - "source": TagSource.Classification.value - if not from_glossary - else TagSource.Glossary.value, - "state": State.Confirmed.value, - "tagFQN": tag_fqn, - }, - } - ] - ), - ) - elif operation == PatchOperation.REMOVE: - res = self.client.patch( - 
path=f"{self.get_suffix(entity)}/{model_str(entity_id)}", - data=json.dumps( - [ - { - PatchField.OPERATION: PatchOperation.REMOVE, - PatchField.PATH: PatchPath.TAGS.format( - tag_index=tag_index - ), - } - ] - ), - ) - return entity(**res) if res is not None else res + if operation == PatchOperation.REMOVE: + for tag in destination.tags: + if tag.tagFQN == tag_label.tagFQN: + destination.tags.remove(tag) + else: + destination.tags.append(tag_label) - except Exception as exc: - logger.error(traceback.format_exc()) - logger.error( - f"Error trying to PATCH tag for {entity.__class__.__name__} [{entity_id}]: {exc}" - ) - - return None - - def patch_column_tag( - self, - entity_id: Union[str, basic.Uuid], - column_name: str, - tag_fqn: str, - from_glossary: bool = False, - operation: Union[ - PatchOperation.ADD, PatchOperation.REMOVE - ] = PatchOperation.ADD, - is_suggested: bool = False, - ) -> Optional[T]: - """Given an Entity ID, JSON PATCH the tag of the column - - Args - entity_id: ID - tag_fqn: new tag to add - column_name: column to update - from_glossary: the tag comes from a glossary - Returns - Updated Entity - """ - table: Table = self._fetch_entity_if_exists(entity=Table, entity_id=entity_id) - if not table: - return None - - col_index, col = find_column_in_table_with_index( - column_name=column_name, table=table - ) - - if col_index is None: - logger.warning(f"Cannot find column {column_name} in Table.") - return None - - tag_index = len(col.tags) - 1 if col.tags else 0 - try: - res = None - if operation == PatchOperation.ADD: - res = self.client.patch( - path=f"{self.get_suffix(Table)}/{model_str(entity_id)}", - data=json.dumps( - [ - { - PatchField.OPERATION: PatchOperation.ADD, - PatchField.PATH: PatchPath.COLUMNS_TAGS.format( - index=col_index, tag_index=tag_index - ), - PatchField.VALUE: { - PatchValue.LABEL_TYPE: LabelType.Automated.value, - PatchValue.SOURCE: TagSource.Classification.value - if not from_glossary - else TagSource.Glossary.value, - 
PatchValue.STATE: State.Suggested.value - if is_suggested - else State.Confirmed.value, - PatchValue.TAG_FQN: tag_fqn, - }, - } - ] - ), - ) - elif operation == PatchOperation.REMOVE: - res = self.client.patch( - path=f"{self.get_suffix(Table)}/{model_str(entity_id)}", - data=json.dumps( - [ - { - PatchField.OPERATION: PatchOperation.REMOVE, - PatchField.PATH: PatchPath.COLUMNS_TAGS.format( - index=col_index, tag_index=tag_index - ), - } - ] - ), - ) - return Table(**res) if res is not None else res - - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning( - f"Error trying to PATCH tags for Table Column: {entity_id}, {column_name}: {exc}" - ) - - return None + return self.patch(entity=entity, source=source, destination=destination) def patch_owner( self, entity: Type[T], - entity_id: Union[str, basic.Uuid], + source: T, owner: EntityReference = None, force: bool = False, ) -> Optional[T]: @@ -420,55 +267,108 @@ class OMetaPatchMixin(OMetaPatchMixinBase): Returns Updated Entity """ - instance = self._fetch_entity_if_exists(entity=entity, entity_id=entity_id) + instance: Optional[T] = self._fetch_entity_if_exists( + entity=entity, entity_id=source.id, fields=["owner"] + ) + if not instance: return None # Don't change existing data without force if instance.owner and not force: logger.warning( - f"The entity with id [{model_str(entity_id)}] already has an owner." + f"The entity with id [{model_str(source.id)}] already has an owner." " To overwrite it, set `overrideOwner` to True." ) return None - data: Dict = { - PatchField.PATH: PatchPath.OWNER, - } + source.owner = instance.owner - if owner is None: - data[PatchField.OPERATION] = PatchOperation.REMOVE - else: - if owner.type not in OWNER_TYPES: - valid_owner_types: str = ", ".join(f'"{o}"' for o in OWNER_TYPES) - logger.error( - f"The entity with id [{model_str(entity_id)}] was provided an invalid" - f" owner type. Must be one of {valid_owner_types}." 
- ) - return None + destination = source.copy(deep=True) + destination.owner = owner - data[PatchField.OPERATION] = ( - PatchOperation.ADD if instance.owner is None else PatchOperation.REPLACE - ) - data[PatchField.VALUE] = { - PatchValue.ID: model_str(owner.id), - PatchValue.TYPE: owner.type, - } + return self.patch(entity=entity, source=source, destination=destination) - try: - res = self.client.patch( - path=f"{self.get_suffix(entity)}/{model_str(entity_id)}", - data=json.dumps([data]), - ) - return entity(**res) + def patch_column_tag( + self, + table: Table, + column_fqn: str, + tag_label: TagLabel, + operation: Union[ + PatchOperation.ADD, PatchOperation.REMOVE + ] = PatchOperation.ADD, + ) -> Optional[T]: + """Given a Table and a Column FQN, JSON PATCH the tag of the column - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.error( - f"Error trying to PATCH description for {entity.__class__.__name__} [{entity_id}]: {exc}" + Args + table: origin Table object + tag_label: TagLabel to add or remove + column_fqn: FQN of the column to update + operation: Patch Operation to add or remove + Returns + Updated Entity + """ + instance: Optional[Table] = self._fetch_entity_if_exists( + entity=Table, entity_id=table.id, fields=["tags"] + ) + + if not instance: + return None + + # Make sure we run the patch against the last updated data from the API + table.columns = instance.columns + + destination = table.copy(deep=True) + update_column_tags(destination.columns, column_fqn, tag_label, operation) + + patched_entity = self.patch(entity=Table, source=table, destination=destination) + if patched_entity is None: + logger.debug( + f"Empty PATCH result. 
Either everything is up to date or " + f"[{column_fqn}] not in [{table.fullyQualifiedName.__root__}]" ) - return None + return patched_entity + + def patch_column_description( + self, + table: Table, + column_fqn: str, + description: str, + force: bool = False, + ) -> Optional[T]: + """Given a Table and a Column FQN, JSON PATCH the description of the column + + Args + table: origin Table object + column_fqn: FQN of the column to update + description: new description to add + force: if True, we will patch any existing description. Otherwise, we will maintain + the existing data. + Returns + Updated Entity + """ + instance: Optional[Table] = self._fetch_entity_if_exists( + entity=Table, entity_id=table.id + ) + + if not instance: + return None + + # Make sure we run the patch against the last updated data from the API + table.columns = instance.columns + + destination = table.copy(deep=True) + update_column_description(destination.columns, column_fqn, description, force) + + patched_entity = self.patch(entity=Table, source=table, destination=destination) + if patched_entity is None: + logger.debug( + f"Empty PATCH result. 
Either everything is up to date or " + f"[{column_fqn}] not in [{table.fullyQualifiedName.__root__}]" + ) + + return patched_entity def patch_automation_workflow_response( self, diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin_utils.py b/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin_utils.py index 5c944a36a54..6695ef5b99c 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin_utils.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin_utils.py @@ -16,7 +16,7 @@ To be used be OpenMetadata """ from enum import Enum -from typing import Generic, Optional, Type, TypeVar, Union +from typing import Generic, List, Optional, Type, TypeVar, Union from pydantic import BaseModel @@ -120,7 +120,10 @@ class OMetaPatchMixinBase(Generic[T]): """ def _fetch_entity_if_exists( - self, entity: Type[T], entity_id: Union[str, basic.Uuid] + self, + entity: Type[T], + entity_id: Union[str, basic.Uuid], + fields: Optional[List[str]] = None, ) -> Optional[T]: """ Validates if we can update a description or not. 
Will return @@ -129,11 +132,12 @@ class OMetaPatchMixinBase(Generic[T]): Args entity (T): Entity Type entity_id: ID + fields: extra fields to fetch from API Returns instance to update """ - instance = self.get_by_id(entity=entity, entity_id=entity_id, fields=["*"]) + instance = self.get_by_id(entity=entity, entity_id=entity_id, fields=fields) if not instance: logger.warning( diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/role_policy_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/role_policy_mixin.py index 5580060c002..a2556df4b46 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/role_policy_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/role_policy_mixin.py @@ -147,7 +147,9 @@ class OMetaRolePolicyMixin(OMetaPatchMixinBase): Returns Updated Entity """ - instance: Role = self._fetch_entity_if_exists(entity=Role, entity_id=entity_id) + instance: Role = self._fetch_entity_if_exists( + entity=Role, entity_id=entity_id, fields=["policies"] + ) if not instance: return None diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index 3a950dab974..a8fcea7205b 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -448,16 +448,16 @@ class MetadataRestSink(Sink[Entity]): """ try: self.metadata.patch_table_constraints( - record.table_id, - record.constraints, + table=record.table, + constraints=record.constraints, ) logger.debug( - f"Successfully ingested table constraints for table id {record.table_id}" + f"Successfully ingested table constraints for table id {record.table.id}" ) except Exception as exc: logger.debug(traceback.format_exc()) logger.error( - f"Unexpected error while ingesting table constraints for table id [{record.table_id}]: {exc}" + f"Unexpected error while ingesting table constraints for table id [{record.table.id}]: {exc}" ) def close(self): diff --git 
a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index 2b888357dee..212dcc933ae 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -351,7 +351,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): if owner and self.source_config.includeOwners: self.metadata.patch_owner( entity=Dashboard, - entity_id=self.context.dashboard.id, + source=self.context.dashboard, owner=owner, force=False, ) diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index c9cc5062739..18265abb71c 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -415,7 +415,7 @@ class CommonDbSourceService( OMetaTableConstraints( foreign_constraints=foreign_columns, constraints=table_constraints, - table_id=str(self.context.table.id.__root__), + table=self.context.table, ) ) diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py index 9e77f203044..5e70ffcd255 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py @@ -689,12 +689,17 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods ) if table_entity: try: + + service_name, database_name, schema_name, table_name = fqn.split( + table_entity.fullyQualifiedName.__root__ + ) + data_model = data_model_link.datamodel # Patch table descriptions from DBT if data_model.description: self.metadata.patch_description( entity=Table, - entity_id=table_entity.id, + source=table_entity, description=data_model.description.__root__, 
force=self.source_config.dbtUpdateDescriptions, ) @@ -703,15 +708,24 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods for column in data_model.columns: if column.description: self.metadata.patch_column_description( - entity_id=table_entity.id, - column_name=column.name.__root__, + table=table_entity, + column_fqn=fqn.build( + self.metadata, + entity_type=Column, + service_name=service_name, + database_name=database_name, + schema_name=schema_name, + table_name=table_name, + column_name=column.name.__root__, + ), description=column.description.__root__, force=self.source_config.dbtUpdateDescriptions, ) except Exception as exc: # pylint: disable=broad-except logger.debug(traceback.format_exc()) logger.warning( - f"Failed to parse the node {table_entity.fullyQualifiedName.__root__}to update dbt desctiption: {exc}" # pylint: disable=line-too-long + f"Failed to parse the node {table_entity.fullyQualifiedName.__root__} " + f"to update dbt description: {exc}" ) def create_dbt_tests_suite( diff --git a/ingestion/src/metadata/ingestion/source/metadata/atlas/metadata.py b/ingestion/src/metadata/ingestion/source/metadata/atlas/metadata.py index a5e95d3b068..65aa407e3fd 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/atlas/metadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata/atlas/metadata.py @@ -48,7 +48,12 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference -from metadata.generated.schema.type.tagLabel import TagLabel +from metadata.generated.schema.type.tagLabel import ( + LabelType, + State, + TagLabel, + TagSource, +) from metadata.ingestion.api.source import InvalidSourceException, Source from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.ometa.ometa_api import OpenMetadata @@ -178,7 +183,7 @@ 
class AtlasSource(Source): if tpc_attrs.get("description") and topic_object: self.metadata.patch_description( entity=Topic, - entity_id=topic_object.id, + source=topic_object, description=tpc_attrs["description"], force=True, ) @@ -215,7 +220,7 @@ class AtlasSource(Source): if db_entity.get("description", None) and database_object: self.metadata.patch_description( entity=Database, - entity_id=database_object.id, + source=database_object, description=db_entity["description"], force=True, ) @@ -234,7 +239,7 @@ class AtlasSource(Source): if db_entity.get("description", None) and database_schema_object: self.metadata.patch_description( entity=DatabaseSchema, - entity_id=database_schema_object.id, + source=database_schema_object, description=db_entity["description"], force=True, ) @@ -257,7 +262,7 @@ class AtlasSource(Source): if table_object: if tbl_attrs.get("description", None): self.metadata.patch_description( - entity_id=table_object.id, + source=table_object, entity=Table, description=tbl_attrs["description"], force=True, @@ -270,8 +275,15 @@ class AtlasSource(Source): tag_name=ATLAS_TABLE_TAG, ) + tag_label = TagLabel( + tagFQN=tag_fqn, + labelType=LabelType.Automated, + state=State.Suggested.value, + source=TagSource.Classification, + ) + self.metadata.patch_tag( - entity=Table, entity_id=table_object.id, tag_fqn=tag_fqn + entity=Table, source=table_object, tag_label=tag_label ) yield from self.ingest_lineage(tbl_entity["guid"], name) diff --git a/ingestion/src/metadata/pii/processor.py b/ingestion/src/metadata/pii/processor.py index 2d2206ca0f8..0d7231230d6 100644 --- a/ingestion/src/metadata/pii/processor.py +++ b/ingestion/src/metadata/pii/processor.py @@ -16,6 +16,12 @@ from typing import Optional from metadata.generated.schema.entity.classification.tag import Tag from metadata.generated.schema.entity.data.table import Table, TableData +from metadata.generated.schema.type.tagLabel import ( + LabelType, + State, + TagLabel, + TagSource, +) from 
metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.pii import PII from metadata.pii.column_name_scanner import ColumnNameScanner @@ -37,7 +43,7 @@ class PIIProcessor: self.ner_scanner = NERScanner() def patch_column_tag( - self, tag_type: str, table_entity: Table, column_name: str + self, tag_type: str, table_entity: Table, column_fqn: str ) -> None: """ Build the tag and run the PATCH @@ -48,11 +54,16 @@ class PIIProcessor: classification_name=PII, tag_name=tag_type, ) + tag_label = TagLabel( + tagFQN=tag_fqn, + source=TagSource.Classification, + state=State.Suggested, + labelType=LabelType.Automated, + ) self.metadata.patch_column_tag( - entity_id=table_entity.id, - column_name=column_name, - tag_fqn=tag_fqn, - is_suggested=True, + table=table_entity, + column_fqn=column_fqn, + tag_label=tag_label, ) def process( @@ -96,7 +107,7 @@ class PIIProcessor: self.patch_column_tag( tag_type=tag_and_confidence.tag.value, table_entity=table_entity, - column_name=table_entity.columns[idx].name.__root__, + column_fqn=column.fullyQualifiedName.__root__, ) except Exception as err: logger.warning(f"Error computing PII tags for [{column}] - [{err}]") diff --git a/ingestion/tests/integration/data_insight/test_data_insight_workflow.py b/ingestion/tests/integration/data_insight/test_data_insight_workflow.py index 554e6b377e7..c3201b52f67 100644 --- a/ingestion/tests/integration/data_insight/test_data_insight_workflow.py +++ b/ingestion/tests/integration/data_insight/test_data_insight_workflow.py @@ -163,13 +163,13 @@ class DataInsightWorkflowTests(unittest.TestCase): ) user: User = cls.metadata.get_by_name(User, "aaron_johnson0") cls.metadata.patch_owner( - Table, - table.id, - EntityReference( + entity=Table, + source=table, + owner=EntityReference( id=user.id, type="user", ), - True, + force=True, ) for event in WEB_EVENT_DATA: diff --git a/ingestion/tests/integration/ometa/test_ometa_patch.py b/ingestion/tests/integration/ometa/test_ometa_patch.py index 
99032242ff0..ea4d380fb1b 100644 --- a/ingestion/tests/integration/ometa/test_ometa_patch.py +++ b/ingestion/tests/integration/ometa/test_ometa_patch.py @@ -14,7 +14,6 @@ OpenMetadata high-level API Table test """ import logging import time -from typing import Union from unittest import TestCase from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest @@ -46,11 +45,30 @@ from metadata.generated.schema.entity.teams.user import User from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( OpenMetadataJWTClientConfig, ) -from metadata.generated.schema.type import basic from metadata.generated.schema.type.entityReference import EntityReference +from metadata.generated.schema.type.tagLabel import ( + LabelType, + State, + TagLabel, + TagSource, +) from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.helpers import find_column_in_table +PII_TAG_LABEL = TagLabel( + tagFQN="PII.Sensitive", + labelType=LabelType.Automated, + state=State.Suggested.value, + source=TagSource.Classification, +) + +TIER_TAG_LABEL = TagLabel( + tagFQN="Tier.Tier2", + labelType=LabelType.Automated, + state=State.Suggested.value, + source=TagSource.Classification, +) + class OMetaTableTest(TestCase): """ @@ -59,9 +77,9 @@ class OMetaTableTest(TestCase): """ service_entity_id = None - entity_id = None - db_entity_id: Union[str, basic.Uuid] = None - db_schema_entity_id: Union[str, basic.Uuid] = None + table: Table = None + db_entity: Database = None + db_schema_entity: DatabaseSchema = None user_1: User = None user_2: User = None team_1: Team = None @@ -126,28 +144,25 @@ class OMetaTableTest(TestCase): service=cls.service_entity.fullyQualifiedName, ) - create_db_entity = cls.metadata.create_or_update(data=create_db) - cls.db_entity_id = create_db_entity.id + cls.db_entity = cls.metadata.create_or_update(data=create_db) create_schema = CreateDatabaseSchemaRequest( name="test-schema", - 
database=create_db_entity.fullyQualifiedName, + database=cls.db_entity.fullyQualifiedName, ) - create_schema_entity = cls.metadata.create_or_update(data=create_schema) - cls.db_schema_entity_id = create_schema_entity.id + cls.db_schema_entity = cls.metadata.create_or_update(data=create_schema) cls.create = CreateTableRequest( name="test", - databaseSchema=create_schema_entity.fullyQualifiedName, + databaseSchema=cls.db_schema_entity.fullyQualifiedName, columns=[ Column(name="id", dataType=DataType.BIGINT), Column(name="another", dataType=DataType.BIGINT), ], ) - res: Table = cls.metadata.create_or_update(data=cls.create) - cls.entity_id = res.id + cls.table = cls.metadata.create_or_update(data=cls.create) cls.user_1 = cls.metadata.create_or_update( data=CreateUserRequest( @@ -229,20 +244,20 @@ class OMetaTableTest(TestCase): Update description and force """ updated: Table = self.metadata.patch_description( - entity=Table, entity_id=self.entity_id, description="New description" + entity=Table, source=self.table, description="New description" ) assert updated.description.__root__ == "New description" not_updated = self.metadata.patch_description( - entity=Table, entity_id=self.entity_id, description="Not passing force" + entity=Table, source=self.table, description="Not passing force" ) assert not not_updated force_updated: Table = self.metadata.patch_description( entity=Table, - entity_id=self.entity_id, + source=self.table, description="Forced new", force=True, ) @@ -255,26 +270,26 @@ class OMetaTableTest(TestCase): """ updated: Table = self.metadata.patch_column_description( - entity_id=self.entity_id, + table=self.table, description="New column description", - column_name="another", + column_fqn=self.table.fullyQualifiedName.__root__ + ".another", ) updated_col = find_column_in_table(column_name="another", table=updated) assert updated_col.description.__root__ == "New column description" not_updated = self.metadata.patch_column_description( - 
entity_id=self.entity_id, + table=self.table, description="Not passing force", - column_name="another", + column_fqn=self.table.fullyQualifiedName.__root__ + ".another", ) assert not not_updated force_updated: Table = self.metadata.patch_column_description( - entity_id=self.entity_id, + table=self.table, description="Forced new", - column_name="another", + column_fqn=self.table.fullyQualifiedName.__root__ + ".another", force=True, ) @@ -285,18 +300,17 @@ class OMetaTableTest(TestCase): """ Update table tags """ - updated: Table = self.metadata.patch_tag( entity=Table, - entity_id=self.entity_id, - tag_fqn="PII.Sensitive", # Shipped by default + source=self.table, + tag_label=PII_TAG_LABEL, # Shipped by default ) assert updated.tags[0].tagFQN.__root__ == "PII.Sensitive" updated: Table = self.metadata.patch_tag( entity=Table, - entity_id=self.entity_id, - tag_fqn="Tier.Tier2", # Shipped by default + source=self.table, + tag_label=TIER_TAG_LABEL, # Shipped by default ) assert updated.tags[0].tagFQN.__root__ == "PII.Sensitive" assert updated.tags[1].tagFQN.__root__ == "Tier.Tier2" @@ -306,18 +320,18 @@ class OMetaTableTest(TestCase): Update column tags """ updated: Table = self.metadata.patch_column_tag( - entity_id=self.entity_id, - tag_fqn="PII.Sensitive", # Shipped by default - column_name="id", + table=self.table, + tag_label=PII_TAG_LABEL, # Shipped by default + column_fqn=self.table.fullyQualifiedName.__root__ + ".id", ) updated_col = find_column_in_table(column_name="id", table=updated) assert updated_col.tags[0].tagFQN.__root__ == "PII.Sensitive" updated_again: Table = self.metadata.patch_column_tag( - entity_id=self.entity_id, - tag_fqn="Tier.Tier2", # Shipped by default - column_name="id", + table=self.table, + tag_label=TIER_TAG_LABEL, # Shipped by default + column_fqn=self.table.fullyQualifiedName.__root__ + ".id", ) updated_again_col = find_column_in_table(column_name="id", table=updated_again) @@ -331,7 +345,7 @@ class OMetaTableTest(TestCase): # 
Database, no existing owner, owner is a User -> Modified updated: Database = self.metadata.patch_owner( entity=Database, - entity_id=self.db_entity_id, + source=self.db_entity, owner=self.owner_user_1, ) assert updated is not None @@ -340,7 +354,7 @@ class OMetaTableTest(TestCase): # Database, existing owner, owner is a User, no force -> Unmodified updated: Database = self.metadata.patch_owner( entity=Database, - entity_id=self.db_entity_id, + source=self.db_entity, owner=self.owner_user_2, ) assert updated is None @@ -348,7 +362,7 @@ class OMetaTableTest(TestCase): # Database, existing owner, owner is a User, force -> Modified updated: Database = self.metadata.patch_owner( entity=Database, - entity_id=self.db_entity_id, + source=self.db_entity, owner=self.owner_user_2, force=True, ) @@ -358,14 +372,14 @@ class OMetaTableTest(TestCase): # Database, existing owner, no owner, no force -> Unmodified updated: Database = self.metadata.patch_owner( entity=Database, - entity_id=self.db_entity_id, + source=self.db_entity, ) assert updated is None # Database, existing owner, no owner, force -> Modified updated: Database = self.metadata.patch_owner( entity=Database, - entity_id=self.db_entity_id, + source=self.db_entity, force=True, ) assert updated is not None @@ -374,7 +388,7 @@ class OMetaTableTest(TestCase): # DatabaseSchema, no existing owner, owner is Team -> Modified updated: DatabaseSchema = self.metadata.patch_owner( entity=DatabaseSchema, - entity_id=self.db_schema_entity_id, + source=self.db_schema_entity, owner=self.owner_team_1, ) assert updated is not None @@ -383,7 +397,7 @@ class OMetaTableTest(TestCase): # DatabaseSchema, existing owner, owner is Team, no force -> Unmodified updated: DatabaseSchema = self.metadata.patch_owner( entity=DatabaseSchema, - entity_id=self.db_schema_entity_id, + source=self.db_schema_entity, owner=self.owner_team_2, ) assert updated is None @@ -391,7 +405,7 @@ class OMetaTableTest(TestCase): # DatabaseSchema, existing owner, owner 
is Team, force -> Modified updated: DatabaseSchema = self.metadata.patch_owner( entity=DatabaseSchema, - entity_id=self.db_schema_entity_id, + source=self.db_schema_entity, owner=self.owner_team_2, force=True, ) @@ -401,14 +415,14 @@ class OMetaTableTest(TestCase): # DatabaseSchema, existing owner, no owner, no force -> Unmodified updated: DatabaseSchema = self.metadata.patch_owner( entity=DatabaseSchema, - entity_id=self.db_schema_entity_id, + source=self.db_schema_entity, ) assert updated is None # DatabaseSchema, existing owner, no owner, force -> Modified updated: DatabaseSchema = self.metadata.patch_owner( entity=DatabaseSchema, - entity_id=self.db_schema_entity_id, + source=self.db_schema_entity, force=True, ) assert updated is not None @@ -417,7 +431,7 @@ class OMetaTableTest(TestCase): # Table, no existing owner, owner is a Team -> Modified updated: Table = self.metadata.patch_owner( entity=Table, - entity_id=self.entity_id, + source=self.table, owner=self.owner_team_1, ) assert updated is not None @@ -426,7 +440,7 @@ class OMetaTableTest(TestCase): # Table, existing owner, owner is a Team, no force -> Unmodified updated: Table = self.metadata.patch_owner( entity=Table, - entity_id=self.entity_id, + source=self.table, owner=self.owner_team_2, ) assert updated is None @@ -434,7 +448,7 @@ class OMetaTableTest(TestCase): # Table, existing owner, owner is a Team, force -> Modified updated: Table = self.metadata.patch_owner( entity=Table, - entity_id=self.entity_id, + source=self.table, owner=self.owner_team_2, force=True, ) @@ -444,32 +458,77 @@ class OMetaTableTest(TestCase): # Table, existing owner, no owner, no force -> Unmodified updated: Table = self.metadata.patch_owner( entity=Table, - entity_id=self.entity_id, + source=self.table, ) assert updated is None # Table, existing owner, no owner, no force -> Modified updated: Table = self.metadata.patch_owner( entity=Table, - entity_id=self.entity_id, + source=self.table, force=True, ) assert updated is not 
None assert updated.owner is None # Table with non-existent id, force -> Unmodified + non_existent_table = self.table.copy(deep=True) + non_existent_table.id = "9facb7b3-1dee-4017-8fca-1254b700afef" updated: Table = self.metadata.patch_owner( entity=Table, - entity_id="9facb7b3-1dee-4017-8fca-1254b700afef", + source=non_existent_table, force=True, ) assert updated is None # Table, no owner, invalid owner type -> Unmodified - updated: Table = self.metadata.patch_owner( - entity=Table, - entity_id=self.entity_id, - owner=EntityReference(id=self.entity_id, type="table"), - force=True, + # Enable after https://github.com/open-metadata/OpenMetadata/issues/11715 + # updated: Table = self.metadata.patch_owner( + # entity=Table, + # source=self.table, + # owner=EntityReference(id=self.table.id, type="table"), + # force=True, + # ) + # assert updated is None + + def test_patch_nested_col(self): + """ + create a table with nested cols and run patch on it + """ + create = CreateTableRequest( + name="test", + databaseSchema=self.db_schema_entity.fullyQualifiedName, + columns=[ + Column( + name="struct", + dataType=DataType.STRUCT, + children=[ + Column(name="id", dataType=DataType.INT), + Column(name="name", dataType=DataType.STRING), + ], + ) + ], + ) + created: Table = self.metadata.create_or_update(create) + + with_tags: Table = self.metadata.patch_column_tag( + table=created, + column_fqn=created.fullyQualifiedName.__root__ + ".struct.id", + tag_label=TIER_TAG_LABEL, + ) + + self.assertEqual( + with_tags.columns[0].children[0].tags[0].tagFQN.__root__, + TIER_TAG_LABEL.tagFQN.__root__, + ) + + with_description: Table = self.metadata.patch_column_description( + table=created, + column_fqn=created.fullyQualifiedName.__root__ + ".struct.name", + description="I am so nested", + ) + + self.assertEqual( + with_description.columns[0].children[1].description.__root__, + "I am so nested", ) - assert updated is None diff --git a/ingestion/tests/unit/topology/metadata/test_atlas.py 
b/ingestion/tests/unit/topology/metadata/test_atlas.py index 7713cb57d92..efb83a3326c 100644 --- a/ingestion/tests/unit/topology/metadata/test_atlas.py +++ b/ingestion/tests/unit/topology/metadata/test_atlas.py @@ -283,7 +283,7 @@ EXPTECTED_TABLE = Table( description="test tag", source="Classification", labelType="Automated", - state="Confirmed", + state="Suggested", href=None, ) ],