From cff403a05a72dce5d461759bd9b9ba800d8676ad Mon Sep 17 00:00:00 2001 From: Onkar Ravgan Date: Thu, 11 May 2023 21:34:55 +0530 Subject: [PATCH] Validate if tags are created before attaching them to CreateRequest (#11554) * Added tags validation * typo fixed --- .../ingestion/source/database/dbt/metadata.py | 18 ++++--- ingestion/src/metadata/utils/tag_utils.py | 54 ++++++++++++------- ingestion/tests/unit/test_dbt.py | 41 +++++++++++++- .../unit/topology/pipeline/test_dagster.py | 16 +++++- 4 files changed, 99 insertions(+), 30 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py index 43d9bc05c10..9e77f203044 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py @@ -25,6 +25,7 @@ from metadata.generated.schema.api.tests.createTestDefinition import ( CreateTestDefinitionRequest, ) from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest +from metadata.generated.schema.entity.classification.tag import Tag from metadata.generated.schema.entity.data.table import ( Column, DataModel, @@ -322,12 +323,15 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods ) try: # Create all the tags added - dbt_tag_labels = tag_utils.get_tag_labels( - metadata=self.metadata, - tags=dbt_tags_list, - classification_name=self.tag_classification_name, - include_tags=self.source_config.includeTags, - ) + dbt_tag_labels = [ + fqn.build( + self.metadata, + Tag, + classification_name=self.tag_classification_name, + tag_name=tag_name, + ) + for tag_name in dbt_tags_list + ] for tag_label in dbt_tag_labels or []: yield OMetaTagAndClassification( classification_request=CreateClassificationRequest( @@ -336,7 +340,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods ), tag_request=CreateTagRequest( classification=self.tag_classification_name, - name=tag_label.tagFQN.__root__.split(fqn.FQN_SEPARATOR)[1], + name=tag_label.split(fqn.FQN_SEPARATOR)[1], description="dbt Tags", ), ) diff --git a/ingestion/src/metadata/utils/tag_utils.py b/ingestion/src/metadata/utils/tag_utils.py index 89725d58830..ba65a136a9d 100644 --- a/ingestion/src/metadata/utils/tag_utils.py +++ b/ingestion/src/metadata/utils/tag_utils.py @@ -14,6 +14,7 @@ Tag utils Module """ +import functools import traceback from typing import List, Optional @@ -31,6 +32,23 @@ from metadata.utils.logger import ingestion_logger logger = ingestion_logger() +@functools.lru_cache(maxsize=512) +def _get_tag_label(metadata: OpenMetadata, tag_fqn: str) -> Optional[TagLabel]: + """ + Returns the tag label if the tag is created + """ + # Check if the tag exists + tag = metadata.get_by_name(entity=Tag, fqn=tag_fqn) + if tag: + return TagLabel( + tagFQN=tag_fqn, + labelType=LabelType.Automated.value, + state=State.Suggested.value, + source=TagSource.Classification.value, + ) + return None + + def get_tag_labels( metadata: OpenMetadata, tags: List[str], @@ -38,25 +56,23 @@ def get_tag_labels( include_tags: bool = True, ) -> Optional[List[TagLabel]]: """ - Mehthod to create tag labels from the collected tags + Method to create tag labels from the collected tags """ + tag_labels_list = [] if tags and include_tags: - try: - return [ - TagLabel( - tagFQN=fqn.build( - metadata, - Tag, - classification_name=classification_name, - tag_name=tag, - ), - labelType=LabelType.Automated.value, - state=State.Suggested.value, - source=TagSource.Classification.value, + for tag in tags: + try: + tag_fqn = fqn.build( + metadata, + Tag, + classification_name=classification_name, + tag_name=tag, ) - for tag in tags - ] - except Exception as err: - logger.debug(traceback.format_exc()) - logger.error(f"Error processing tag labels: {err}") - return None + tag_label = _get_tag_label(metadata, tag_fqn=tag_fqn) + if tag_label: + tag_labels_list.append(tag_label) + + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error(f"Error processing tag labels: {err}") + return tag_labels_list or None diff --git a/ingestion/tests/unit/test_dbt.py b/ingestion/tests/unit/test_dbt.py index f8c18e1011c..9e8cfe4249a 100644 --- a/ingestion/tests/unit/test_dbt.py +++ b/ingestion/tests/unit/test_dbt.py @@ -296,9 +296,24 @@ class DbtUnitTest(TestCase): @patch("metadata.ingestion.source.database.dbt.metadata.DbtSource.get_dbt_owner") @patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn") - def test_dbt_manifest_v8(self, es_search_from_fqn, get_dbt_owner): + @patch("metadata.utils.tag_utils._get_tag_label") + def test_dbt_manifest_v8(self, _get_tag_label, es_search_from_fqn, get_dbt_owner): get_dbt_owner.return_value = MOCK_OWNER es_search_from_fqn.return_value = MOCK_TABLE_ENTITIES + _get_tag_label.side_effect = [ + TagLabel( + tagFQN="dbtTags.model_tag_one", + labelType=LabelType.Automated.value, + state=State.Suggested.value, + source=TagSource.Classification.value, + ), + TagLabel( + tagFQN="dbtTags.model_tag_two", + labelType=LabelType.Automated.value, + state=State.Suggested.value, + source=TagSource.Classification.value, + ), + ] self.execute_test( MOCK_SAMPLE_MANIFEST_V8, expected_records=2, @@ -324,7 +339,29 @@ class DbtUnitTest(TestCase): self.assertIsNone(self.dbt_source_obj.get_corrected_name(name="null")) self.assertIsNotNone(self.dbt_source_obj.get_corrected_name(name="dev")) - def test_dbt_get_dbt_tag_labels(self): + @patch("metadata.utils.tag_utils._get_tag_label") + def test_dbt_get_dbt_tag_labels(self, _get_tag_label): + _get_tag_label.side_effect = [ + TagLabel( + tagFQN="dbtTags.tag1", + labelType=LabelType.Automated.value, + state=State.Suggested.value, + source=TagSource.Classification.value, + ), + TagLabel( + tagFQN='dbtTags."tag2.name"', + labelType=LabelType.Automated.value, + state=State.Suggested.value, + source=TagSource.Classification.value, + ), + TagLabel( + tagFQN="dbtTags.tag3", + labelType=LabelType.Automated.value, + state=State.Suggested.value, + source=TagSource.Classification.value, + ), + ] + mocked_metadata = MagicMock() result = tag_utils.get_tag_labels( metadata=mocked_metadata, diff --git a/ingestion/tests/unit/topology/pipeline/test_dagster.py b/ingestion/tests/unit/topology/pipeline/test_dagster.py index 9700f739c8a..f9238e5cdb4 100644 --- a/ingestion/tests/unit/topology/pipeline/test_dagster.py +++ b/ingestion/tests/unit/topology/pipeline/test_dagster.py @@ -34,7 +34,12 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.generated.schema.type.basic import FullyQualifiedEntityName from metadata.generated.schema.type.entityReference import EntityReference -from metadata.generated.schema.type.tagLabel import TagLabel +from metadata.generated.schema.type.tagLabel import ( + LabelType, + State, + TagLabel, + TagSource, +) from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus from metadata.ingestion.source.pipeline.dagster.metadata import DagsterSource @@ -270,9 +275,16 @@ class DagsterUnitTest(TestCase): ) @patch("metadata.ingestion.source.pipeline.dagster.metadata.DagsterSource.get_jobs") - def test_yield_pipeline(self, get_jobs): + @patch("metadata.utils.tag_utils._get_tag_label") + def test_yield_pipeline(self, _get_tag_label, get_jobs): results = self.dagster.yield_pipeline(EXPECTED_DAGSTER_DETAILS) get_jobs.return_value = EXPECTED_DAGSTER_DETAILS + _get_tag_label.return_value = TagLabel( + tagFQN="DagsterTags.hacker_new_repository", + labelType=LabelType.Automated.value, + state=State.Suggested.value, + source=TagSource.Classification.value, + ) pipelines_list = [] for result in results: pipelines_list.append(result)