Fix #11664: Refactor patch_mixin to use jsonpatch lib (#11696)

* Fix #11664: Refactor patch_mixin to use jsonpatch lib

* Migrate to jsonpatch

* Fix nested cols

* Format

* Update patch_description

* Table constraints

* tag

* owner

* column tag

* column desc

* Format

* Format

* Fix log

* Update dbt patch

* Update column fqn

* Fix test

* Fix tests

---------

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
This commit is contained in:
Sriharsha Chintalapani 2023-05-23 06:47:11 -07:00 committed by GitHub
parent 7589b7eeef
commit 6509a3670a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 418 additions and 412 deletions

View File

@ -93,6 +93,7 @@ base_requirements = {
"idna<3,>=2.5",
"importlib-metadata~=4.12.0", # From airflow constraints
"Jinja2>=2.11.3",
"jsonpatch==1.32",
"jsonschema",
"mypy_extensions>=0.4.3",
"pydantic~=1.10",

View File

@ -15,7 +15,7 @@ from typing import Dict, List, Optional
from pydantic import BaseModel
from metadata.generated.schema.entity.data.table import TableConstraint
from metadata.generated.schema.entity.data.table import Table, TableConstraint
class OMetaTableConstraints(BaseModel):
@ -23,6 +23,6 @@ class OMetaTableConstraints(BaseModel):
Model to club table with its constraints
"""
table_id: str
table: Table
foreign_constraints: Optional[List[Dict]]
constraints: Optional[List[TableConstraint]]

View File

@ -219,6 +219,16 @@ class REST:
try:
resp = self._session.request(method, url, **opts)
resp.raise_for_status()
if resp.text != "":
try:
return resp.json()
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Unexpected error while returning response {resp} in json format - {exc}"
)
except HTTPError as http_error:
# retry if we hit Rate Limit
if resp.status_code in retry_codes and retry > 0:
@ -244,14 +254,7 @@ class REST:
logger.warning(
f"Unexpected error calling [{url}] with method [{method}]: {exc}"
)
if resp.text != "":
try:
return resp.json()
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Unexpected error while returning response {resp} in json format - {exc}"
)
return None
def get(self, path, data=None):

View File

@ -149,7 +149,7 @@ class GlossaryMixin(OMetaPatchMixinBase):
the updated GlossaryTerm
"""
instance: GlossaryTerm = self._fetch_entity_if_exists(
entity=GlossaryTerm, entity_id=entity_id
entity=GlossaryTerm, entity_id=entity_id, fields=["relatedTerms"]
)
if not instance:
return None
@ -238,7 +238,7 @@ class GlossaryMixin(OMetaPatchMixinBase):
The updated entity
"""
instance: Union[Glossary, GlossaryTerm] = self._fetch_entity_if_exists(
entity=entity, entity_id=entity_id
entity=entity, entity_id=entity_id, fields=["reviewers"]
)
if not instance:
return None

View File

@ -17,29 +17,27 @@ import json
import traceback
from typing import Dict, List, Optional, Type, TypeVar, Union
import jsonpatch
from pydantic import BaseModel
from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.automations.workflow import WorkflowStatus
from metadata.generated.schema.entity.data.table import Table, TableConstraint
from metadata.generated.schema.entity.data.table import Column, Table, TableConstraint
from metadata.generated.schema.entity.services.connections.testConnectionResult import (
TestConnectionResult,
)
from metadata.generated.schema.type import basic
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tagLabel import LabelType, State, TagSource
from metadata.generated.schema.type.tagLabel import TagLabel
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.mixins.patch_mixin_utils import (
OMetaPatchMixinBase,
PatchField,
PatchOperation,
PatchPath,
PatchValue,
)
from metadata.ingestion.ometa.utils import model_str
from metadata.utils.helpers import find_column_in_table_with_index
from metadata.utils.logger import ometa_logger
logger = ometa_logger()
@ -49,6 +47,51 @@ T = TypeVar("T", bound=BaseModel)
OWNER_TYPES: List[str] = ["user", "team"]
def update_column_tags(
columns: List[Column],
column_fqn: str,
tag_label: TagLabel,
operation: PatchOperation,
) -> None:
"""
Inplace update for the incoming column list
"""
for col in columns:
if str(col.fullyQualifiedName.__root__).lower() == column_fqn.lower():
if operation == PatchOperation.REMOVE:
for tag in col.tags:
if tag.tagFQN == tag_label.tagFQN:
col.tags.remove(tag)
else:
col.tags.append(tag_label)
break
if col.children:
update_column_tags(col.children, column_fqn, tag_label, operation)
def update_column_description(
columns: List[Column], column_fqn: str, description: str, force: bool = False
) -> None:
"""
Inplace update for the incoming column list
"""
for col in columns:
if str(col.fullyQualifiedName.__root__).lower() == column_fqn.lower():
if col.description and not force:
logger.warning(
f"The entity with id [{model_str(column_fqn)}] already has a description."
" To overwrite it, set `force` to True."
)
break
col.description = description
break
if col.children:
update_column_description(col.children, column_fqn, description, force)
class OMetaPatchMixin(OMetaPatchMixinBase):
"""
OpenMetadata API methods related to Tables.
@ -58,10 +101,50 @@ class OMetaPatchMixin(OMetaPatchMixinBase):
client: REST
def patch(self, entity: Type[T], source: T, destination: T) -> Optional[T]:
"""
Given an Entity type and Source entity and Destination entity,
generate a JSON Patch and apply it.
Args
entity (T): Entity Type
source: Source payload which is current state of the source in OpenMetadata
destination: payload with changes applied to the source.
Returns
Updated Entity
"""
try:
# Get the difference between source and destination
patch = jsonpatch.make_patch(
json.loads(source.json(exclude_unset=True, exclude_none=True)),
json.loads(destination.json(exclude_unset=True, exclude_none=True)),
)
if not patch:
logger.debug(
"Nothing to update when running the patch. Are you passing `force=True`?"
)
return None
res = self.client.patch(
path=f"{self.get_suffix(entity)}/{model_str(source.id)}",
data=str(patch),
)
return entity(**res)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Error trying to PATCH description for {entity.__class__.__name__} [{source.id}]: {exc}"
)
return None
def patch_description(
self,
entity: Type[T],
entity_id: Union[str, basic.Uuid],
source: T,
description: str,
force: bool = False,
) -> Optional[T]:
@ -70,182 +153,66 @@ class OMetaPatchMixin(OMetaPatchMixinBase):
Args
entity (T): Entity Type
entity_id: ID
source: source entity object
description: new description to add
force: if True, we will patch any existing description. Otherwise, we will maintain
the existing data.
Returns
Updated Entity
"""
instance = self._fetch_entity_if_exists(entity=entity, entity_id=entity_id)
instance: Optional[T] = self._fetch_entity_if_exists(
entity=entity, entity_id=source.id
)
if not instance:
return None
if instance.description and not force:
logger.warning(
f"The entity with id [{model_str(entity_id)}] already has a description."
f"The entity with id [{model_str(source.id)}] already has a description."
" To overwrite it, set `force` to True."
)
return None
try:
res = self.client.patch(
path=f"{self.get_suffix(entity)}/{model_str(entity_id)}",
data=json.dumps(
[
{
PatchField.OPERATION: PatchOperation.ADD
if not instance.description
else PatchOperation.REPLACE,
PatchField.PATH: PatchPath.DESCRIPTION,
PatchField.VALUE: description,
}
]
),
)
return entity(**res)
# https://docs.pydantic.dev/latest/usage/exporting_models/#modelcopy
destination = source.copy(deep=True)
destination.description = description
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Error trying to PATCH description for {entity.__class__.__name__} [{entity_id}]: {exc}"
)
return None
def patch_column_description(
self,
entity_id: Union[str, basic.Uuid],
column_name: str,
description: str,
force: bool = False,
) -> Optional[T]:
"""Given an Entity ID, JSON PATCH the description of the column
Args
entity_id: ID
description: new description to add
column_name: column to update
force: if True, we will patch any existing description. Otherwise, we will maintain
the existing data.
Returns
Updated Entity
"""
table: Table = self._fetch_entity_if_exists(
entity=Table,
entity_id=entity_id,
)
if not table:
return None
if not table.columns:
return None
col_index, col = find_column_in_table_with_index(
column_name=column_name, table=table
)
if col_index is None:
logger.warning(f"Cannot find column {column_name} in Table.")
return None
if col.description and not force:
logger.warning(
f"The column '{column_name}' in '{table.fullyQualifiedName.__root__}' already has a description."
" To overwrite it, set `force` to True."
)
return None
try:
res = self.client.patch(
path=f"{self.get_suffix(Table)}/{model_str(entity_id)}",
data=json.dumps(
[
{
PatchField.OPERATION: PatchOperation.ADD
if not col.description
else PatchOperation.REPLACE,
PatchField.PATH: PatchPath.COLUMNS_DESCRIPTION.format(
index=col_index
),
PatchField.VALUE: description,
}
]
),
)
return Table(**res)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Error trying to PATCH description for Table Column: {entity_id}, {column_name}: {exc}"
)
return None
return self.patch(entity=entity, source=source, destination=destination)
def patch_table_constraints(
self,
entity_id: Union[str, basic.Uuid],
table_constraints: List[TableConstraint],
table: Table,
constraints: List[TableConstraint],
) -> Optional[T]:
"""Given an Entity ID, JSON PATCH the table constraints of table
Args
entity_id: ID
source_table: Origin table
description: new description to add
table_constraints: table constraints to add
Returns
Updated Entity
"""
table: Table = self._fetch_entity_if_exists(
entity=Table,
entity_id=entity_id,
instance: Table = self._fetch_entity_if_exists(
entity=Table, entity_id=table.id, fields=["tableConstraints"]
)
if not table:
if not instance:
return None
try:
res = self.client.patch(
path=f"{self.get_suffix(Table)}/{model_str(entity_id)}",
data=json.dumps(
[
{
PatchField.OPERATION: PatchOperation.ADD
if not table.tableConstraints
else PatchOperation.REPLACE,
PatchField.PATH: PatchPath.TABLE_CONSTRAINTS,
PatchField.VALUE: [
{
PatchValue.CONSTRAINT_TYPE: constraint.constraintType.value,
PatchValue.COLUMNS: constraint.columns,
PatchValue.REFERRED_COLUMNS: [
col.__root__
for col in constraint.referredColumns or []
],
}
for constraint in table_constraints
],
}
]
),
)
return Table(**res)
table.tableConstraints = instance.tableConstraints
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Error trying to PATCH description for Table Constraint: {entity_id}: {exc}"
)
destination = table.copy(deep=True)
destination.tableConstraints = constraints
return None
return self.patch(entity=Table, source=table, destination=destination)
def patch_tag(
self,
entity: Type[T],
entity_id: Union[str, basic.Uuid],
tag_fqn: str,
from_glossary: bool = False,
source: T,
tag_label: TagLabel,
operation: Union[
PatchOperation.ADD, PatchOperation.REMOVE
] = PatchOperation.ADD,
@ -255,155 +222,35 @@ class OMetaPatchMixin(OMetaPatchMixinBase):
Args
entity (T): Entity Type
entity_id: ID
description: new description to add
force: if True, we will patch any existing description. Otherwise, we will maintain
the existing data.
source: Source entity object
tag_label: TagLabel to add or remove
operation: Patch Operation to add or remove the tag.
Returns
Updated Entity
"""
instance = self._fetch_entity_if_exists(entity=entity, entity_id=entity_id)
instance: Optional[T] = self._fetch_entity_if_exists(
entity=entity, entity_id=source.id, fields=["tags"]
)
if not instance:
return None
tag_index = len(instance.tags) - 1 if instance.tags else 0
# Initialize empty tag list or the last updated tags
source.tags = instance.tags or []
destination = source.copy(deep=True)
try:
res = None
if operation == PatchOperation.ADD:
res = self.client.patch(
path=f"{self.get_suffix(entity)}/{model_str(entity_id)}",
data=json.dumps(
[
{
PatchField.OPERATION: PatchOperation.ADD,
PatchField.PATH: PatchPath.TAGS.format(
tag_index=tag_index
),
PatchField.VALUE: {
"labelType": LabelType.Automated.value,
"source": TagSource.Classification.value
if not from_glossary
else TagSource.Glossary.value,
"state": State.Confirmed.value,
"tagFQN": tag_fqn,
},
}
]
),
)
elif operation == PatchOperation.REMOVE:
res = self.client.patch(
path=f"{self.get_suffix(entity)}/{model_str(entity_id)}",
data=json.dumps(
[
{
PatchField.OPERATION: PatchOperation.REMOVE,
PatchField.PATH: PatchPath.TAGS.format(
tag_index=tag_index
),
}
]
),
)
return entity(**res) if res is not None else res
if operation == PatchOperation.REMOVE:
for tag in destination.tags:
if tag.tagFQN == tag_label.tagFQN:
destination.tags.remove(tag)
else:
destination.tags.append(tag_label)
except Exception as exc:
logger.error(traceback.format_exc())
logger.error(
f"Error trying to PATCH tag for {entity.__class__.__name__} [{entity_id}]: {exc}"
)
return None
def patch_column_tag(
self,
entity_id: Union[str, basic.Uuid],
column_name: str,
tag_fqn: str,
from_glossary: bool = False,
operation: Union[
PatchOperation.ADD, PatchOperation.REMOVE
] = PatchOperation.ADD,
is_suggested: bool = False,
) -> Optional[T]:
"""Given an Entity ID, JSON PATCH the tag of the column
Args
entity_id: ID
tag_fqn: new tag to add
column_name: column to update
from_glossary: the tag comes from a glossary
Returns
Updated Entity
"""
table: Table = self._fetch_entity_if_exists(entity=Table, entity_id=entity_id)
if not table:
return None
col_index, col = find_column_in_table_with_index(
column_name=column_name, table=table
)
if col_index is None:
logger.warning(f"Cannot find column {column_name} in Table.")
return None
tag_index = len(col.tags) - 1 if col.tags else 0
try:
res = None
if operation == PatchOperation.ADD:
res = self.client.patch(
path=f"{self.get_suffix(Table)}/{model_str(entity_id)}",
data=json.dumps(
[
{
PatchField.OPERATION: PatchOperation.ADD,
PatchField.PATH: PatchPath.COLUMNS_TAGS.format(
index=col_index, tag_index=tag_index
),
PatchField.VALUE: {
PatchValue.LABEL_TYPE: LabelType.Automated.value,
PatchValue.SOURCE: TagSource.Classification.value
if not from_glossary
else TagSource.Glossary.value,
PatchValue.STATE: State.Suggested.value
if is_suggested
else State.Confirmed.value,
PatchValue.TAG_FQN: tag_fqn,
},
}
]
),
)
elif operation == PatchOperation.REMOVE:
res = self.client.patch(
path=f"{self.get_suffix(Table)}/{model_str(entity_id)}",
data=json.dumps(
[
{
PatchField.OPERATION: PatchOperation.REMOVE,
PatchField.PATH: PatchPath.COLUMNS_TAGS.format(
index=col_index, tag_index=tag_index
),
}
]
),
)
return Table(**res) if res is not None else res
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Error trying to PATCH tags for Table Column: {entity_id}, {column_name}: {exc}"
)
return None
return self.patch(entity=entity, source=source, destination=destination)
def patch_owner(
self,
entity: Type[T],
entity_id: Union[str, basic.Uuid],
source: T,
owner: EntityReference = None,
force: bool = False,
) -> Optional[T]:
@ -420,55 +267,108 @@ class OMetaPatchMixin(OMetaPatchMixinBase):
Returns
Updated Entity
"""
instance = self._fetch_entity_if_exists(entity=entity, entity_id=entity_id)
instance: Optional[T] = self._fetch_entity_if_exists(
entity=entity, entity_id=source.id, fields=["owner"]
)
if not instance:
return None
# Don't change existing data without force
if instance.owner and not force:
logger.warning(
f"The entity with id [{model_str(entity_id)}] already has an owner."
f"The entity with id [{model_str(source.id)}] already has an owner."
" To overwrite it, set `overrideOwner` to True."
)
return None
data: Dict = {
PatchField.PATH: PatchPath.OWNER,
}
source.owner = instance.owner
if owner is None:
data[PatchField.OPERATION] = PatchOperation.REMOVE
else:
if owner.type not in OWNER_TYPES:
valid_owner_types: str = ", ".join(f'"{o}"' for o in OWNER_TYPES)
logger.error(
f"The entity with id [{model_str(entity_id)}] was provided an invalid"
f" owner type. Must be one of {valid_owner_types}."
)
return None
destination = source.copy(deep=True)
destination.owner = owner
data[PatchField.OPERATION] = (
PatchOperation.ADD if instance.owner is None else PatchOperation.REPLACE
)
data[PatchField.VALUE] = {
PatchValue.ID: model_str(owner.id),
PatchValue.TYPE: owner.type,
}
return self.patch(entity=entity, source=source, destination=destination)
try:
res = self.client.patch(
path=f"{self.get_suffix(entity)}/{model_str(entity_id)}",
data=json.dumps([data]),
)
return entity(**res)
def patch_column_tag(
self,
table: Table,
column_fqn: str,
tag_label: TagLabel,
operation: Union[
PatchOperation.ADD, PatchOperation.REMOVE
] = PatchOperation.ADD,
) -> Optional[T]:
"""Given an Entity ID, JSON PATCH the tag of the column
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Error trying to PATCH description for {entity.__class__.__name__} [{entity_id}]: {exc}"
Args
entity_id: ID
tag_label: TagLabel to add or remove
column_name: column to update
operation: Patch Operation to add or remove
Returns
Updated Entity
"""
instance: Optional[Table] = self._fetch_entity_if_exists(
entity=Table, entity_id=table.id, fields=["tags"]
)
if not instance:
return None
# Make sure we run the patch against the last updated data from the API
table.columns = instance.columns
destination = table.copy(deep=True)
update_column_tags(destination.columns, column_fqn, tag_label, operation)
patched_entity = self.patch(entity=Table, source=table, destination=destination)
if patched_entity is None:
logger.debug(
f"Empty PATCH result. Either everything is up to date or "
f"[{column_fqn}] not in [{table.fullyQualifiedName.__root__}]"
)
return None
return patched_entity
def patch_column_description(
self,
table: Table,
column_fqn: str,
description: str,
force: bool = False,
) -> Optional[T]:
"""Given an Table , Column FQN, JSON PATCH the description of the column
Args
src_table: origin Table object
column_fqn: FQN of the column to update
description: new description to add
force: if True, we will patch any existing description. Otherwise, we will maintain
the existing data.
Returns
Updated Entity
"""
instance: Optional[Table] = self._fetch_entity_if_exists(
entity=Table, entity_id=table.id
)
if not instance:
return None
# Make sure we run the patch against the last updated data from the API
table.columns = instance.columns
destination = table.copy(deep=True)
update_column_description(destination.columns, column_fqn, description, force)
patched_entity = self.patch(entity=Table, source=table, destination=destination)
if patched_entity is None:
logger.debug(
f"Empty PATCH result. Either everything is up to date or "
f"[{column_fqn}] not in [{table.fullyQualifiedName.__root__}]"
)
return patched_entity
def patch_automation_workflow_response(
self,

View File

@ -16,7 +16,7 @@ To be used be OpenMetadata
"""
from enum import Enum
from typing import Generic, Optional, Type, TypeVar, Union
from typing import Generic, List, Optional, Type, TypeVar, Union
from pydantic import BaseModel
@ -120,7 +120,10 @@ class OMetaPatchMixinBase(Generic[T]):
"""
def _fetch_entity_if_exists(
self, entity: Type[T], entity_id: Union[str, basic.Uuid]
self,
entity: Type[T],
entity_id: Union[str, basic.Uuid],
fields: Optional[List[str]] = None,
) -> Optional[T]:
"""
Validates if we can update a description or not. Will return
@ -129,11 +132,12 @@ class OMetaPatchMixinBase(Generic[T]):
Args
entity (T): Entity Type
entity_id: ID
fields: extra fields to fetch from API
Returns
instance to update
"""
instance = self.get_by_id(entity=entity, entity_id=entity_id, fields=["*"])
instance = self.get_by_id(entity=entity, entity_id=entity_id, fields=fields)
if not instance:
logger.warning(

View File

@ -147,7 +147,9 @@ class OMetaRolePolicyMixin(OMetaPatchMixinBase):
Returns
Updated Entity
"""
instance: Role = self._fetch_entity_if_exists(entity=Role, entity_id=entity_id)
instance: Role = self._fetch_entity_if_exists(
entity=Role, entity_id=entity_id, fields=["policies"]
)
if not instance:
return None

