Fix #6248: Use PUT API instead of POST for tag & tagCategory (#6344)

* Fix #6248: Use PUT API instead of POST for tag & tagCategory

* Restructure yield_tag topology

* Renamed method to create_or_update
This commit is contained in:
Mayur Singal 2022-07-27 12:11:55 +05:30 committed by GitHub
parent 53930a9253
commit 9adaec1b3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 25 additions and 9 deletions

View File

@ -75,7 +75,7 @@ class OMetaTagMixin:
path = f"{category_name}"
return self._get(entity=entity, path=path, fields=fields)
def update_tag_category(
def create_or_update_tag_category(
self, category_name: str, tag_category_body: CreateTagCategoryRequest
) -> None:
"""Method to update a tag category
@ -117,7 +117,7 @@ class OMetaTagMixin:
path = f"{category_name}/{primary_tag_fqn}"
return self._get(entity=entity, path=path, fields=fields)
def update_primary_tag(
def create_or_update_primary_tag(
self,
category_name: str,
primary_tag_fqn: str,

View File

@ -40,6 +40,7 @@ from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.tags.tagCategory import Tag
from metadata.generated.schema.entity.teams.role import Role
from metadata.generated.schema.entity.teams.team import Team
from metadata.generated.schema.type.entityReference import EntityReference
@ -56,6 +57,7 @@ from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import DashboardUsage
from metadata.ingestion.source.database.database_service import DataModelLink
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
from metadata.utils.sql_lineage import (
_create_lineage_by_table_name,
@ -461,14 +463,23 @@ class MetadataRestSink(Sink[Entity]):
def write_tag_category(self, record: OMetaTagAndCategory):
try:
self.metadata.create_tag_category(tag_category_body=record.category_name)
self.metadata.create_or_update_tag_category(
tag_category_body=record.category_name,
category_name=record.category_name.name.__root__,
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(err)
try:
self.metadata.create_primary_tag(
self.metadata.create_or_update_primary_tag(
category_name=record.category_name.name.__root__,
primary_tag_body=record.category_details,
primary_tag_fqn=fqn.build(
metadata=self.metadata,
entity_type=Tag,
tag_category_name=record.category_name.name.__root__,
tag_name=record.category_details.name.__root__,
),
)
except Exception as err:
logger.debug(traceback.format_exc())

View File

@ -123,8 +123,6 @@ class BigquerySource(CommonDbSourceService):
:param _:
:return:
"""
if not self.source_config.includeTags:
return
taxonomies = PolicyTagManagerClient().list_taxonomies(
parent=f"projects/{self.project_id}/locations/{self.service_connection.taxonomyLocation}"
)

View File

@ -125,7 +125,7 @@ class DatabaseServiceTopology(ServiceTopology):
NodeStage(
type_=OMetaTagAndCategory,
context="tags",
processor="yield_tag",
processor="yield_tag_details",
ack_sink=False,
nullable=True,
cache_all=True,
@ -265,6 +265,13 @@ class DatabaseServiceSource(DBTMixin, TopologyRunnerMixin, Source, ABC):
From topology. To be run for each schema
"""
def yield_tag_details(self, schema_name: str) -> Iterable[OMetaTagAndCategory]:
"""
From topology. To be run for each schema
"""
if self.source_config.includeTags:
yield from self.yield_tag(schema_name) or []
@abstractmethod
def yield_view_lineage(
self, table_name_and_type: Tuple[str, TableType]

View File

@ -126,7 +126,7 @@ class OMetaTagMixinPut(TestCase):
categoryType="Descriptive", description="test tag", name=f"{rand_name}"
)
self.metadata.update_tag_category(CATEGORY_NAME, updated_tag_category)
self.metadata.create_or_update_tag_category(CATEGORY_NAME, updated_tag_category)
assert True
@ -138,7 +138,7 @@ class OMetaTagMixinPut(TestCase):
description="test tag", name=f"{rand_name}"
)
self.metadata.update_primary_tag(
self.metadata.create_or_update_primary_tag(
CATEGORY_NAME, PRIMARY_TAG_NAME, updated_primary_tag
)