Fixes: Ingest System Tags (#20432)

This commit is contained in:
Keshav Mohta 2025-03-27 20:00:33 +05:30 committed by GitHub
parent 11af2b44eb
commit 3c17a3025c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 91 additions and 31 deletions

View File

@ -431,6 +431,8 @@ class BigquerySource(LifeCycleQueryMixin, CommonDbSourceService, MultiDBSource):
tag_description="Bigquery Dataset Label",
classification_description="BigQuery Dataset Classification",
include_tags=self.source_config.includeTags,
metadata=self.metadata,
system_tags=True,
)
# Fetching policy tags on the column level
list_project_ids = [self.context.get().database]
@ -451,6 +453,8 @@ class BigquerySource(LifeCycleQueryMixin, CommonDbSourceService, MultiDBSource):
tag_description="Bigquery Policy Tag",
classification_description="BigQuery Policy Classification",
include_tags=self.source_config.includeTags,
metadata=self.metadata,
system_tags=True,
)
except Exception as exc:
yield Either(
@ -593,6 +597,8 @@ class BigquerySource(LifeCycleQueryMixin, CommonDbSourceService, MultiDBSource):
tag_description="Bigquery Table Label",
classification_description="BigQuery Table Classification",
include_tags=self.source_config.includeTags,
metadata=self.metadata,
system_tags=True,
)
def get_tag_labels(self, table_name: str) -> Optional[List[TagLabel]]:

View File

@ -639,6 +639,8 @@ class DatabricksSource(ExternalTableLineageMixin, CommonDbSourceService, MultiDB
classification_name=tag_name,
tag_description=DATABRICKS_TAG,
classification_description=DATABRICKS_TAG_CLASSIFICATION,
metadata=self.metadata,
system_tags=True,
)
except Exception as exc:
@ -673,6 +675,8 @@ class DatabricksSource(ExternalTableLineageMixin, CommonDbSourceService, MultiDB
classification_name=tag_name,
tag_description=DATABRICKS_TAG,
classification_description=DATABRICKS_TAG_CLASSIFICATION,
metadata=self.metadata,
system_tags=True,
)
except Exception as exc:
@ -711,6 +715,8 @@ class DatabricksSource(ExternalTableLineageMixin, CommonDbSourceService, MultiDB
classification_name=tag_name,
tag_description=DATABRICKS_TAG,
classification_description=DATABRICKS_TAG_CLASSIFICATION,
metadata=self.metadata,
system_tags=True,
)
column_tags = self.column_tags.get(
@ -737,6 +743,8 @@ class DatabricksSource(ExternalTableLineageMixin, CommonDbSourceService, MultiDB
classification_name=tag_name,
tag_description=DATABRICKS_TAG,
classification_description=DATABRICKS_TAG_CLASSIFICATION,
metadata=self.metadata,
system_tags=True,
)
except Exception as exc:

View File

@ -17,6 +17,9 @@ from typing import List, Optional, TypeVar
from pydantic import BaseModel
from metadata.generated.schema.analytics.reportData import ReportData
from metadata.generated.schema.entity.classification.classification import (
Classification,
)
from metadata.generated.schema.entity.classification.tag import Tag
from metadata.generated.schema.entity.data.apiCollection import APICollection
from metadata.generated.schema.entity.data.apiEndpoint import APIEndpoint
@ -66,6 +69,7 @@ ES_INDEX_MAP = {
GlossaryTerm.__name__: "glossary_term_search_index",
MlModel.__name__: "mlmodel_search_index",
Tag.__name__: "tag_search_index",
Classification.__name__: "classification_search_index",
Container.__name__: "container_search_index",
Query.__name__: "query_search_index",
ReportData.__name__: "entity_report_data_index",

View File

@ -20,6 +20,9 @@ from metadata.generated.schema.api.classification.createClassification import (
CreateClassificationRequest,
)
from metadata.generated.schema.api.classification.createTag import CreateTagRequest
from metadata.generated.schema.entity.classification.classification import (
Classification,
)
from metadata.generated.schema.entity.classification.tag import Tag
from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
@ -29,6 +32,7 @@ from metadata.generated.schema.type.basic import (
EntityName,
FullyQualifiedEntityName,
Markdown,
ProviderType,
)
from metadata.generated.schema.type.tagLabel import (
LabelType,
@ -46,6 +50,7 @@ from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
# pylint: disable=too-many-arguments
def get_ometa_tag_and_classification(
tags: List[str],
classification_name: str,
@ -53,42 +58,79 @@ def get_ometa_tag_and_classification(
classification_description: str,
include_tags: bool = True,
tag_fqn: Optional[FullyQualifiedEntityName] = None,
metadata: Optional[OpenMetadata] = None,
system_tags: bool = False,
) -> Iterable[Either[OMetaTagAndClassification]]:
"""
Returns the OMetaTagAndClassification object
"""
if include_tags:
for tag in tags:
if tag:
try:
classification = OMetaTagAndClassification(
fqn=tag_fqn,
classification_request=CreateClassificationRequest(
name=EntityName(classification_name),
description=Markdown(classification_description),
),
tag_request=CreateTagRequest(
classification=FullyQualifiedEntityName(
classification_name
),
name=EntityName(tag),
description=Markdown(tag_description)
if tag_description
else None,
),
)
yield Either(right=classification)
logger.debug(
f"Classification {classification_name}, Tag {tag} Ingested"
)
except Exception as err:
yield Either(
left=StackTraceError(
name=tag,
error=f"Error yielding tag [{tag}]: [{err}]",
stackTrace=traceback.format_exc(),
)
if not include_tags:
return
if system_tags:
# Checking for system classification
for classification_entity in (
metadata.es_search_from_fqn(
entity_type=Classification, fqn_search_string=classification_name
)
or []
):
if (
classification_entity.provider == ProviderType.system
and classification_entity.name.root.lower()
== classification_name.lower()
):
classification_name = classification_entity.name.root
classification_description = classification_entity.description.root
break
for tag in tags:
specific_tag_description = tag_description
try:
if system_tags:
# Checking for system tag
for tag_entity in (
metadata.es_search_from_fqn(
entity_type=Tag,
fqn_search_string=f"{classification_name}.{tag}",
)
or []
):
if (
tag_entity.provider == ProviderType.system
and tag_entity.classification.name == classification_name
and tag_entity.name.root.lower() == tag.lower()
):
tag = tag_entity.name.root
specific_tag_description = tag_entity.description.root
break
classification = OMetaTagAndClassification(
fqn=tag_fqn,
classification_request=CreateClassificationRequest(
name=EntityName(classification_name),
description=Markdown(classification_description),
),
tag_request=CreateTagRequest(
classification=FullyQualifiedEntityName(classification_name),
name=EntityName(tag),
description=(
Markdown(specific_tag_description)
if specific_tag_description
else None
),
),
)
yield Either(right=classification)
logger.debug(f"Classification {classification_name}, Tag {tag} Ingested")
except Exception as err:
yield Either(
left=StackTraceError(
name=tag,
error=f"Error yielding tag [{tag}]: [{err}]",
stackTrace=traceback.format_exc(),
)
)
@functools.lru_cache(maxsize=512)