From 9adaec1b3ab58b9147d5a3d8aae1050897238ca5 Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Wed, 27 Jul 2022 12:11:55 +0530 Subject: [PATCH] Fix #6248: Use PUT API instead of POST for tag & tagCategory (#6344) * Fix #6248: Use PUT API instead of POST for tag & tagCategory * Restructure yield_tag topology * Renamed method to create_or_update --- .../metadata/ingestion/ometa/mixins/tag_mixin.py | 4 ++-- .../src/metadata/ingestion/sink/metadata_rest.py | 15 +++++++++++++-- .../ingestion/source/database/bigquery.py | 2 -- .../ingestion/source/database/database_service.py | 9 ++++++++- .../integration/ometa/test_ometa_tags_mixin.py | 4 ++-- 5 files changed, 25 insertions(+), 9 deletions(-) diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/tag_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/tag_mixin.py index 63898b787d5..98a76438c89 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/tag_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/tag_mixin.py @@ -75,7 +75,7 @@ class OMetaTagMixin: path = f"{category_name}" return self._get(entity=entity, path=path, fields=fields) - def update_tag_category( + def create_or_update_tag_category( self, category_name: str, tag_category_body: CreateTagCategoryRequest ) -> None: """Method to update a tag category @@ -117,7 +117,7 @@ class OMetaTagMixin: path = f"{category_name}/{primary_tag_fqn}" return self._get(entity=entity, path=path, fields=fields) - def update_primary_tag( + def create_or_update_primary_tag( self, category_name: str, primary_tag_fqn: str, diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index cfecf57f6f7..35b589c5aee 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -40,6 +40,7 @@ from metadata.generated.schema.entity.data.topic import Topic from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) +from metadata.generated.schema.entity.tags.tagCategory import Tag from metadata.generated.schema.entity.teams.role import Role from metadata.generated.schema.entity.teams.team import Team from metadata.generated.schema.type.entityReference import EntityReference @@ -56,6 +57,7 @@ from metadata.ingestion.ometa.client import APIError from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.dashboard_service import DashboardUsage from metadata.ingestion.source.database.database_service import DataModelLink +from metadata.utils import fqn from metadata.utils.logger import ingestion_logger from metadata.utils.sql_lineage import ( _create_lineage_by_table_name, @@ -461,14 +463,23 @@ class MetadataRestSink(Sink[Entity]): def write_tag_category(self, record: OMetaTagAndCategory): try: - self.metadata.create_tag_category(tag_category_body=record.category_name) + self.metadata.create_or_update_tag_category( + tag_category_body=record.category_name, + category_name=record.category_name.name.__root__, + ) except Exception as err: logger.debug(traceback.format_exc()) logger.error(err) try: - self.metadata.create_primary_tag( + self.metadata.create_or_update_primary_tag( category_name=record.category_name.name.__root__, primary_tag_body=record.category_details, + primary_tag_fqn=fqn.build( + metadata=self.metadata, + entity_type=Tag, + tag_category_name=record.category_name.name.__root__, + tag_name=record.category_details.name.__root__, + ), ) except Exception as err: logger.debug(traceback.format_exc()) diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery.py b/ingestion/src/metadata/ingestion/source/database/bigquery.py index 948003526e5..475b98e1421 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery.py @@ -123,8 +123,6 @@ class BigquerySource(CommonDbSourceService): :param _: :return: """ - if not self.source_config.includeTags: - return taxonomies = PolicyTagManagerClient().list_taxonomies( parent=f"projects/{self.project_id}/locations/{self.service_connection.taxonomyLocation}" ) diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index c6ceeb2c85b..653b6dbf98e 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -125,7 +125,7 @@ class DatabaseServiceTopology(ServiceTopology): NodeStage( type_=OMetaTagAndCategory, context="tags", - processor="yield_tag", + processor="yield_tag_details", ack_sink=False, nullable=True, cache_all=True, @@ -265,6 +265,13 @@ class DatabaseServiceSource(DBTMixin, TopologyRunnerMixin, Source, ABC): From topology. To be run for each schema """ + def yield_tag_details(self, schema_name: str) -> Iterable[OMetaTagAndCategory]: + """ + From topology. To be run for each schema + """ + if self.source_config.includeTags: + yield from self.yield_tag(schema_name) or [] + @abstractmethod def yield_view_lineage( self, table_name_and_type: Tuple[str, TableType] diff --git a/ingestion/tests/integration/ometa/test_ometa_tags_mixin.py b/ingestion/tests/integration/ometa/test_ometa_tags_mixin.py index 9f96234e934..0415b2f631b 100644 --- a/ingestion/tests/integration/ometa/test_ometa_tags_mixin.py +++ b/ingestion/tests/integration/ometa/test_ometa_tags_mixin.py @@ -126,7 +126,7 @@ class OMetaTagMixinPut(TestCase): categoryType="Descriptive", description="test tag", name=f"{rand_name}" ) - self.metadata.update_tag_category(CATEGORY_NAME, updated_tag_category) + self.metadata.create_or_update_tag_category(CATEGORY_NAME, updated_tag_category) assert True @@ -138,7 +138,7 @@ class OMetaTagMixinPut(TestCase): description="test tag", name=f"{rand_name}" ) - self.metadata.update_primary_tag( + self.metadata.create_or_update_primary_tag( CATEGORY_NAME, PRIMARY_TAG_NAME, updated_primary_tag )