diff --git a/ingestion/src/metadata/ingestion/source/dashboard/redash/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/redash/metadata.py index acce49d55a3..7de7184a7eb 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/redash/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/redash/metadata.py @@ -16,10 +16,6 @@ from typing import Iterable, List, Optional from packaging import version -from metadata.generated.schema.api.classification.createClassification import ( - CreateClassificationRequest, -) -from metadata.generated.schema.api.classification.createTag import CreateTagRequest from metadata.generated.schema.api.data.createChart import CreateChartRequest from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest @@ -42,10 +38,11 @@ from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.lineage.parser import LineageParser from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource -from metadata.utils import fqn, tag_utils +from metadata.utils import fqn from metadata.utils.filters import filter_by_chart from metadata.utils.helpers import clean_uri, get_standard_chart_type from metadata.utils.logger import ingestion_logger +from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels logger = ingestion_logger() @@ -94,27 +91,13 @@ class RedashSource(DashboardServiceSource): """ Fetch Dashboard Tags """ - if self.source_config.includeTags: - for tag in self.tags: - try: - classification = OMetaTagAndClassification( - classification_request=CreateClassificationRequest( - name=REDASH_TAG_CATEGORY, - description="Tags associates with redash entities", - ), - tag_request=CreateTagRequest( - classification=REDASH_TAG_CATEGORY, - name=tag, - description="Redash Tag", - ), - ) - yield classification - logger.info( - f"Classification {REDASH_TAG_CATEGORY}, Tag {tag} Ingested" - ) - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning(f"Error ingesting tag {tag}: {exc}") + yield from get_ometa_tag_and_classification( + tags=self.tags, + classification_name=REDASH_TAG_CATEGORY, + tag_description="Redash Tag", + classification_desciption="Tags associated with redash entities", + include_tags=self.source_config.includeTags, + ) def get_dashboards_list(self) -> Optional[List[dict]]: """ @@ -192,7 +175,7 @@ class RedashSource(DashboardServiceSource): ], service=self.context.dashboard_service.fullyQualifiedName.__root__, dashboardUrl=self.get_dashboard_url(dashboard_details), - tags=tag_utils.get_tag_labels( + tags=get_tag_labels( metadata=self.metadata, tags=dashboard_details.get("tags"), classification_name=REDASH_TAG_CATEGORY, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py index fa30b25fe43..bc2f6cf3ed0 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py @@ -14,10 +14,6 @@ Tableau source module import traceback from typing import Iterable, List, Optional, Set -from metadata.generated.schema.api.classification.createClassification import ( - CreateClassificationRequest, -) -from metadata.generated.schema.api.classification.createTag import CreateTagRequest from metadata.generated.schema.api.data.createChart import CreateChartRequest from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.api.data.createDashboardDataModel import ( @@ -56,10 +52,11 @@ from metadata.ingestion.source.dashboard.tableau.models import ( UpstreamTable, ) from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser -from metadata.utils import fqn, tag_utils +from metadata.utils import fqn from metadata.utils.filters import filter_by_chart, filter_by_datamodel from metadata.utils.helpers import clean_uri, get_standard_chart_type from metadata.utils.logger import ingestion_logger +from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels logger = ingestion_logger() @@ -172,27 +169,13 @@ class TableauSource(DashboardServiceSource): """ Fetch Dashboard Tags """ - if self.source_config.includeTags: - for tag in self.tags: - try: - classification = OMetaTagAndClassification( - classification_request=CreateClassificationRequest( - name=TABLEAU_TAG_CATEGORY, - description="Tags associates with tableau entities", - ), - tag_request=CreateTagRequest( - classification=TABLEAU_TAG_CATEGORY, - name=tag.label, - description="Tableau Tag", - ), - ) - yield classification - logger.info( - f"Classification {TABLEAU_TAG_CATEGORY}, Tag {tag} Ingested" - ) - except Exception as err: - logger.debug(traceback.format_exc()) - logger.error(f"Error ingesting tag [{tag}]: {err}") + yield from get_ometa_tag_and_classification( + tags=[tag.label for tag in self.tags], + classification_name=TABLEAU_TAG_CATEGORY, + tag_description="Tableau Tag", + classification_desciption="Tags associated with tableau entities", + include_tags=self.source_config.includeTags, + ) def yield_datamodel( self, dashboard_details: TableauDashboard @@ -263,7 +246,7 @@ class TableauSource(DashboardServiceSource): ) for data_model in self.context.dataModels or [] ], - tags=tag_utils.get_tag_labels( + tags=get_tag_labels( metadata=self.metadata, tags=[tag.label for tag in dashboard_details.tags], classification_name=TABLEAU_TAG_CATEGORY, @@ -368,7 +351,7 @@ class TableauSource(DashboardServiceSource): displayName=chart.name, chartType=get_standard_chart_type(chart.sheetType), chartUrl=chart_url, - tags=tag_utils.get_tag_labels( + tags=get_tag_labels( metadata=self.metadata, tags=[tag.label for tag in chart.tags], classification_name=TABLEAU_TAG_CATEGORY, diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py index ca001de8410..d62413c0847 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py @@ -25,14 +25,9 @@ from sqlalchemy.types import String from sqlalchemy_bigquery import BigQueryDialect, _types from sqlalchemy_bigquery._types import _get_sqla_column_type -from metadata.generated.schema.api.classification.createClassification import ( - CreateClassificationRequest, -) -from metadata.generated.schema.api.classification.createTag import CreateTagRequest from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) -from metadata.generated.schema.entity.classification.tag import Tag from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.table import ( IntervalType, @@ -56,12 +51,7 @@ from metadata.generated.schema.security.credentials.gcpValues import ( MultipleProjectId, SingleProjectId, ) -from metadata.generated.schema.type.tagLabel import ( - LabelType, - State, - TagLabel, - TagSource, -) +from metadata.generated.schema.type.tagLabel import TagLabel from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.source.connections import get_connection @@ -74,6 +64,11 @@ from metadata.utils import fqn from metadata.utils.filters import filter_by_database from metadata.utils.logger import ingestion_logger from metadata.utils.sqlalchemy_utils import is_complex_type +from metadata.utils.tag_utils import ( + get_ometa_tag_and_classification, + get_tag_label, + get_tag_labels, +) class BQJSON(String): @@ -214,16 +209,11 @@ class BigquerySource(CommonDbSourceService): dataset_obj = self.client.get_dataset(schema_name) if dataset_obj.labels: for key, value in dataset_obj.labels.items(): - yield OMetaTagAndClassification( - classification_request=CreateClassificationRequest( - name=key, - description="", - ), - tag_request=CreateTagRequest( - classification=key, - name=value, - description="Bigquery Dataset Label", - ), + yield from get_ometa_tag_and_classification( + tags=[value], + classification_name=key, + tag_description="Bigquery Dataset Label", + classification_desciption="", ) # Fetching policy tags on the column level list_project_ids = [self.context.database.name.__root__] @@ -238,18 +228,12 @@ class BigquerySource(CommonDbSourceService): policy_tags = PolicyTagManagerClient().list_policy_tags( parent=taxonomy.name ) - for tag in policy_tags: - yield OMetaTagAndClassification( - classification_request=CreateClassificationRequest( - name=taxonomy.display_name, - description="", - ), - tag_request=CreateTagRequest( - classification=taxonomy.display_name, - name=tag.display_name, - description="Bigquery Policy Tag", - ), - ) + yield from get_ometa_tag_and_classification( + tags=[tag.display_name for tag in policy_tags], + classification_name=taxonomy.display_name, + tag_description="Bigquery Policy Tag", + classification_desciption="", + ) except Exception as exc: logger.debug(traceback.format_exc()) logger.warning(f"Skipping Policy Tag: {exc}") @@ -292,16 +276,10 @@ class BigquerySource(CommonDbSourceService): database_schema_request_obj.tags = [] for label_classification, label_tag_name in dataset_obj.labels.items(): database_schema_request_obj.tags.append( - TagLabel( - tagFQN=fqn.build( - self.metadata, - entity_type=Tag, - classification_name=label_classification, - tag_name=label_tag_name, - ), - labelType=LabelType.Automated.value, - state=State.Suggested.value, - source=TagSource.Classification.value, + get_tag_label( + metadata=self.metadata, + tag_name=label_tag_name, + classification_name=label_classification, ) ) yield database_schema_request_obj @@ -320,20 +298,13 @@ class BigquerySource(CommonDbSourceService): This will only get executed if the tags context is properly informed """ - if self.source_config.includeTags and column.get("policy_tags"): - return [ - TagLabel( - tagFQN=fqn.build( - self.metadata, - entity_type=Tag, - classification_name=column["taxonomy"], - tag_name=column["policy_tags"], - ), - labelType=LabelType.Automated.value, - state=State.Suggested.value, - source=TagSource.Classification.value, - ) - ] + if column.get("policy_tags"): + return get_tag_labels( + metadata=self.metadata, + tags=[column["policy_tags"]], + classification_name=column["taxonomy"], + include_tags=self.source_config.includeTags, + ) return None def set_inspector(self, database_name: str): diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 86ee789d3c1..e204db009d9 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -23,7 +23,6 @@ from metadata.generated.schema.api.data.createDatabaseSchema import ( ) from metadata.generated.schema.api.data.createTable import CreateTableRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest -from metadata.generated.schema.entity.classification.tag import Tag from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import ( @@ -42,12 +41,7 @@ from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) -from metadata.generated.schema.type.tagLabel import ( - LabelType, - State, - TagLabel, - TagSource, -) +from metadata.generated.schema.type.tagLabel import TagLabel from metadata.ingestion.api.source import Source from metadata.ingestion.api.topology_runner import TopologyRunnerMixin from metadata.ingestion.models.delete_entity import delete_entity_from_source @@ -63,6 +57,7 @@ from metadata.ingestion.source.connections import get_test_connection_fn from metadata.utils import fqn from metadata.utils.filters import filter_by_schema from metadata.utils.logger import ingestion_logger +from metadata.utils.tag_utils import get_tag_label logger = ingestion_logger() @@ -270,21 +265,18 @@ class DatabaseServiceSource( Pick up the tags registered in the context searching by entity FQN """ - return [ - TagLabel( - tagFQN=fqn.build( - self.metadata, - entity_type=Tag, - classification_name=tag_and_category.classification_request.name.__root__, + + tag_labels = [] + for tag_and_category in self.context.tags or []: + if tag_and_category.fqn.__root__ == entity_fqn: + tag_label = get_tag_label( + metadata=self.metadata, tag_name=tag_and_category.tag_request.name.__root__, - ), - labelType=LabelType.Automated, - state=State.Suggested, - source=TagSource.Classification, - ) - for tag_and_category in self.context.tags or [] - if tag_and_category.fqn.__root__ == entity_fqn - ] or None + classification_name=tag_and_category.classification_request.name.__root__, + ) + if tag_label: + tag_labels.append(tag_label) + return tag_labels or None def get_tag_labels(self, table_name: str) -> Optional[List[TagLabel]]: """ diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py index a89142cef50..07cf7048549 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py @@ -15,10 +15,6 @@ import traceback from enum import Enum from typing import Iterable, List, Optional, Union -from metadata.generated.schema.api.classification.createClassification import ( - CreateClassificationRequest, -) -from metadata.generated.schema.api.classification.createTag import CreateTagRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest from metadata.generated.schema.api.tests.createTestDefinition import ( @@ -67,9 +63,10 @@ from metadata.ingestion.source.database.dbt.dbt_service import ( DbtObjects, DbtServiceSource, ) -from metadata.utils import entity_link, fqn, tag_utils +from metadata.utils import entity_link, fqn from metadata.utils.elasticsearch import get_entity_from_es_result from metadata.utils.logger import ingestion_logger +from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels logger = ingestion_logger() @@ -332,18 +329,15 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods ) for tag_name in dbt_tags_list ] - for tag_label in dbt_tag_labels or []: - yield OMetaTagAndClassification( - classification_request=CreateClassificationRequest( - name=self.tag_classification_name, - description="dbt classification", - ), - tag_request=CreateTagRequest( - classification=self.tag_classification_name, - name=tag_label.split(fqn.FQN_SEPARATOR)[1], - description="dbt Tags", - ), - ) + yield from get_ometa_tag_and_classification( + tags=[ + tag_label.split(fqn.FQN_SEPARATOR)[1] + for tag_label in dbt_tag_labels + ], + classification_name=self.tag_classification_name, + tag_description="dbt Tags", + classification_desciption="dbt classification", + ) except Exception as exc: logger.debug(traceback.format_exc()) logger.warning(f"Unexpected exception creating DBT tags: {exc}") @@ -421,7 +415,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods dbt_table_tags_list = None if manifest_node.tags: - dbt_table_tags_list = tag_utils.get_tag_labels( + dbt_table_tags_list = get_tag_labels( metadata=self.metadata, tags=manifest_node.tags, classification_name=self.tag_classification_name, @@ -592,7 +586,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods ordinalPosition=catalog_column.index if catalog_column else None, - tags=tag_utils.get_tag_labels( + tags=get_tag_labels( metadata=self.metadata, tags=manifest_column.tags, classification_name=self.tag_classification_name, diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py index fc6bb8041c8..a670d7e2db2 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py @@ -19,10 +19,6 @@ from sqlalchemy import sql from sqlalchemy.dialects.postgresql.base import PGDialect, ischema_names from sqlalchemy.engine.reflection import Inspector -from metadata.generated.schema.api.classification.createClassification import ( - CreateClassificationRequest, -) -from metadata.generated.schema.api.classification.createTag import CreateTagRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.table import ( IntervalType, @@ -64,6 +60,7 @@ from metadata.utils.sqlalchemy_utils import ( get_all_table_comments, get_all_view_definitions, ) +from metadata.utils.tag_utils import get_ometa_tag_and_classification TableKey = namedtuple("TableKey", ["schema", "table_name"]) @@ -214,23 +211,17 @@ class PostgresSource(CommonDbSourceService): schema_name=schema_name, ) ).all() - for res in result: row = list(res) fqn_elements = [name for name in row[2:] if name] - yield OMetaTagAndClassification( - fqn=fqn._build( # pylint: disable=protected-access + yield from get_ometa_tag_and_classification( + tag_fqn=fqn._build( # pylint: disable=protected-access self.context.database_service.name.__root__, *fqn_elements ), - classification_request=CreateClassificationRequest( - name=self.service_connection.classificationName, - description="Postgres Tag Name", - ), - tag_request=CreateTagRequest( - classification=self.service_connection.classificationName, - name=row[1], - description="Postgres Tag Value", - ), + tags=[row[1]], + classification_name=self.service_connection.classificationName, + tag_description="Postgres Tag Value", + classification_desciption="Postgres Tag Name", ) except Exception as exc: diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py index 6d9582487a2..461c7f693a2 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py @@ -21,10 +21,6 @@ from snowflake.sqlalchemy.snowdialect import SnowflakeDialect, ischema_names from sqlalchemy.engine.reflection import Inspector from sqlparse.sql import Function, Identifier -from metadata.generated.schema.api.classification.createClassification import ( - CreateClassificationRequest, -) -from metadata.generated.schema.api.classification.createTag import CreateTagRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.table import ( IntervalType, @@ -69,6 +65,7 @@ from metadata.utils import fqn from metadata.utils.filters import filter_by_database from metadata.utils.logger import ingestion_logger from metadata.utils.sqlalchemy_utils import get_all_table_comments +from metadata.utils.tag_utils import get_ometa_tag_and_classification GEOGRAPHY = create_sqlalchemy_type("GEOGRAPHY") GEOMETRY = create_sqlalchemy_type("GEOMETRY") @@ -296,19 +293,14 @@ class SnowflakeSource(CommonDbSourceService): for res in result: row = list(res) fqn_elements = [name for name in row[2:] if name] - yield OMetaTagAndClassification( - fqn=fqn._build( # pylint: disable=protected-access + yield from get_ometa_tag_and_classification( + tag_fqn=fqn._build( # pylint: disable=protected-access self.context.database_service.name.__root__, *fqn_elements ), - classification_request=CreateClassificationRequest( - name=row[0], - description="SNOWFLAKE TAG NAME", - ), - tag_request=CreateTagRequest( - classification=row[0], - name=row[1], - description="SNOWFLAKE TAG VALUE", - ), + tags=[row[1]], + classification_name=row[0], + tag_description="SNOWFLAKE TAG VALUE", + classification_desciption="SNOWFLAKE TAG NAME", ) def query_table_names_and_types( diff --git a/ingestion/src/metadata/ingestion/source/metadata/amundsen/metadata.py b/ingestion/src/metadata/ingestion/source/metadata/amundsen/metadata.py index 0b609185c22..19838ab95cf 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/amundsen/metadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata/amundsen/metadata.py @@ -20,10 +20,6 @@ from pydantic import SecretStr from sqlalchemy.engine.url import make_url from metadata.config.common import ConfigModel -from metadata.generated.schema.api.classification.createClassification import ( - CreateClassificationRequest, -) -from metadata.generated.schema.api.classification.createTag import CreateTagRequest from metadata.generated.schema.api.data.createChart import CreateChartRequest from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest @@ -36,7 +32,6 @@ from metadata.generated.schema.api.services.createDatabaseService import ( ) from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest from metadata.generated.schema.api.teams.createUser import CreateUserRequest -from metadata.generated.schema.entity.classification.tag import Tag from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import Column, Table @@ -56,10 +51,8 @@ from metadata.generated.schema.entity.teams.user import User from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) -from metadata.generated.schema.type.tagLabel import TagLabel from metadata.ingestion.api.common import Entity from metadata.ingestion.api.source import InvalidSourceException, Source -from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.models.user import OMetaUserProfile from metadata.ingestion.ometa.client_utils import get_chart_entities_from_id from metadata.ingestion.ometa.ometa_api import OpenMetadata @@ -74,6 +67,7 @@ from metadata.utils import fqn from metadata.utils.helpers import get_standard_chart_type from metadata.utils.logger import ingestion_logger from metadata.utils.metadata_service_helper import SERVICE_TYPE_MAPPER +from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels logger = ingestion_logger() @@ -225,22 +219,6 @@ class AmundsenSource(Source[Entity]): logger.debug(traceback.format_exc()) logger.error(f"Failed to create user entity [{user}]: {exc}") - def create_tags(self, tags): - for tag in tags: - classification = OMetaTagAndClassification( - classification_request=CreateClassificationRequest( - name=AMUNDSEN_TAG_CATEGORY, - description="Tags associates with amundsen entities", - ), - tag_request=CreateTagRequest( - classification=AMUNDSEN_TAG_CATEGORY, - name=tag, - description="Amundsen Table Tag", - ), - ) - yield classification - logger.info(f"Classification {classification}, Primary Tag {tag} Ingested") - def _yield_create_database(self, table): try: service_entity = self.get_database_service(table["database"]) @@ -327,81 +305,31 @@ class AmundsenSource(Source[Entity]): parsed_string["description"] = description col = Column(**parsed_string) columns.append(col) - amundsen_table_tag = OMetaTagAndClassification( - classification_request=CreateClassificationRequest( - name=AMUNDSEN_TAG_CATEGORY, - description="Tags associates with amundsen entities", - ), - tag_request=CreateTagRequest( - classification=AMUNDSEN_TAG_CATEGORY, - name=AMUNDSEN_TABLE_TAG, - description="Amundsen Table Tag", - ), - ) - yield amundsen_table_tag - amundsen_cluster_tag = OMetaTagAndClassification( - classification_request=CreateClassificationRequest( - name=AMUNDSEN_TAG_CATEGORY, - description="Tags associates with amundsen entities", - ), - tag_request=CreateTagRequest( - classification=AMUNDSEN_TAG_CATEGORY, - name=table["cluster"], - description="Amundsen Cluster Tag", - ), - ) - yield amundsen_cluster_tag - tags = [ - TagLabel( - tagFQN=fqn.build( - self.metadata, - Tag, - classification_name=AMUNDSEN_TAG_CATEGORY, - tag_name=AMUNDSEN_TABLE_TAG, - ), - labelType="Automated", - state="Suggested", - source="Classification", - ), - TagLabel( - tagFQN=fqn.build( - self.metadata, - Tag, - classification_name=AMUNDSEN_TAG_CATEGORY, - tag_name=table["cluster"], - ), - labelType="Automated", - state="Suggested", - source="Classification", - ), - ] + + # We are creating a couple of custom tags + tags = [AMUNDSEN_TABLE_TAG, table["cluster"]] if table["tags"]: - yield from self.create_tags(table["tags"]) - tags.extend( - [ - TagLabel( - tagFQN=fqn.build( - self.metadata, - Tag, - classification_name=AMUNDSEN_TAG_CATEGORY, - tag_name=tag, - ), - labelType="Automated", - state="Suggested", - source="Classification", - ) - for tag in table["tags"] - ] - ) + tags.extend(table["tags"]) + yield from get_ometa_tag_and_classification( + tags=tags, + classification_name=AMUNDSEN_TAG_CATEGORY, + tag_description="Amundsen Table Tag", + classification_desciption="Tags associated with amundsen entities", + ) + table_request = CreateTableRequest( name=table["name"], tableType="Regular", - description=table["description"], + description=table.get("description"), databaseSchema=self.database_schema_object.fullyQualifiedName, - tags=tags, + tags=get_tag_labels( + metadata=self.metadata, + tags=tags, + classification_name=AMUNDSEN_TAG_CATEGORY, + include_tags=True, + ), columns=columns, ) - yield table_request self.status.scanned(table["name"]) diff --git a/ingestion/src/metadata/ingestion/source/metadata/atlas/metadata.py b/ingestion/src/metadata/ingestion/source/metadata/atlas/metadata.py index 09747a2e468..8a02288187a 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/atlas/metadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata/atlas/metadata.py @@ -17,10 +17,6 @@ import traceback from dataclasses import dataclass from typing import Any, Dict, Iterable, List -from metadata.generated.schema.api.classification.createClassification import ( - CreateClassificationRequest, -) -from metadata.generated.schema.api.classification.createTag import CreateTagRequest from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.services.createDatabaseService import ( @@ -29,7 +25,6 @@ from metadata.generated.schema.api.services.createDatabaseService import ( from metadata.generated.schema.api.services.createMessagingService import ( CreateMessagingServiceRequest, ) -from metadata.generated.schema.entity.classification.tag import Tag from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.pipeline import Pipeline @@ -48,14 +43,7 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference -from metadata.generated.schema.type.tagLabel import ( - LabelType, - State, - TagLabel, - TagSource, -) from metadata.ingestion.api.source import InvalidSourceException, Source -from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, get_test_connection_fn from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser @@ -63,6 +51,7 @@ from metadata.ingestion.source.metadata.atlas.client import AtlasClient from metadata.utils import fqn from metadata.utils.logger import ingestion_logger from metadata.utils.metadata_service_helper import SERVICE_TYPE_MAPPER +from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels logger = ingestion_logger() @@ -244,7 +233,12 @@ class AtlasSource(Source): force=True, ) - yield self.create_tag() + yield from get_ometa_tag_and_classification( + tags=[ATLAS_TABLE_TAG], + classification_name=ATLAS_TAG_CATEGORY, + tag_description="Atlas Cluster Tag", + classification_desciption="Tags associated with atlas entities", + ) table_fqn = fqn.build( metadata=self.metadata, @@ -279,50 +273,42 @@ class AtlasSource(Source): f"Failed to parse for database : {db_entity} - table {table}: {exc}" ) - def get_tag_label(self, tag_name: str) -> TagLabel: - return TagLabel( - tagFQN=fqn.build( - self.metadata, - Tag, - classification_name=ATLAS_TAG_CATEGORY, - tag_name=tag_name, - ), - labelType=LabelType.Automated, - state=State.Suggested.value, - source=TagSource.Classification, - ) - def apply_table_tags(self, table_object: Table, table_entity: dict): - # apply default atlas table tag - self.metadata.patch_tag( - entity=Table, - source=table_object, - tag_label=self.get_tag_label(ATLAS_TABLE_TAG), + """ + apply default atlas table tag + """ + tag_labels = [] + table_tags = get_tag_labels( + metadata=self.metadata, + tags=[ATLAS_TABLE_TAG], + classification_name=ATLAS_TAG_CATEGORY, ) + if table_tags: + tag_labels.extend(table_tags) # apply classification tags for tag in table_entity.get("classifications", []): if tag and tag.get("typeName"): - yield self.create_tag(tag.get("typeName")) - self.metadata.patch_tag( - entity=Table, - source=table_object, - tag_label=self.get_tag_label(tag.get("typeName")), + yield from get_ometa_tag_and_classification( + tags=[tag.get("typeName", ATLAS_TABLE_TAG)], + classification_name=ATLAS_TAG_CATEGORY, + tag_description="Atlas Cluster Tag", + classification_desciption="Tags associated with atlas entities", ) + classification_tags = get_tag_labels( + metadata=self.metadata, + tags=[tag.get("typeName", ATLAS_TABLE_TAG)], + classification_name=ATLAS_TAG_CATEGORY, + ) + if classification_tags: + tag_labels.extend(classification_tags) - def create_tag(self, tag_name: str = ATLAS_TABLE_TAG) -> OMetaTagAndClassification: - atlas_table_tag = OMetaTagAndClassification( - classification_request=CreateClassificationRequest( - name=ATLAS_TAG_CATEGORY, - description="Tags associates with atlas entities", - ), - tag_request=CreateTagRequest( - classification=ATLAS_TAG_CATEGORY, - name=tag_name, - description="Atlas Cluster Tag", - ), - ) - return atlas_table_tag + for tag_label in tag_labels: + self.metadata.patch_tag( + entity=Table, + source=table_object, + tag_label=tag_label, + ) def _parse_table_columns(self, table_response, tbl_entity, name) -> List[Column]: om_cols = [] diff --git a/ingestion/src/metadata/ingestion/source/pipeline/dagster/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/dagster/metadata.py index 43e671eddc1..5c54d1509ab 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/dagster/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/dagster/metadata.py @@ -14,10 +14,6 @@ Dagster source to extract metadata from OM UI import traceback from typing import Dict, Iterable, List, Optional -from metadata.generated.schema.api.classification.createClassification import ( - CreateClassificationRequest, -) -from metadata.generated.schema.api.classification.createTag import CreateTagRequest from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.pipeline import ( @@ -44,8 +40,8 @@ from metadata.ingestion.source.pipeline.dagster.queries import ( GRAPHQL_RUNS_QUERY, ) from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource -from metadata.utils import tag_utils from metadata.utils.logger import ingestion_logger +from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels logger = ingestion_logger() @@ -131,7 +127,7 @@ class DagsterSource(PipelineServiceSource): description=pipeline_details.get("description", ""), tasks=task_list, service=self.context.pipeline_service.fullyQualifiedName.__root__, - tags=tag_utils.get_tag_labels( + tags=get_tag_labels( metadata=self.metadata, tags=[self.context.repository_name], classification_name=DAGSTER_TAG_CATEGORY, @@ -142,26 +138,13 @@ class DagsterSource(PipelineServiceSource): self.register_record(pipeline_request=pipeline_request) def yield_tag(self, *_, **__) -> OMetaTagAndClassification: - if self.source_config.includeTags: - try: - classification = OMetaTagAndClassification( - classification_request=CreateClassificationRequest( - name=DAGSTER_TAG_CATEGORY, - description="Tags associated with dagster", - ), - tag_request=CreateTagRequest( - classification=DAGSTER_TAG_CATEGORY, - name=self.context.repository_name, - description="Dagster Tag", - ), - ) - - yield classification - except Exception as err: - logger.debug(traceback.format_exc()) - logger.error( - f"Error ingesting tag [{self.context.repository_name}]: {err}" - ) + yield from get_ometa_tag_and_classification( + tags=[self.context.repository_name], + classification_name=DAGSTER_TAG_CATEGORY, + tag_description="Dagster Tag", + classification_desciption="Tags associated with dagster entities", + include_tags=self.source_config.includeTags, + ) def get_task_runs(self, job_id, pipeline_name): """ diff --git a/ingestion/src/metadata/utils/tag_utils.py b/ingestion/src/metadata/utils/tag_utils.py index ba65a136a9d..c63b41156c5 100644 --- a/ingestion/src/metadata/utils/tag_utils.py +++ b/ingestion/src/metadata/utils/tag_utils.py @@ -16,15 +16,21 @@ Tag utils Module import functools import traceback -from typing import List, Optional +from typing import Iterable, List, Optional +from metadata.generated.schema.api.classification.createClassification import ( + CreateClassificationRequest, +) +from metadata.generated.schema.api.classification.createTag import CreateTagRequest from metadata.generated.schema.entity.classification.tag import Tag +from metadata.generated.schema.type.basic import FullyQualifiedEntityName from metadata.generated.schema.type.tagLabel import ( LabelType, State, TagLabel, TagSource, ) +from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils import fqn from metadata.utils.logger import ingestion_logger @@ -32,20 +38,69 @@ from metadata.utils.logger import ingestion_logger logger = ingestion_logger() +def get_ometa_tag_and_classification( + tags: List[str], + classification_name: str, + tag_description: Optional[str], + classification_desciption: Optional[str], + include_tags: bool = True, + tag_fqn: Optional[FullyQualifiedEntityName] = None, +) -> Optional[Iterable[OMetaTagAndClassification]]: + """ + Returns the OMetaTagAndClassification object + """ + if include_tags: + for tag in tags: + try: + classification = OMetaTagAndClassification( + fqn=tag_fqn, + classification_request=CreateClassificationRequest( + name=classification_name, + description=classification_desciption, + ), + tag_request=CreateTagRequest( + classification=classification_name, + name=tag, + description=tag_description, + ), + ) + yield classification + logger.debug( + f"Classification {classification_name}, Tag {tag} Ingested" + ) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error(f"Error yielding tag-{tag}: {err}") + + @functools.lru_cache(maxsize=512) -def _get_tag_label(metadata: OpenMetadata, tag_fqn: str) -> Optional[TagLabel]: +def get_tag_label( + metadata: OpenMetadata, tag_name: str, classification_name: str +) -> Optional[TagLabel]: """ Returns the tag label if the tag is created """ - # Check if the tag exists - tag = metadata.get_by_name(entity=Tag, fqn=tag_fqn) - if tag: - return TagLabel( - tagFQN=tag_fqn, - labelType=LabelType.Automated.value, - state=State.Suggested.value, - source=TagSource.Classification.value, + try: + # Build the tag FQN + tag_fqn = fqn.build( + metadata, + Tag, + classification_name=classification_name, + tag_name=tag_name, ) + + # Check if the tag exists + tag = metadata.get_by_name(entity=Tag, fqn=tag_fqn) + if tag: + return TagLabel( + tagFQN=tag_fqn, + labelType=LabelType.Automated.value, + state=State.Suggested.value, + source=TagSource.Classification.value, + ) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error(f"Error processing tag label: {err}") return None @@ -62,13 +117,9 @@ def get_tag_labels( if tags and include_tags: for tag in tags: try: - tag_fqn = fqn.build( - metadata, - Tag, - classification_name=classification_name, - tag_name=tag, + tag_label = get_tag_label( + metadata, tag_name=tag, classification_name=classification_name ) - tag_label = _get_tag_label(metadata, tag_fqn=tag_fqn) if tag_label: tag_labels_list.append(tag_label) diff --git a/ingestion/tests/unit/test_dbt.py b/ingestion/tests/unit/test_dbt.py index 9e8cfe4249a..08ad4640e45 100644 --- a/ingestion/tests/unit/test_dbt.py +++ b/ingestion/tests/unit/test_dbt.py @@ -25,8 +25,8 @@ from metadata.generated.schema.type.tagLabel import ( ) from metadata.ingestion.source.database.database_service import DataModelLink from metadata.ingestion.source.database.dbt.metadata import DbtSource -from metadata.utils import tag_utils from metadata.utils.dbt_config import DbtFiles, DbtObjects +from metadata.utils.tag_utils import get_tag_labels mock_dbt_config = { "source": { @@ -296,11 +296,11 @@ class DbtUnitTest(TestCase): @patch("metadata.ingestion.source.database.dbt.metadata.DbtSource.get_dbt_owner") @patch("metadata.ingestion.ometa.mixins.es_mixin.ESMixin.es_search_from_fqn") - @patch("metadata.utils.tag_utils._get_tag_label") - def test_dbt_manifest_v8(self, _get_tag_label, es_search_from_fqn, get_dbt_owner): + @patch("metadata.utils.tag_utils.get_tag_label") + def test_dbt_manifest_v8(self, get_tag_label, es_search_from_fqn, get_dbt_owner): get_dbt_owner.return_value = MOCK_OWNER es_search_from_fqn.return_value = MOCK_TABLE_ENTITIES - _get_tag_label.side_effect = [ + get_tag_label.side_effect = [ TagLabel( tagFQN="dbtTags.model_tag_one", labelType=LabelType.Automated.value, @@ -339,9 +339,9 @@ class DbtUnitTest(TestCase): self.assertIsNone(self.dbt_source_obj.get_corrected_name(name="null")) self.assertIsNotNone(self.dbt_source_obj.get_corrected_name(name="dev")) - @patch("metadata.utils.tag_utils._get_tag_label") - def test_dbt_get_dbt_tag_labels(self, _get_tag_label): - _get_tag_label.side_effect = [ + @patch("metadata.utils.tag_utils.get_tag_label") + def test_dbt_get_dbt_tag_labels(self, get_tag_label): + get_tag_label.side_effect = [ TagLabel( tagFQN="dbtTags.tag1", labelType=LabelType.Automated.value, @@ -363,7 +363,7 @@ class DbtUnitTest(TestCase): ] mocked_metadata = MagicMock() - result = tag_utils.get_tag_labels( + result = get_tag_labels( metadata=mocked_metadata, classification_name="dbtTags", tags=["tag1", "tag2.name", "tag3"], diff --git a/ingestion/tests/unit/topology/metadata/test_atlas.py b/ingestion/tests/unit/topology/metadata/test_atlas.py index efb83a3326c..4cb14a02893 100644 --- a/ingestion/tests/unit/topology/metadata/test_atlas.py +++ b/ingestion/tests/unit/topology/metadata/test_atlas.py @@ -464,11 +464,11 @@ class AtlasUnitTest(TestCase): @patch.object(AtlasClient, "list_entities", mock_list_entities) @patch.object(AtlasClient, "get_entity", mock_get_entity) @patch.object(AtlasSource, "ingest_lineage", mock_ingest_lineage) - @patch.object(AtlasSource, "create_tag", mock_create_tag) def test_description(self): """ Testing description updated for database, databaseSchema, table """ + self.mock_create_tag() _ = list(self.atlas_source.next_record()) updated_database = self.metadata.get_by_name( entity=Database, fqn="hive.Reporting" diff --git a/ingestion/tests/unit/topology/pipeline/test_dagster.py b/ingestion/tests/unit/topology/pipeline/test_dagster.py index f9238e5cdb4..bbb21891f23 100644 --- a/ingestion/tests/unit/topology/pipeline/test_dagster.py +++ b/ingestion/tests/unit/topology/pipeline/test_dagster.py @@ -275,11 +275,11 @@ class DagsterUnitTest(TestCase): ) @patch("metadata.ingestion.source.pipeline.dagster.metadata.DagsterSource.get_jobs") - @patch("metadata.utils.tag_utils._get_tag_label") - def test_yield_pipeline(self, _get_tag_label, get_jobs): + @patch("metadata.utils.tag_utils.get_tag_label") + def test_yield_pipeline(self, get_tag_label, get_jobs): results = self.dagster.yield_pipeline(EXPECTED_DAGSTER_DETAILS) get_jobs.return_value = EXPECTED_DAGSTER_DETAILS - _get_tag_label.return_value = TagLabel( + get_tag_label.return_value = TagLabel( tagFQN="DagsterTags.hacker_new_repository", labelType=LabelType.Automated.value, state=State.Suggested.value,