Validate if tags are created before attaching them to CreateRequest (#11554)

* Added tags validation

* typo fixed
This commit is contained in:
Onkar Ravgan 2023-05-11 21:34:55 +05:30 committed by GitHub
parent ef7b02529d
commit cff403a05a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 99 additions and 30 deletions

View File

@ -25,6 +25,7 @@ from metadata.generated.schema.api.tests.createTestDefinition import (
CreateTestDefinitionRequest,
)
from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest
from metadata.generated.schema.entity.classification.tag import Tag
from metadata.generated.schema.entity.data.table import (
Column,
DataModel,
@ -322,12 +323,15 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
)
try:
# Create all the tags added
dbt_tag_labels = tag_utils.get_tag_labels(
metadata=self.metadata,
tags=dbt_tags_list,
classification_name=self.tag_classification_name,
include_tags=self.source_config.includeTags,
)
dbt_tag_labels = [
fqn.build(
self.metadata,
Tag,
classification_name=self.tag_classification_name,
tag_name=tag_name,
)
for tag_name in dbt_tags_list
]
for tag_label in dbt_tag_labels or []:
yield OMetaTagAndClassification(
classification_request=CreateClassificationRequest(
@ -336,7 +340,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
),
tag_request=CreateTagRequest(
classification=self.tag_classification_name,
name=tag_label.tagFQN.__root__.split(fqn.FQN_SEPARATOR)[1],
name=tag_label.split(fqn.FQN_SEPARATOR)[1],
description="dbt Tags",
),
)

View File

@ -14,6 +14,7 @@
Tag utils Module
"""
import functools
import traceback
from typing import List, Optional
@ -31,6 +32,23 @@ from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@functools.lru_cache(maxsize=512)
def _get_tag_label(metadata: OpenMetadata, tag_fqn: str) -> Optional[TagLabel]:
    """
    Build the TagLabel for `tag_fqn`, provided the tag can be fetched by name.

    The tag is looked up with `metadata.get_by_name`; when that lookup gives
    back nothing, no label is produced and None is returned instead.

    Because of the `lru_cache` decorator, every outcome (None included) is
    memoized per (metadata, tag_fqn) pair, keeping up to 512 entries.
    """
    # Guard clause: nothing to label when the tag lookup comes back empty
    if not metadata.get_by_name(entity=Tag, fqn=tag_fqn):
        return None

    return TagLabel(
        tagFQN=tag_fqn,
        labelType=LabelType.Automated.value,
        state=State.Suggested.value,
        source=TagSource.Classification.value,
    )
def get_tag_labels(
metadata: OpenMetadata,
tags: List[str],
@ -38,25 +56,23 @@ def get_tag_labels(
include_tags: bool = True,
) -> Optional[List[TagLabel]]:
"""
Mehthod to create tag labels from the collected tags
Method to create tag labels from the collected tags
"""
tag_labels_list = []
if tags and include_tags:
try:
return [
TagLabel(
tagFQN=fqn.build(
metadata,
Tag,
classification_name=classification_name,
tag_name=tag,
),
labelType=LabelType.Automated.value,
state=State.Suggested.value,
source=TagSource.Classification.value,
for tag in tags:
try:
tag_fqn = fqn.build(
metadata,
Tag,
classification_name=classification_name,
tag_name=tag,
)
for tag in tags
]
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(f"Error processing tag labels: {err}")
return None
tag_label = _get_tag_label(metadata, tag_fqn=tag_fqn)
if tag_label:
tag_labels_list.append(tag_label)
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(f"Error processing tag labels: {err}")
return tag_labels_list or None

View File

@ -296,9 +296,24 @@ class DbtUnitTest(TestCase):
@patch("metadata.ingestion.source.database.dbt.metadata.DbtSource.get_dbt_owner")
@patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn")
def test_dbt_manifest_v8(self, es_search_from_fqn, get_dbt_owner):
@patch("metadata.utils.tag_utils._get_tag_label")
def test_dbt_manifest_v8(self, _get_tag_label, es_search_from_fqn, get_dbt_owner):
get_dbt_owner.return_value = MOCK_OWNER
es_search_from_fqn.return_value = MOCK_TABLE_ENTITIES
_get_tag_label.side_effect = [
TagLabel(
tagFQN="dbtTags.model_tag_one",
labelType=LabelType.Automated.value,
state=State.Suggested.value,
source=TagSource.Classification.value,
),
TagLabel(
tagFQN="dbtTags.model_tag_two",
labelType=LabelType.Automated.value,
state=State.Suggested.value,
source=TagSource.Classification.value,
),
]
self.execute_test(
MOCK_SAMPLE_MANIFEST_V8,
expected_records=2,
@ -324,7 +339,29 @@ class DbtUnitTest(TestCase):
self.assertIsNone(self.dbt_source_obj.get_corrected_name(name="null"))
self.assertIsNotNone(self.dbt_source_obj.get_corrected_name(name="dev"))
def test_dbt_get_dbt_tag_labels(self):
@patch("metadata.utils.tag_utils._get_tag_label")
def test_dbt_get_dbt_tag_labels(self, _get_tag_label):
_get_tag_label.side_effect = [
TagLabel(
tagFQN="dbtTags.tag1",
labelType=LabelType.Automated.value,
state=State.Suggested.value,
source=TagSource.Classification.value,
),
TagLabel(
tagFQN='dbtTags."tag2.name"',
labelType=LabelType.Automated.value,
state=State.Suggested.value,
source=TagSource.Classification.value,
),
TagLabel(
tagFQN="dbtTags.tag3",
labelType=LabelType.Automated.value,
state=State.Suggested.value,
source=TagSource.Classification.value,
),
]
mocked_metadata = MagicMock()
result = tag_utils.get_tag_labels(
metadata=mocked_metadata,

View File

@ -34,7 +34,12 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tagLabel import TagLabel
from metadata.generated.schema.type.tagLabel import (
LabelType,
State,
TagLabel,
TagSource,
)
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.source.pipeline.dagster.metadata import DagsterSource
@ -270,9 +275,16 @@ class DagsterUnitTest(TestCase):
)
@patch("metadata.ingestion.source.pipeline.dagster.metadata.DagsterSource.get_jobs")
def test_yield_pipeline(self, get_jobs):
@patch("metadata.utils.tag_utils._get_tag_label")
def test_yield_pipeline(self, _get_tag_label, get_jobs):
results = self.dagster.yield_pipeline(EXPECTED_DAGSTER_DETAILS)
get_jobs.return_value = EXPECTED_DAGSTER_DETAILS
_get_tag_label.return_value = TagLabel(
tagFQN="DagsterTags.hacker_new_repository",
labelType=LabelType.Automated.value,
state=State.Suggested.value,
source=TagSource.Classification.value,
)
pipelines_list = []
for result in results:
pipelines_list.append(result)