Centralize tags ingestion logic (#11880)

Onkar Ravgan authored 2023-06-09 10:45:53 +05:30 · committed by GitHub
parent f43aaf4150
commit caabe89f9c
14 changed files with 231 additions and 377 deletions
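The commit replaces per-connector classification and tag boilerplate with shared helpers in metadata/utils/tag_utils.py. A minimal sketch of the connector-side pattern that results is below; the MyConnectorSource class, MY_TAG_CATEGORY constant, and dashboard field values are illustrative placeholders, not part of the diff.

from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels

MY_TAG_CATEGORY = "MyServiceTags"  # placeholder classification name


class MyConnectorSource(DashboardServiceSource):
    def yield_tag(self, *_, **__):
        # Stage 1: register the classification and its tags in OpenMetadata.
        yield from get_ometa_tag_and_classification(
            tags=self.tags,
            classification_name=MY_TAG_CATEGORY,
            tag_description="MyService Tag",
            classification_desciption="Tags associated with MyService entities",
            include_tags=self.source_config.includeTags,
        )

    def yield_dashboard(self, dashboard_details):
        # Stage 2: resolve those tags into TagLabels on the entity request.
        yield CreateDashboardRequest(
            name=dashboard_details["name"],
            service=self.context.dashboard_service.fullyQualifiedName.__root__,
            tags=get_tag_labels(
                metadata=self.metadata,
                tags=dashboard_details.get("tags"),
                classification_name=MY_TAG_CATEGORY,
                include_tags=self.source_config.includeTags,
            ),
        )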

View File

@ -16,10 +16,6 @@ from typing import Iterable, List, Optional
from packaging import version
from metadata.generated.schema.api.classification.createClassification import (
CreateClassificationRequest,
)
from metadata.generated.schema.api.classification.createTag import CreateTagRequest
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
@ -42,10 +38,11 @@ from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.utils import fqn, tag_utils
from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart
from metadata.utils.helpers import clean_uri, get_standard_chart_type
from metadata.utils.logger import ingestion_logger
from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels
logger = ingestion_logger()
@ -94,27 +91,13 @@ class RedashSource(DashboardServiceSource):
"""
Fetch Dashboard Tags
"""
if self.source_config.includeTags:
for tag in self.tags:
try:
classification = OMetaTagAndClassification(
classification_request=CreateClassificationRequest(
name=REDASH_TAG_CATEGORY,
description="Tags associates with redash entities",
),
tag_request=CreateTagRequest(
classification=REDASH_TAG_CATEGORY,
name=tag,
description="Redash Tag",
),
)
yield classification
logger.info(
f"Classification {REDASH_TAG_CATEGORY}, Tag {tag} Ingested"
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error ingesting tag {tag}: {exc}")
yield from get_ometa_tag_and_classification(
tags=self.tags,
classification_name=REDASH_TAG_CATEGORY,
tag_description="Redash Tag",
classification_desciption="Tags associated with redash entities",
include_tags=self.source_config.includeTags,
)
def get_dashboards_list(self) -> Optional[List[dict]]:
"""
@ -192,7 +175,7 @@ class RedashSource(DashboardServiceSource):
],
service=self.context.dashboard_service.fullyQualifiedName.__root__,
dashboardUrl=self.get_dashboard_url(dashboard_details),
tags=tag_utils.get_tag_labels(
tags=get_tag_labels(
metadata=self.metadata,
tags=dashboard_details.get("tags"),
classification_name=REDASH_TAG_CATEGORY,

View File

@ -14,10 +14,6 @@ Tableau source module
import traceback
from typing import Iterable, List, Optional, Set
from metadata.generated.schema.api.classification.createClassification import (
CreateClassificationRequest,
)
from metadata.generated.schema.api.classification.createTag import CreateTagRequest
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.data.createDashboardDataModel import (
@ -56,10 +52,11 @@ from metadata.ingestion.source.dashboard.tableau.models import (
UpstreamTable,
)
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.utils import fqn, tag_utils
from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart, filter_by_datamodel
from metadata.utils.helpers import clean_uri, get_standard_chart_type
from metadata.utils.logger import ingestion_logger
from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels
logger = ingestion_logger()
@ -172,27 +169,13 @@ class TableauSource(DashboardServiceSource):
"""
Fetch Dashboard Tags
"""
if self.source_config.includeTags:
for tag in self.tags:
try:
classification = OMetaTagAndClassification(
classification_request=CreateClassificationRequest(
name=TABLEAU_TAG_CATEGORY,
description="Tags associates with tableau entities",
),
tag_request=CreateTagRequest(
classification=TABLEAU_TAG_CATEGORY,
name=tag.label,
description="Tableau Tag",
),
)
yield classification
logger.info(
f"Classification {TABLEAU_TAG_CATEGORY}, Tag {tag} Ingested"
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(f"Error ingesting tag [{tag}]: {err}")
yield from get_ometa_tag_and_classification(
tags=[tag.label for tag in self.tags],
classification_name=TABLEAU_TAG_CATEGORY,
tag_description="Tableau Tag",
classification_desciption="Tags associated with tableau entities",
include_tags=self.source_config.includeTags,
)
def yield_datamodel(
self, dashboard_details: TableauDashboard
@ -263,7 +246,7 @@ class TableauSource(DashboardServiceSource):
)
for data_model in self.context.dataModels or []
],
tags=tag_utils.get_tag_labels(
tags=get_tag_labels(
metadata=self.metadata,
tags=[tag.label for tag in dashboard_details.tags],
classification_name=TABLEAU_TAG_CATEGORY,
@ -368,7 +351,7 @@ class TableauSource(DashboardServiceSource):
displayName=chart.name,
chartType=get_standard_chart_type(chart.sheetType),
chartUrl=chart_url,
tags=tag_utils.get_tag_labels(
tags=get_tag_labels(
metadata=self.metadata,
tags=[tag.label for tag in chart.tags],
classification_name=TABLEAU_TAG_CATEGORY,

View File

@ -25,14 +25,9 @@ from sqlalchemy.types import String
from sqlalchemy_bigquery import BigQueryDialect, _types
from sqlalchemy_bigquery._types import _get_sqla_column_type
from metadata.generated.schema.api.classification.createClassification import (
CreateClassificationRequest,
)
from metadata.generated.schema.api.classification.createTag import CreateTagRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.entity.classification.tag import Tag
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import (
IntervalType,
@ -56,12 +51,7 @@ from metadata.generated.schema.security.credentials.gcpValues import (
MultipleProjectId,
SingleProjectId,
)
from metadata.generated.schema.type.tagLabel import (
LabelType,
State,
TagLabel,
TagSource,
)
from metadata.generated.schema.type.tagLabel import TagLabel
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.source.connections import get_connection
@ -74,6 +64,11 @@ from metadata.utils import fqn
from metadata.utils.filters import filter_by_database
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqlalchemy_utils import is_complex_type
from metadata.utils.tag_utils import (
get_ometa_tag_and_classification,
get_tag_label,
get_tag_labels,
)
class BQJSON(String):
@ -214,16 +209,11 @@ class BigquerySource(CommonDbSourceService):
dataset_obj = self.client.get_dataset(schema_name)
if dataset_obj.labels:
for key, value in dataset_obj.labels.items():
yield OMetaTagAndClassification(
classification_request=CreateClassificationRequest(
name=key,
description="",
),
tag_request=CreateTagRequest(
classification=key,
name=value,
description="Bigquery Dataset Label",
),
yield from get_ometa_tag_and_classification(
tags=[value],
classification_name=key,
tag_description="Bigquery Dataset Label",
classification_desciption="",
)
# Fetching policy tags on the column level
list_project_ids = [self.context.database.name.__root__]
@ -238,18 +228,12 @@ class BigquerySource(CommonDbSourceService):
policy_tags = PolicyTagManagerClient().list_policy_tags(
parent=taxonomy.name
)
for tag in policy_tags:
yield OMetaTagAndClassification(
classification_request=CreateClassificationRequest(
name=taxonomy.display_name,
description="",
),
tag_request=CreateTagRequest(
classification=taxonomy.display_name,
name=tag.display_name,
description="Bigquery Policy Tag",
),
)
yield from get_ometa_tag_and_classification(
tags=[tag.display_name for tag in policy_tags],
classification_name=taxonomy.display_name,
tag_description="Bigquery Policy Tag",
classification_desciption="",
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Skipping Policy Tag: {exc}")
@ -292,16 +276,10 @@ class BigquerySource(CommonDbSourceService):
database_schema_request_obj.tags = []
for label_classification, label_tag_name in dataset_obj.labels.items():
database_schema_request_obj.tags.append(
TagLabel(
tagFQN=fqn.build(
self.metadata,
entity_type=Tag,
classification_name=label_classification,
tag_name=label_tag_name,
),
labelType=LabelType.Automated.value,
state=State.Suggested.value,
source=TagSource.Classification.value,
get_tag_label(
metadata=self.metadata,
tag_name=label_tag_name,
classification_name=label_classification,
)
)
yield database_schema_request_obj
@ -320,20 +298,13 @@ class BigquerySource(CommonDbSourceService):
This will only get executed if the tags context
is properly informed
"""
if self.source_config.includeTags and column.get("policy_tags"):
return [
TagLabel(
tagFQN=fqn.build(
self.metadata,
entity_type=Tag,
classification_name=column["taxonomy"],
tag_name=column["policy_tags"],
),
labelType=LabelType.Automated.value,
state=State.Suggested.value,
source=TagSource.Classification.value,
)
]
if column.get("policy_tags"):
return get_tag_labels(
metadata=self.metadata,
tags=[column["policy_tags"]],
classification_name=column["taxonomy"],
include_tags=self.source_config.includeTags,
)
return None
def set_inspector(self, database_name: str):

View File

@ -23,7 +23,6 @@ from metadata.generated.schema.api.data.createDatabaseSchema import (
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.classification.tag import Tag
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import (
@ -42,12 +41,7 @@ from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.tagLabel import (
LabelType,
State,
TagLabel,
TagSource,
)
from metadata.generated.schema.type.tagLabel import TagLabel
from metadata.ingestion.api.source import Source
from metadata.ingestion.api.topology_runner import TopologyRunnerMixin
from metadata.ingestion.models.delete_entity import delete_entity_from_source
@ -63,6 +57,7 @@ from metadata.ingestion.source.connections import get_test_connection_fn
from metadata.utils import fqn
from metadata.utils.filters import filter_by_schema
from metadata.utils.logger import ingestion_logger
from metadata.utils.tag_utils import get_tag_label
logger = ingestion_logger()
@ -270,21 +265,18 @@ class DatabaseServiceSource(
Pick up the tags registered in the context
searching by entity FQN
"""
return [
TagLabel(
tagFQN=fqn.build(
self.metadata,
entity_type=Tag,
classification_name=tag_and_category.classification_request.name.__root__,
tag_labels = []
for tag_and_category in self.context.tags or []:
if tag_and_category.fqn.__root__ == entity_fqn:
tag_label = get_tag_label(
metadata=self.metadata,
tag_name=tag_and_category.tag_request.name.__root__,
),
labelType=LabelType.Automated,
state=State.Suggested,
source=TagSource.Classification,
)
for tag_and_category in self.context.tags or []
if tag_and_category.fqn.__root__ == entity_fqn
] or None
classification_name=tag_and_category.classification_request.name.__root__,
)
if tag_label:
tag_labels.append(tag_label)
return tag_labels or None
def get_tag_labels(self, table_name: str) -> Optional[List[TagLabel]]:
"""

View File

@ -15,10 +15,6 @@ import traceback
from enum import Enum
from typing import Iterable, List, Optional, Union
from metadata.generated.schema.api.classification.createClassification import (
CreateClassificationRequest,
)
from metadata.generated.schema.api.classification.createTag import CreateTagRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest
from metadata.generated.schema.api.tests.createTestDefinition import (
@ -67,9 +63,10 @@ from metadata.ingestion.source.database.dbt.dbt_service import (
DbtObjects,
DbtServiceSource,
)
from metadata.utils import entity_link, fqn, tag_utils
from metadata.utils import entity_link, fqn
from metadata.utils.elasticsearch import get_entity_from_es_result
from metadata.utils.logger import ingestion_logger
from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels
logger = ingestion_logger()
@ -332,18 +329,15 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
)
for tag_name in dbt_tags_list
]
for tag_label in dbt_tag_labels or []:
yield OMetaTagAndClassification(
classification_request=CreateClassificationRequest(
name=self.tag_classification_name,
description="dbt classification",
),
tag_request=CreateTagRequest(
classification=self.tag_classification_name,
name=tag_label.split(fqn.FQN_SEPARATOR)[1],
description="dbt Tags",
),
)
yield from get_ometa_tag_and_classification(
tags=[
tag_label.split(fqn.FQN_SEPARATOR)[1]
for tag_label in dbt_tag_labels
],
classification_name=self.tag_classification_name,
tag_description="dbt Tags",
classification_desciption="dbt classification",
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Unexpected exception creating DBT tags: {exc}")
@ -421,7 +415,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
dbt_table_tags_list = None
if manifest_node.tags:
dbt_table_tags_list = tag_utils.get_tag_labels(
dbt_table_tags_list = get_tag_labels(
metadata=self.metadata,
tags=manifest_node.tags,
classification_name=self.tag_classification_name,
@ -592,7 +586,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
ordinalPosition=catalog_column.index
if catalog_column
else None,
tags=tag_utils.get_tag_labels(
tags=get_tag_labels(
metadata=self.metadata,
tags=manifest_column.tags,
classification_name=self.tag_classification_name,

View File

@ -19,10 +19,6 @@ from sqlalchemy import sql
from sqlalchemy.dialects.postgresql.base import PGDialect, ischema_names
from sqlalchemy.engine.reflection import Inspector
from metadata.generated.schema.api.classification.createClassification import (
CreateClassificationRequest,
)
from metadata.generated.schema.api.classification.createTag import CreateTagRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import (
IntervalType,
@ -64,6 +60,7 @@ from metadata.utils.sqlalchemy_utils import (
get_all_table_comments,
get_all_view_definitions,
)
from metadata.utils.tag_utils import get_ometa_tag_and_classification
TableKey = namedtuple("TableKey", ["schema", "table_name"])
@ -214,23 +211,17 @@ class PostgresSource(CommonDbSourceService):
schema_name=schema_name,
)
).all()
for res in result:
row = list(res)
fqn_elements = [name for name in row[2:] if name]
yield OMetaTagAndClassification(
fqn=fqn._build( # pylint: disable=protected-access
yield from get_ometa_tag_and_classification(
tag_fqn=fqn._build( # pylint: disable=protected-access
self.context.database_service.name.__root__, *fqn_elements
),
classification_request=CreateClassificationRequest(
name=self.service_connection.classificationName,
description="Postgres Tag Name",
),
tag_request=CreateTagRequest(
classification=self.service_connection.classificationName,
name=row[1],
description="Postgres Tag Value",
),
tags=[row[1]],
classification_name=self.service_connection.classificationName,
tag_description="Postgres Tag Value",
classification_desciption="Postgres Tag Name",
)
except Exception as exc:
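In the Postgres source above (and the Snowflake source below) the helper additionally receives a tag_fqn, so every yielded OMetaTagAndClassification carries the FQN of the table it was read from; the rewritten context-matching code in database_service.py then only turns those entries into labels for the entity with that FQN. A rough sketch of that linkage, with placeholder service, schema, table, and tag names:

from metadata.utils import fqn
from metadata.utils.tag_utils import get_ometa_tag_and_classification


def yield_table_tags(self):
    # Placeholder FQN parts; in the source above they come from the query result row.
    table_fqn = fqn._build(  # pylint: disable=protected-access
        "my_postgres_service", "mydb", "public", "orders"
    )
    # Each yielded object keeps the table FQN so it can be matched back later.
    yield from get_ometa_tag_and_classification(
        tag_fqn=table_fqn,
        tags=["pii"],
        classification_name=self.service_connection.classificationName,
        tag_description="Postgres Tag Value",
        classification_desciption="Postgres Tag Name",
    )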

View File

@ -21,10 +21,6 @@ from snowflake.sqlalchemy.snowdialect import SnowflakeDialect, ischema_names
from sqlalchemy.engine.reflection import Inspector
from sqlparse.sql import Function, Identifier
from metadata.generated.schema.api.classification.createClassification import (
CreateClassificationRequest,
)
from metadata.generated.schema.api.classification.createTag import CreateTagRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import (
IntervalType,
@ -69,6 +65,7 @@ from metadata.utils import fqn
from metadata.utils.filters import filter_by_database
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqlalchemy_utils import get_all_table_comments
from metadata.utils.tag_utils import get_ometa_tag_and_classification
GEOGRAPHY = create_sqlalchemy_type("GEOGRAPHY")
GEOMETRY = create_sqlalchemy_type("GEOMETRY")
@ -296,19 +293,14 @@ class SnowflakeSource(CommonDbSourceService):
for res in result:
row = list(res)
fqn_elements = [name for name in row[2:] if name]
yield OMetaTagAndClassification(
fqn=fqn._build( # pylint: disable=protected-access
yield from get_ometa_tag_and_classification(
tag_fqn=fqn._build( # pylint: disable=protected-access
self.context.database_service.name.__root__, *fqn_elements
),
classification_request=CreateClassificationRequest(
name=row[0],
description="SNOWFLAKE TAG NAME",
),
tag_request=CreateTagRequest(
classification=row[0],
name=row[1],
description="SNOWFLAKE TAG VALUE",
),
tags=[row[1]],
classification_name=row[0],
tag_description="SNOWFLAKE TAG VALUE",
classification_desciption="SNOWFLAKE TAG NAME",
)
def query_table_names_and_types(

View File

@ -20,10 +20,6 @@ from pydantic import SecretStr
from sqlalchemy.engine.url import make_url
from metadata.config.common import ConfigModel
from metadata.generated.schema.api.classification.createClassification import (
CreateClassificationRequest,
)
from metadata.generated.schema.api.classification.createTag import CreateTagRequest
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
@ -36,7 +32,6 @@ from metadata.generated.schema.api.services.createDatabaseService import (
)
from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
from metadata.generated.schema.entity.classification.tag import Tag
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Column, Table
@ -56,10 +51,8 @@ from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.tagLabel import TagLabel
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, Source
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.user import OMetaUserProfile
from metadata.ingestion.ometa.client_utils import get_chart_entities_from_id
from metadata.ingestion.ometa.ometa_api import OpenMetadata
@ -74,6 +67,7 @@ from metadata.utils import fqn
from metadata.utils.helpers import get_standard_chart_type
from metadata.utils.logger import ingestion_logger
from metadata.utils.metadata_service_helper import SERVICE_TYPE_MAPPER
from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels
logger = ingestion_logger()
@ -225,22 +219,6 @@ class AmundsenSource(Source[Entity]):
logger.debug(traceback.format_exc())
logger.error(f"Failed to create user entity [{user}]: {exc}")
def create_tags(self, tags):
for tag in tags:
classification = OMetaTagAndClassification(
classification_request=CreateClassificationRequest(
name=AMUNDSEN_TAG_CATEGORY,
description="Tags associates with amundsen entities",
),
tag_request=CreateTagRequest(
classification=AMUNDSEN_TAG_CATEGORY,
name=tag,
description="Amundsen Table Tag",
),
)
yield classification
logger.info(f"Classification {classification}, Primary Tag {tag} Ingested")
def _yield_create_database(self, table):
try:
service_entity = self.get_database_service(table["database"])
@ -327,81 +305,31 @@ class AmundsenSource(Source[Entity]):
parsed_string["description"] = description
col = Column(**parsed_string)
columns.append(col)
amundsen_table_tag = OMetaTagAndClassification(
classification_request=CreateClassificationRequest(
name=AMUNDSEN_TAG_CATEGORY,
description="Tags associates with amundsen entities",
),
tag_request=CreateTagRequest(
classification=AMUNDSEN_TAG_CATEGORY,
name=AMUNDSEN_TABLE_TAG,
description="Amundsen Table Tag",
),
)
yield amundsen_table_tag
amundsen_cluster_tag = OMetaTagAndClassification(
classification_request=CreateClassificationRequest(
name=AMUNDSEN_TAG_CATEGORY,
description="Tags associates with amundsen entities",
),
tag_request=CreateTagRequest(
classification=AMUNDSEN_TAG_CATEGORY,
name=table["cluster"],
description="Amundsen Cluster Tag",
),
)
yield amundsen_cluster_tag
tags = [
TagLabel(
tagFQN=fqn.build(
self.metadata,
Tag,
classification_name=AMUNDSEN_TAG_CATEGORY,
tag_name=AMUNDSEN_TABLE_TAG,
),
labelType="Automated",
state="Suggested",
source="Classification",
),
TagLabel(
tagFQN=fqn.build(
self.metadata,
Tag,
classification_name=AMUNDSEN_TAG_CATEGORY,
tag_name=table["cluster"],
),
labelType="Automated",
state="Suggested",
source="Classification",
),
]
# We are creating a couple of custom tags
tags = [AMUNDSEN_TABLE_TAG, table["cluster"]]
if table["tags"]:
yield from self.create_tags(table["tags"])
tags.extend(
[
TagLabel(
tagFQN=fqn.build(
self.metadata,
Tag,
classification_name=AMUNDSEN_TAG_CATEGORY,
tag_name=tag,
),
labelType="Automated",
state="Suggested",
source="Classification",
)
for tag in table["tags"]
]
)
tags.extend(table["tags"])
yield from get_ometa_tag_and_classification(
tags=tags,
classification_name=AMUNDSEN_TAG_CATEGORY,
tag_description="Amundsen Table Tag",
classification_desciption="Tags associated with amundsen entities",
)
table_request = CreateTableRequest(
name=table["name"],
tableType="Regular",
description=table["description"],
description=table.get("description"),
databaseSchema=self.database_schema_object.fullyQualifiedName,
tags=tags,
tags=get_tag_labels(
metadata=self.metadata,
tags=tags,
classification_name=AMUNDSEN_TAG_CATEGORY,
include_tags=True,
),
columns=columns,
)
yield table_request
self.status.scanned(table["name"])

View File

@ -17,10 +17,6 @@ import traceback
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List
from metadata.generated.schema.api.classification.createClassification import (
CreateClassificationRequest,
)
from metadata.generated.schema.api.classification.createTag import CreateTagRequest
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.services.createDatabaseService import (
@ -29,7 +25,6 @@ from metadata.generated.schema.api.services.createDatabaseService import (
from metadata.generated.schema.api.services.createMessagingService import (
CreateMessagingServiceRequest,
)
from metadata.generated.schema.entity.classification.tag import Tag
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.pipeline import Pipeline
@ -48,14 +43,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tagLabel import (
LabelType,
State,
TagLabel,
TagSource,
)
from metadata.ingestion.api.source import InvalidSourceException, Source
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
@ -63,6 +51,7 @@ from metadata.ingestion.source.metadata.atlas.client import AtlasClient
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
from metadata.utils.metadata_service_helper import SERVICE_TYPE_MAPPER
from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels
logger = ingestion_logger()
@ -244,7 +233,12 @@ class AtlasSource(Source):
force=True,
)
yield self.create_tag()
yield from get_ometa_tag_and_classification(
tags=[ATLAS_TABLE_TAG],
classification_name=ATLAS_TAG_CATEGORY,
tag_description="Atlas Cluster Tag",
classification_desciption="Tags associated with atlas entities",
)
table_fqn = fqn.build(
metadata=self.metadata,
@ -279,50 +273,42 @@ class AtlasSource(Source):
f"Failed to parse for database : {db_entity} - table {table}: {exc}"
)
def get_tag_label(self, tag_name: str) -> TagLabel:
return TagLabel(
tagFQN=fqn.build(
self.metadata,
Tag,
classification_name=ATLAS_TAG_CATEGORY,
tag_name=tag_name,
),
labelType=LabelType.Automated,
state=State.Suggested.value,
source=TagSource.Classification,
)
def apply_table_tags(self, table_object: Table, table_entity: dict):
# apply default atlas table tag
self.metadata.patch_tag(
entity=Table,
source=table_object,
tag_label=self.get_tag_label(ATLAS_TABLE_TAG),
"""
apply default atlas table tag
"""
tag_labels = []
table_tags = get_tag_labels(
metadata=self.metadata,
tags=[ATLAS_TABLE_TAG],
classification_name=ATLAS_TAG_CATEGORY,
)
if table_tags:
tag_labels.extend(table_tags)
# apply classification tags
for tag in table_entity.get("classifications", []):
if tag and tag.get("typeName"):
yield self.create_tag(tag.get("typeName"))
self.metadata.patch_tag(
entity=Table,
source=table_object,
tag_label=self.get_tag_label(tag.get("typeName")),
yield from get_ometa_tag_and_classification(
tags=[tag.get("typeName", ATLAS_TABLE_TAG)],
classification_name=ATLAS_TAG_CATEGORY,
tag_description="Atlas Cluster Tag",
classification_desciption="Tags associated with atlas entities",
)
classification_tags = get_tag_labels(
metadata=self.metadata,
tags=[tag.get("typeName", ATLAS_TABLE_TAG)],
classification_name=ATLAS_TAG_CATEGORY,
)
if classification_tags:
tag_labels.extend(classification_tags)
def create_tag(self, tag_name: str = ATLAS_TABLE_TAG) -> OMetaTagAndClassification:
atlas_table_tag = OMetaTagAndClassification(
classification_request=CreateClassificationRequest(
name=ATLAS_TAG_CATEGORY,
description="Tags associates with atlas entities",
),
tag_request=CreateTagRequest(
classification=ATLAS_TAG_CATEGORY,
name=tag_name,
description="Atlas Cluster Tag",
),
)
return atlas_table_tag
for tag_label in tag_labels:
self.metadata.patch_tag(
entity=Table,
source=table_object,
tag_label=tag_label,
)
def _parse_table_columns(self, table_response, tbl_entity, name) -> List[Column]:
om_cols = []

View File

@ -14,10 +14,6 @@ Dagster source to extract metadata from OM UI
import traceback
from typing import Dict, Iterable, List, Optional
from metadata.generated.schema.api.classification.createClassification import (
CreateClassificationRequest,
)
from metadata.generated.schema.api.classification.createTag import CreateTagRequest
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import (
@ -44,8 +40,8 @@ from metadata.ingestion.source.pipeline.dagster.queries import (
GRAPHQL_RUNS_QUERY,
)
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils import tag_utils
from metadata.utils.logger import ingestion_logger
from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels
logger = ingestion_logger()
@ -131,7 +127,7 @@ class DagsterSource(PipelineServiceSource):
description=pipeline_details.get("description", ""),
tasks=task_list,
service=self.context.pipeline_service.fullyQualifiedName.__root__,
tags=tag_utils.get_tag_labels(
tags=get_tag_labels(
metadata=self.metadata,
tags=[self.context.repository_name],
classification_name=DAGSTER_TAG_CATEGORY,
@ -142,26 +138,13 @@ class DagsterSource(PipelineServiceSource):
self.register_record(pipeline_request=pipeline_request)
def yield_tag(self, *_, **__) -> OMetaTagAndClassification:
if self.source_config.includeTags:
try:
classification = OMetaTagAndClassification(
classification_request=CreateClassificationRequest(
name=DAGSTER_TAG_CATEGORY,
description="Tags associated with dagster",
),
tag_request=CreateTagRequest(
classification=DAGSTER_TAG_CATEGORY,
name=self.context.repository_name,
description="Dagster Tag",
),
)
yield classification
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(
f"Error ingesting tag [{self.context.repository_name}]: {err}"
)
yield from get_ometa_tag_and_classification(
tags=[self.context.repository_name],
classification_name=DAGSTER_TAG_CATEGORY,
tag_description="Dagster Tag",
classification_desciption="Tags associated with dagster entities",
include_tags=self.source_config.includeTags,
)
def get_task_runs(self, job_id, pipeline_name):
"""

View File

@ -16,15 +16,21 @@ Tag utils Module
import functools
import traceback
from typing import List, Optional
from typing import Iterable, List, Optional
from metadata.generated.schema.api.classification.createClassification import (
CreateClassificationRequest,
)
from metadata.generated.schema.api.classification.createTag import CreateTagRequest
from metadata.generated.schema.entity.classification.tag import Tag
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.generated.schema.type.tagLabel import (
LabelType,
State,
TagLabel,
TagSource,
)
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
@ -32,20 +38,69 @@ from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
def get_ometa_tag_and_classification(
tags: List[str],
classification_name: str,
tag_description: Optional[str],
classification_desciption: Optional[str],
include_tags: bool = True,
tag_fqn: Optional[FullyQualifiedEntityName] = None,
) -> Optional[Iterable[OMetaTagAndClassification]]:
"""
Returns the OMetaTagAndClassification object
"""
if include_tags:
for tag in tags:
try:
classification = OMetaTagAndClassification(
fqn=tag_fqn,
classification_request=CreateClassificationRequest(
name=classification_name,
description=classification_desciption,
),
tag_request=CreateTagRequest(
classification=classification_name,
name=tag,
description=tag_description,
),
)
yield classification
logger.debug(
f"Classification {classification_name}, Tag {tag} Ingested"
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(f"Error yielding tag-{tag}: {err}")
@functools.lru_cache(maxsize=512)
def _get_tag_label(metadata: OpenMetadata, tag_fqn: str) -> Optional[TagLabel]:
def get_tag_label(
metadata: OpenMetadata, tag_name: str, classification_name: str
) -> Optional[TagLabel]:
"""
Returns the tag label if the tag is created
"""
# Check if the tag exists
tag = metadata.get_by_name(entity=Tag, fqn=tag_fqn)
if tag:
return TagLabel(
tagFQN=tag_fqn,
labelType=LabelType.Automated.value,
state=State.Suggested.value,
source=TagSource.Classification.value,
try:
# Build the tag FQN
tag_fqn = fqn.build(
metadata,
Tag,
classification_name=classification_name,
tag_name=tag_name,
)
# Check if the tag exists
tag = metadata.get_by_name(entity=Tag, fqn=tag_fqn)
if tag:
return TagLabel(
tagFQN=tag_fqn,
labelType=LabelType.Automated.value,
state=State.Suggested.value,
source=TagSource.Classification.value,
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(f"Error processing tag label: {err}")
return None
@ -62,13 +117,9 @@ def get_tag_labels(
if tags and include_tags:
for tag in tags:
try:
tag_fqn = fqn.build(
metadata,
Tag,
classification_name=classification_name,
tag_name=tag,
tag_label = get_tag_label(
metadata, tag_name=tag, classification_name=classification_name
)
tag_label = _get_tag_label(metadata, tag_fqn=tag_fqn)
if tag_label:
tag_labels_list.append(tag_label)
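The reworked helpers can also be exercised on their own, which is how the unit tests further down drive them (they patch get_tag_label and hand get_tag_labels a MagicMock client). A hedged sketch along the same lines; the tag names, and the assumption that a truthy get_by_name result is enough for get_tag_label to emit a label, are illustrative:

from unittest.mock import MagicMock

from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels

# get_ometa_tag_and_classification needs no client: it yields one
# OMetaTagAndClassification per tag name when include_tags is True.
requests = list(
    get_ometa_tag_and_classification(
        tags=["tier1", "tier2"],
        classification_name="MyTags",
        tag_description="Example tag",
        classification_desciption="Example classification",
        include_tags=True,
    )
)
assert len(requests) == 2

# get_tag_label only returns a TagLabel when metadata.get_by_name finds the tag,
# so a truthy mock return value is assumed to stand in for an existing Tag entity.
mocked_metadata = MagicMock()
labels = get_tag_labels(
    metadata=mocked_metadata,
    tags=["tier1"],
    classification_name="MyTags",
    include_tags=True,
)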

View File

@ -25,8 +25,8 @@ from metadata.generated.schema.type.tagLabel import (
)
from metadata.ingestion.source.database.database_service import DataModelLink
from metadata.ingestion.source.database.dbt.metadata import DbtSource
from metadata.utils import tag_utils
from metadata.utils.dbt_config import DbtFiles, DbtObjects
from metadata.utils.tag_utils import get_tag_labels
mock_dbt_config = {
"source": {
@ -296,11 +296,11 @@ class DbtUnitTest(TestCase):
@patch("metadata.ingestion.source.database.dbt.metadata.DbtSource.get_dbt_owner")
@patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn")
@patch("metadata.utils.tag_utils._get_tag_label")
def test_dbt_manifest_v8(self, _get_tag_label, es_search_from_fqn, get_dbt_owner):
@patch("metadata.utils.tag_utils.get_tag_label")
def test_dbt_manifest_v8(self, get_tag_label, es_search_from_fqn, get_dbt_owner):
get_dbt_owner.return_value = MOCK_OWNER
es_search_from_fqn.return_value = MOCK_TABLE_ENTITIES
_get_tag_label.side_effect = [
get_tag_label.side_effect = [
TagLabel(
tagFQN="dbtTags.model_tag_one",
labelType=LabelType.Automated.value,
@ -339,9 +339,9 @@ class DbtUnitTest(TestCase):
self.assertIsNone(self.dbt_source_obj.get_corrected_name(name="null"))
self.assertIsNotNone(self.dbt_source_obj.get_corrected_name(name="dev"))
@patch("metadata.utils.tag_utils._get_tag_label")
def test_dbt_get_dbt_tag_labels(self, _get_tag_label):
_get_tag_label.side_effect = [
@patch("metadata.utils.tag_utils.get_tag_label")
def test_dbt_get_dbt_tag_labels(self, get_tag_label):
get_tag_label.side_effect = [
TagLabel(
tagFQN="dbtTags.tag1",
labelType=LabelType.Automated.value,
@ -363,7 +363,7 @@ class DbtUnitTest(TestCase):
]
mocked_metadata = MagicMock()
result = tag_utils.get_tag_labels(
result = get_tag_labels(
metadata=mocked_metadata,
classification_name="dbtTags",
tags=["tag1", "tag2.name", "tag3"],

View File

@ -464,11 +464,11 @@ class AtlasUnitTest(TestCase):
@patch.object(AtlasClient, "list_entities", mock_list_entities)
@patch.object(AtlasClient, "get_entity", mock_get_entity)
@patch.object(AtlasSource, "ingest_lineage", mock_ingest_lineage)
@patch.object(AtlasSource, "create_tag", mock_create_tag)
def test_description(self):
"""
Testing description updated for database, databaseSchema, table
"""
self.mock_create_tag()
_ = list(self.atlas_source.next_record())
updated_database = self.metadata.get_by_name(
entity=Database, fqn="hive.Reporting"

View File

@ -275,11 +275,11 @@ class DagsterUnitTest(TestCase):
)
@patch("metadata.ingestion.source.pipeline.dagster.metadata.DagsterSource.get_jobs")
@patch("metadata.utils.tag_utils._get_tag_label")
def test_yield_pipeline(self, _get_tag_label, get_jobs):
@patch("metadata.utils.tag_utils.get_tag_label")
def test_yield_pipeline(self, get_tag_label, get_jobs):
results = self.dagster.yield_pipeline(EXPECTED_DAGSTER_DETAILS)
get_jobs.return_value = EXPECTED_DAGSTER_DETAILS
_get_tag_label.return_value = TagLabel(
get_tag_label.return_value = TagLabel(
tagFQN="DagsterTags.hacker_new_repository",
labelType=LabelType.Automated.value,
state=State.Suggested.value,