diff --git a/ingestion/src/metadata/ingestion/source/metadata/amundsen.py b/ingestion/src/metadata/ingestion/source/metadata/amundsen.py index e5efce380f8..efc6656bb59 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/amundsen.py +++ b/ingestion/src/metadata/ingestion/source/metadata/amundsen.py @@ -17,8 +17,9 @@ from typing import Iterable, List, Optional from pydantic import SecretStr from metadata.config.common import ConfigModel -from metadata.generated.schema.api.services.createDatabaseService import ( - CreateDatabaseServiceRequest, +from metadata.generated.schema.api.tags.createTag import CreateTagRequest +from metadata.generated.schema.api.tags.createTagCategory import ( + CreateTagCategoryRequest, ) from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest from metadata.generated.schema.api.teams.createUser import CreateUserRequest @@ -38,21 +39,25 @@ from metadata.generated.schema.entity.services.databaseService import ( DatabaseService, DatabaseServiceType, ) +from metadata.generated.schema.entity.tags.tagCategory import Tag from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.generated.schema.type.entityReference import EntityReference +from metadata.generated.schema.type.tagLabel import TagLabel from metadata.ingestion.api.common import Entity from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable +from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory from metadata.ingestion.models.table_metadata import Chart, Dashboard from metadata.ingestion.models.user import OMetaUserProfile +from metadata.ingestion.ometa.client import APIError from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.ingestion.source.neo4j_helper import Neo4JConfig, Neo4jHelper from metadata.utils import fqn from metadata.utils.column_type_parser import ColumnTypeParser from metadata.utils.helpers import get_dashboard_service_or_create from metadata.utils.logger import ingestion_logger +from metadata.utils.neo4j_helper import Neo4JConfig, Neo4jHelper from metadata.utils.sql_queries import ( NEO4J_AMUNDSEN_DASHBOARD_QUERY, NEO4J_AMUNDSEN_TABLE_QUERY, @@ -72,6 +77,8 @@ class AmundsenConfig(ConfigModel): PRIMITIVE_TYPES = ["int", "char", "varchar"] +AMUNDSEN_TAG_CATEGORY = "AmundsenTags" +AMUNDSEN_TABLE_TAG = "amundsen_table" @dataclass @@ -93,8 +100,8 @@ class AmundsenSource(Source[Entity]): def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): self.config = config self.metadata_config = metadata_config - self.metadata = OpenMetadata(self.metadata_config) self.service_connection = config.serviceConnection.__root__.config + self.metadata = OpenMetadata(self.metadata_config) neo4j_config = Neo4JConfig( username=self.service_connection.username, @@ -156,15 +163,30 @@ class AmundsenSource(Source[Entity]): except Exception as err: logger.error(err) + def create_tags(self, tags): + for tag in tags: + tag_category = OMetaTagAndCategory( + category_name=CreateTagCategoryRequest( + name=AMUNDSEN_TAG_CATEGORY, + description="Tags associates with amundsen entities", + categoryType="Descriptive", + ), + category_details=CreateTagRequest( + name=tag, description="Amundsen Table Tag" + ), + ) + yield tag_category + logger.info(f"Tag Category {tag_category}, Primary Tag {tag} Ingested") + def create_table_entity(self, table): try: - service_name = table["cluster"] - service_type = table["database"] + service_name = table["database"] + service_type = self.database_service_map.get( + service_name.lower(), DatabaseServiceType.Mysql.value + ) # TODO: use metadata.get_service_or_create - service_entity = self.get_database_service_or_create( - service_name, service_type - ) + service_entity = self.get_database_service(service_name) database = Database( id=uuid.uuid4(), name="default", @@ -188,23 +210,79 @@ class AmundsenSource(Source[Entity]): parsed_string = ColumnTypeParser._parse_datatype_string(data_type) parsed_string["name"] = name parsed_string["dataLength"] = 1 + parsed_string["description"] = description col = Column(**parsed_string) columns.append(col) - - table_fqn = fqn.build( - self.metadata, - entity_type=Table, - service_name=service_name, - database_name=database.name.__root__, - schema_name=database_schema.name.__root__, - table_name=table["name"], + amundsen_table_tag = OMetaTagAndCategory( + category_name=CreateTagCategoryRequest( + name=AMUNDSEN_TAG_CATEGORY, + description="Tags associates with amundsen entities", + categoryType="Descriptive", + ), + category_details=CreateTagRequest( + name=AMUNDSEN_TABLE_TAG, description="Amundsen Table Tag" + ), ) + yield amundsen_table_tag + amundsen_cluster_tag = OMetaTagAndCategory( + category_name=CreateTagCategoryRequest( + name=AMUNDSEN_TAG_CATEGORY, + description="Tags associates with amundsen entities", + categoryType="Descriptive", + ), + category_details=CreateTagRequest( + name=table["cluster"], description="Amundsen Cluster Tag" + ), + ) + yield amundsen_cluster_tag + tags = [ + TagLabel( + tagFQN=fqn.build( + self.metadata, + Tag, + tag_category_name=AMUNDSEN_TAG_CATEGORY, + tag_name=AMUNDSEN_TABLE_TAG, + ), + labelType="Automated", + state="Suggested", + source="Tag", + ), + TagLabel( + tagFQN=fqn.build( + self.metadata, + Tag, + tag_category_name=AMUNDSEN_TAG_CATEGORY, + tag_name=table["cluster"], + ), + labelType="Automated", + state="Suggested", + source="Tag", + ), + ] + if table["tags"]: + yield from self.create_tags(table["tags"]) + tags.extend( + [ + TagLabel( + tagFQN=fqn.build( + self.metadata, + Tag, + tag_category_name=AMUNDSEN_TAG_CATEGORY, + tag_name=tag, + ), + labelType="Automated", + state="Suggested", + source="Tag", + ) + for tag in table["tags"] + ] + ) table_entity = Table( id=uuid.uuid4(), name=table["name"], tableType="Regular", description=table["description"], - fullyQualifiedName=table_fqn, + tags=tags, columns=columns, ) @@ -225,7 +303,7 @@ class AmundsenSource(Source[Entity]): service_entity = get_dashboard_service_or_create( service_name, DashboardServiceType.Superset.name, - {}, + {"username": "test", "hostPort": "http://localhost:8088"}, self.metadata_config, ) self.status.scanned(dashboard["name"]) @@ -248,7 +326,7 @@ class AmundsenSource(Source[Entity]): service_entity = get_dashboard_service_or_create( service_name, DashboardServiceType.Superset.name, - {}, + {"username": "test", "hostPort": "http://localhost:8088"}, self.metadata_config, ) @@ -282,25 +360,12 @@ class AmundsenSource(Source[Entity]): return p_type return data_type - def get_database_service_or_create( - self, service_name: str, service_type: str - ) -> DatabaseService: + def get_database_service(self, service_name: str) -> DatabaseService: service = self.metadata.get_by_name(entity=DatabaseService, fqn=service_name) if service is not None: return service else: - service = { - "name": service_name, - "description": "", - "serviceType": self.database_service_map.get( - service_type.lower(), DatabaseServiceType.Mysql.value - ), - "connection": {"config": {}}, - } - created_service = self.metadata.create_or_update( - CreateDatabaseServiceRequest(**service) - ) - return created_service + logger.error(f"Please create a service with name {service_name}") def test_connection(self) -> None: pass