View File

@ -448,16 +448,16 @@ class MetadataRestSink(Sink[Entity]):
"""
try:
self.metadata.patch_table_constraints(
record.table_id,
record.constraints,
table=record.table,
constraints=record.constraints,
)
logger.debug(
f"Successfully ingested table constraints for table id {record.table_id}"
f"Successfully ingested table constraints for table id {record.table.id}"
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Unexpected error while ingesting table constraints for table id [{record.table_id}]: {exc}"
f"Unexpected error while ingesting table constraints for table id [{record.table.id}]: {exc}"
)
def close(self):

View File

@ -351,7 +351,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
if owner and self.source_config.includeOwners:
self.metadata.patch_owner(
entity=Dashboard,
entity_id=self.context.dashboard.id,
source=self.context.dashboard,
owner=owner,
force=False,
)

View File

@ -415,7 +415,7 @@ class CommonDbSourceService(
OMetaTableConstraints(
foreign_constraints=foreign_columns,
constraints=table_constraints,
table_id=str(self.context.table.id.__root__),
table=self.context.table,
)
)

View File

@ -689,12 +689,17 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
)
if table_entity:
try:
service_name, database_name, schema_name, table_name = fqn.split(
table_entity.fullyQualifiedName.__root__
)
data_model = data_model_link.datamodel
# Patch table descriptions from DBT
if data_model.description:
self.metadata.patch_description(
entity=Table,
entity_id=table_entity.id,
source=table_entity,
description=data_model.description.__root__,
force=self.source_config.dbtUpdateDescriptions,
)
@ -703,15 +708,24 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
for column in data_model.columns:
if column.description:
self.metadata.patch_column_description(
entity_id=table_entity.id,
column_name=column.name.__root__,
table=table_entity,
column_fqn=fqn.build(
self.metadata,
entity_type=Column,
service_name=service_name,
database_name=database_name,
schema_name=schema_name,
table_name=table_name,
column_name=column.name.__root__,
),
description=column.description.__root__,
force=self.source_config.dbtUpdateDescriptions,
)
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to parse the node {table_entity.fullyQualifiedName.__root__}to update dbt desctiption: {exc}" # pylint: disable=line-too-long
f"Failed to parse the node {table_entity.fullyQualifiedName.__root__} "
f"to update dbt description: {exc}"
)
def create_dbt_tests_suite(

View File

@ -48,7 +48,12 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tagLabel import TagLabel
from metadata.generated.schema.type.tagLabel import (
LabelType,
State,
TagLabel,
TagSource,
)
from metadata.ingestion.api.source import InvalidSourceException, Source
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
@ -178,7 +183,7 @@ class AtlasSource(Source):
if tpc_attrs.get("description") and topic_object:
self.metadata.patch_description(
entity=Topic,
entity_id=topic_object.id,
source=topic_object,
description=tpc_attrs["description"],
force=True,
)
@ -215,7 +220,7 @@ class AtlasSource(Source):
if db_entity.get("description", None) and database_object:
self.metadata.patch_description(
entity=Database,
entity_id=database_object.id,
source=database_object,
description=db_entity["description"],
force=True,
)
@ -234,7 +239,7 @@ class AtlasSource(Source):
if db_entity.get("description", None) and database_schema_object:
self.metadata.patch_description(
entity=DatabaseSchema,
entity_id=database_schema_object.id,
source=database_schema_object,
description=db_entity["description"],
force=True,
)
@ -257,7 +262,7 @@ class AtlasSource(Source):
if table_object:
if tbl_attrs.get("description", None):
self.metadata.patch_description(
entity_id=table_object.id,
source=table_object,
entity=Table,
description=tbl_attrs["description"],
force=True,
@ -270,8 +275,15 @@ class AtlasSource(Source):
tag_name=ATLAS_TABLE_TAG,
)
tag_label = TagLabel(
tagFQN=tag_fqn,
labelType=LabelType.Automated,
state=State.Suggested.value,
source=TagSource.Classification,
)
self.metadata.patch_tag(
entity=Table, entity_id=table_object.id, tag_fqn=tag_fqn
entity=Table, source=table_object, tag_label=tag_label
)
yield from self.ingest_lineage(tbl_entity["guid"], name)

View File

@ -16,6 +16,12 @@ from typing import Optional
from metadata.generated.schema.entity.classification.tag import Tag
from metadata.generated.schema.entity.data.table import Table, TableData
from metadata.generated.schema.type.tagLabel import (
LabelType,
State,
TagLabel,
TagSource,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.pii import PII
from metadata.pii.column_name_scanner import ColumnNameScanner
@ -37,7 +43,7 @@ class PIIProcessor:
self.ner_scanner = NERScanner()
def patch_column_tag(
self, tag_type: str, table_entity: Table, column_name: str
self, tag_type: str, table_entity: Table, column_fqn: str
) -> None:
"""
Build the tag and run the PATCH
@ -48,11 +54,16 @@ class PIIProcessor:
classification_name=PII,
tag_name=tag_type,
)
tag_label = TagLabel(
tagFQN=tag_fqn,
source=TagSource.Classification,
state=State.Suggested,
labelType=LabelType.Automated,
)
self.metadata.patch_column_tag(
entity_id=table_entity.id,
column_name=column_name,
tag_fqn=tag_fqn,
is_suggested=True,
table=table_entity,
column_fqn=column_fqn,
tag_label=tag_label,
)
def process(
@ -96,7 +107,7 @@ class PIIProcessor:
self.patch_column_tag(
tag_type=tag_and_confidence.tag.value,
table_entity=table_entity,
column_name=table_entity.columns[idx].name.__root__,
column_fqn=column.fullyQualifiedName.__root__,
)
except Exception as err:
logger.warning(f"Error computing PII tags for [{column}] - [{err}]")

View File

@ -163,13 +163,13 @@ class DataInsightWorkflowTests(unittest.TestCase):
)
user: User = cls.metadata.get_by_name(User, "aaron_johnson0")
cls.metadata.patch_owner(
Table,
table.id,
EntityReference(
entity=Table,
source=table,
owner=EntityReference(
id=user.id,
type="user",
),
True,
force=True,
)
for event in WEB_EVENT_DATA:

View File

@ -14,7 +14,6 @@ OpenMetadata high-level API Table test
"""
import logging
import time
from typing import Union
from unittest import TestCase
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
@ -46,11 +45,30 @@ from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
from metadata.generated.schema.type import basic
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tagLabel import (
LabelType,
State,
TagLabel,
TagSource,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.helpers import find_column_in_table
PII_TAG_LABEL = TagLabel(
tagFQN="PII.Sensitive",
labelType=LabelType.Automated,
state=State.Suggested.value,
source=TagSource.Classification,
)
TIER_TAG_LABEL = TagLabel(
tagFQN="Tier.Tier2",
labelType=LabelType.Automated,
state=State.Suggested.value,
source=TagSource.Classification,
)
class OMetaTableTest(TestCase):
"""
@ -59,9 +77,9 @@ class OMetaTableTest(TestCase):
"""
service_entity_id = None
entity_id = None
db_entity_id: Union[str, basic.Uuid] = None
db_schema_entity_id: Union[str, basic.Uuid] = None
table: Table = None
db_entity: Database = None
db_schema_entity: DatabaseSchema = None
user_1: User = None
user_2: User = None
team_1: Team = None
@ -126,28 +144,25 @@ class OMetaTableTest(TestCase):
service=cls.service_entity.fullyQualifiedName,
)
create_db_entity = cls.metadata.create_or_update(data=create_db)
cls.db_entity_id = create_db_entity.id
cls.db_entity = cls.metadata.create_or_update(data=create_db)
create_schema = CreateDatabaseSchemaRequest(
name="test-schema",
database=create_db_entity.fullyQualifiedName,
database=cls.db_entity.fullyQualifiedName,
)
create_schema_entity = cls.metadata.create_or_update(data=create_schema)
cls.db_schema_entity_id = create_schema_entity.id
cls.db_schema_entity = cls.metadata.create_or_update(data=create_schema)
cls.create = CreateTableRequest(
name="test",
databaseSchema=create_schema_entity.fullyQualifiedName,
databaseSchema=cls.db_schema_entity.fullyQualifiedName,
columns=[
Column(name="id", dataType=DataType.BIGINT),
Column(name="another", dataType=DataType.BIGINT),
],
)
res: Table = cls.metadata.create_or_update(data=cls.create)
cls.entity_id = res.id
cls.table = cls.metadata.create_or_update(data=cls.create)
cls.user_1 = cls.metadata.create_or_update(
data=CreateUserRequest(
@ -229,20 +244,20 @@ class OMetaTableTest(TestCase):
Update description and force
"""
updated: Table = self.metadata.patch_description(
entity=Table, entity_id=self.entity_id, description="New description"
entity=Table, source=self.table, description="New description"
)
assert updated.description.__root__ == "New description"
not_updated = self.metadata.patch_description(
entity=Table, entity_id=self.entity_id, description="Not passing force"
entity=Table, source=self.table, description="Not passing force"
)
assert not not_updated
force_updated: Table = self.metadata.patch_description(
entity=Table,
entity_id=self.entity_id,
source=self.table,
description="Forced new",
force=True,
)
@ -255,26 +270,26 @@ class OMetaTableTest(TestCase):
"""
updated: Table = self.metadata.patch_column_description(
entity_id=self.entity_id,
table=self.table,
description="New column description",
column_name="another",
column_fqn=self.table.fullyQualifiedName.__root__ + ".another",
)
updated_col = find_column_in_table(column_name="another", table=updated)
assert updated_col.description.__root__ == "New column description"
not_updated = self.metadata.patch_column_description(
entity_id=self.entity_id,
table=self.table,
description="Not passing force",
column_name="another",
column_fqn=self.table.fullyQualifiedName.__root__ + ".another",
)
assert not not_updated
force_updated: Table = self.metadata.patch_column_description(
entity_id=self.entity_id,
table=self.table,
description="Forced new",
column_name="another",
column_fqn=self.table.fullyQualifiedName.__root__ + ".another",
force=True,
)
@ -285,18 +300,17 @@ class OMetaTableTest(TestCase):
"""
Update table tags
"""
updated: Table = self.metadata.patch_tag(
entity=Table,
entity_id=self.entity_id,
tag_fqn="PII.Sensitive", # Shipped by default
source=self.table,
tag_label=PII_TAG_LABEL, # Shipped by default
)
assert updated.tags[0].tagFQN.__root__ == "PII.Sensitive"
updated: Table = self.metadata.patch_tag(
entity=Table,
entity_id=self.entity_id,
tag_fqn="Tier.Tier2", # Shipped by default
source=self.table,
tag_label=TIER_TAG_LABEL, # Shipped by default
)
assert updated.tags[0].tagFQN.__root__ == "PII.Sensitive"
assert updated.tags[1].tagFQN.__root__ == "Tier.Tier2"
@ -306,18 +320,18 @@ class OMetaTableTest(TestCase):
Update column tags
"""
updated: Table = self.metadata.patch_column_tag(
entity_id=self.entity_id,
tag_fqn="PII.Sensitive", # Shipped by default
column_name="id",
table=self.table,
tag_label=PII_TAG_LABEL, # Shipped by default
column_fqn=self.table.fullyQualifiedName.__root__ + ".id",
)
updated_col = find_column_in_table(column_name="id", table=updated)
assert updated_col.tags[0].tagFQN.__root__ == "PII.Sensitive"
updated_again: Table = self.metadata.patch_column_tag(
entity_id=self.entity_id,
tag_fqn="Tier.Tier2", # Shipped by default
column_name="id",
table=self.table,
tag_label=TIER_TAG_LABEL, # Shipped by default
column_fqn=self.table.fullyQualifiedName.__root__ + ".id",
)
updated_again_col = find_column_in_table(column_name="id", table=updated_again)
@ -331,7 +345,7 @@ class OMetaTableTest(TestCase):
# Database, no existing owner, owner is a User -> Modified
updated: Database = self.metadata.patch_owner(
entity=Database,
entity_id=self.db_entity_id,
source=self.db_entity,
owner=self.owner_user_1,
)
assert updated is not None
@ -340,7 +354,7 @@ class OMetaTableTest(TestCase):
# Database, existing owner, owner is a User, no force -> Unmodified
updated: Database = self.metadata.patch_owner(
entity=Database,
entity_id=self.db_entity_id,
source=self.db_entity,
owner=self.owner_user_2,
)
assert updated is None
@ -348,7 +362,7 @@ class OMetaTableTest(TestCase):
# Database, existing owner, owner is a User, force -> Modified
updated: Database = self.metadata.patch_owner(
entity=Database,
entity_id=self.db_entity_id,
source=self.db_entity,
owner=self.owner_user_2,
force=True,
)
@ -358,14 +372,14 @@ class OMetaTableTest(TestCase):
# Database, existing owner, no owner, no force -> Unmodified
updated: Database = self.metadata.patch_owner(
entity=Database,
entity_id=self.db_entity_id,
source=self.db_entity,
)
assert updated is None
# Database, existing owner, no owner, force -> Modified
updated: Database = self.metadata.patch_owner(
entity=Database,
entity_id=self.db_entity_id,
source=self.db_entity,
force=True,
)
assert updated is not None
@ -374,7 +388,7 @@ class OMetaTableTest(TestCase):
# DatabaseSchema, no existing owner, owner is Team -> Modified
updated: DatabaseSchema = self.metadata.patch_owner(
entity=DatabaseSchema,
entity_id=self.db_schema_entity_id,
source=self.db_schema_entity,
owner=self.owner_team_1,
)
assert updated is not None
@ -383,7 +397,7 @@ class OMetaTableTest(TestCase):
# DatabaseSchema, existing owner, owner is Team, no force -> Unmodified
updated: DatabaseSchema = self.metadata.patch_owner(
entity=DatabaseSchema,
entity_id=self.db_schema_entity_id,
source=self.db_schema_entity,
owner=self.owner_team_2,
)
assert updated is None
@ -391,7 +405,7 @@ class OMetaTableTest(TestCase):
# DatabaseSchema, existing owner, owner is Team, force -> Modified
updated: DatabaseSchema = self.metadata.patch_owner(
entity=DatabaseSchema,
entity_id=self.db_schema_entity_id,
source=self.db_schema_entity,
owner=self.owner_team_2,
force=True,
)
@ -401,14 +415,14 @@ class OMetaTableTest(TestCase):
# DatabaseSchema, existing owner, no owner, no force -> Unmodified
updated: DatabaseSchema = self.metadata.patch_owner(
entity=DatabaseSchema,
entity_id=self.db_schema_entity_id,
source=self.db_schema_entity,
)
assert updated is None
# DatabaseSchema, existing owner, no owner, force -> Modified
updated: DatabaseSchema = self.metadata.patch_owner(
entity=DatabaseSchema,
entity_id=self.db_schema_entity_id,
source=self.db_schema_entity,
force=True,
)
assert updated is not None
@ -417,7 +431,7 @@ class OMetaTableTest(TestCase):
# Table, no existing owner, owner is a Team -> Modified
updated: Table = self.metadata.patch_owner(
entity=Table,
entity_id=self.entity_id,
source=self.table,
owner=self.owner_team_1,
)
assert updated is not None
@ -426,7 +440,7 @@ class OMetaTableTest(TestCase):
# Table, existing owner, owner is a Team, no force -> Unmodified
updated: Table = self.metadata.patch_owner(
entity=Table,
entity_id=self.entity_id,
source=self.table,
owner=self.owner_team_2,
)
assert updated is None
@ -434,7 +448,7 @@ class OMetaTableTest(TestCase):
# Table, existing owner, owner is a Team, force -> Modified
updated: Table = self.metadata.patch_owner(
entity=Table,
entity_id=self.entity_id,
source=self.table,
owner=self.owner_team_2,
force=True,
)
@ -444,32 +458,77 @@ class OMetaTableTest(TestCase):
# Table, existing owner, no owner, no force -> Unmodified
updated: Table = self.metadata.patch_owner(
entity=Table,
entity_id=self.entity_id,
source=self.table,
)
assert updated is None
# Table, existing owner, no owner, no force -> Modified
updated: Table = self.metadata.patch_owner(
entity=Table,
entity_id=self.entity_id,
source=self.table,
force=True,
)
assert updated is not None
assert updated.owner is None
# Table with non-existent id, force -> Unmodified
non_existent_table = self.table.copy(deep=True)
non_existent_table.id = "9facb7b3-1dee-4017-8fca-1254b700afef"
updated: Table = self.metadata.patch_owner(
entity=Table,
entity_id="9facb7b3-1dee-4017-8fca-1254b700afef",
source=non_existent_table,
force=True,
)
assert updated is None
# Table, no owner, invalid owner type -> Unmodified
updated: Table = self.metadata.patch_owner(
entity=Table,
entity_id=self.entity_id,
owner=EntityReference(id=self.entity_id, type="table"),
force=True,
# Enable after https://github.com/open-metadata/OpenMetadata/issues/11715
# updated: Table = self.metadata.patch_owner(
# entity=Table,
# source=self.table,
# owner=EntityReference(id=self.table.id, type="table"),
# force=True,
# )
# assert updated is None
def test_patch_nested_col(self):
"""
create a table with nested cols and run patch on it
"""
create = CreateTableRequest(
name="test",
databaseSchema=self.db_schema_entity.fullyQualifiedName,
columns=[
Column(
name="struct",
dataType=DataType.STRUCT,
children=[
Column(name="id", dataType=DataType.INT),
Column(name="name", dataType=DataType.STRING),
],
)
],
)
created: Table = self.metadata.create_or_update(create)
with_tags: Table = self.metadata.patch_column_tag(
table=created,
column_fqn=created.fullyQualifiedName.__root__ + ".struct.id",
tag_label=TIER_TAG_LABEL,
)
self.assertEqual(
with_tags.columns[0].children[0].tags[0].tagFQN.__root__,
TIER_TAG_LABEL.tagFQN.__root__,
)
with_description: Table = self.metadata.patch_column_description(
table=created,
column_fqn=created.fullyQualifiedName.__root__ + ".struct.name",
description="I am so nested",
)
self.assertEqual(
with_description.columns[0].children[1].description.__root__,
"I am so nested",
)
assert updated is None

View File

@ -283,7 +283,7 @@ EXPTECTED_TABLE = Table(
description="test tag",
source="Classification",
labelType="Automated",
state="Confirmed",
state="Suggested",
href=None,
)
],