From f552960ca71af0cb60c6b89780b02ebe71207bfc Mon Sep 17 00:00:00 2001 From: NiharDoshi99 <51595473+NiharDoshi99@users.noreply.github.com> Date: Thu, 18 Aug 2022 16:18:07 +0530 Subject: [PATCH] Clean Atlas, Amundsen and metadata_rest (#6649) * Clean Atlas, Amundsen and metadata_rest * FIX: Based on comment * Added test case for coverage * Fixed: Py test * FIX: Added atlas yaml mapping file * FIX: Py Test * FIX: Revert Test * FIX: Removed as per dataset --- .../src/metadata/clients/atlas_client.py | 2 +- .../ingestion/source/metadata/amundsen.py | 78 +++++++++++------ .../ingestion/source/metadata/atlas.py | 84 +++++++++++-------- .../ingestion/source/metadata/metadata.py | 8 +- 4 files changed, 106 insertions(+), 66 deletions(-) diff --git a/ingestion/src/metadata/clients/atlas_client.py b/ingestion/src/metadata/clients/atlas_client.py index a9601634d06..7547d94a947 100644 --- a/ingestion/src/metadata/clients/atlas_client.py +++ b/ingestion/src/metadata/clients/atlas_client.py @@ -1,7 +1,7 @@ import base64 from typing import List -from metadata.generated.schema.entity.services.connections.database.atlasConnection import ( +from metadata.generated.schema.entity.services.connections.metadata.atlasConnection import ( AtlasConnection, ) from metadata.ingestion.ometa.client import REST, APIError, ClientConfig diff --git a/ingestion/src/metadata/ingestion/source/metadata/amundsen.py b/ingestion/src/metadata/ingestion/source/metadata/amundsen.py index 100793d14fc..d52d21b81ac 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/amundsen.py +++ b/ingestion/src/metadata/ingestion/source/metadata/amundsen.py @@ -10,16 +10,19 @@ # limitations under the License. import traceback -import uuid -from dataclasses import dataclass, field from typing import Iterable, List, Optional from pydantic import SecretStr -from metadata.clients.neo4j_helper import Neo4JConfig, Neo4jHelper +from metadata.clients.neo4j_client import Neo4JConfig, Neo4jHelper from metadata.config.common import ConfigModel from metadata.generated.schema.api.data.createChart import CreateChartRequest from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest +from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest +from metadata.generated.schema.api.data.createDatabaseSchema import ( + CreateDatabaseSchemaRequest, +) +from metadata.generated.schema.api.data.createTable import CreateTableRequest from metadata.generated.schema.api.tags.createTag import CreateTagRequest from metadata.generated.schema.api.tags.createTagCategory import ( CreateTagCategoryRequest, @@ -41,6 +44,7 @@ from metadata.generated.schema.entity.services.databaseService import ( DatabaseServiceType, ) from metadata.generated.schema.entity.tags.tagCategory import Tag +from metadata.generated.schema.entity.teams import team from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) @@ -93,12 +97,11 @@ SUPERSET_DEFAULT_CONFIG = { } -@dataclass class AmundsenStatus(SourceStatus): - success: List[str] = field(default_factory=list) - failures: List[str] = field(default_factory=list) - warnings: List[str] = field(default_factory=list) - filtered: List[str] = field(default_factory=list) + success: List[str] = list() + failures: List[str] = list() + warnings: List[str] = list() + filtered: List[str] = list() def scanned(self, entity_name: str) -> None: self.success.append(entity_name) @@ -168,7 +171,10 @@ class AmundsenSource(Source[Entity]): name=user["full_name"], displayName=f"{user['first_name']} {user['last_name']}", ) - team_metadata = CreateTeamRequest(name=user["team_name"]) + team_metadata = CreateTeamRequest( + name=user["team_name"], + teamType=team.TeamType.Department.value, + ) self.status.scanned(str(user_metadata.email)) yield OMetaUserProfile( user=user_metadata, @@ -195,21 +201,43 @@ class AmundsenSource(Source[Entity]): def create_table_entity(self, table): try: service_name = table["database"] - service_type = self.database_service_map.get( - service_name.lower(), DatabaseServiceType.Mysql.value - ) - # TODO: use metadata.get_service_or_create service_entity = self.get_database_service(service_name) - database = Database( - id=uuid.uuid4(), + + database_request = CreateDatabaseRequest( name="default", - service=EntityReference(id=service_entity.id, type=service_type), + service=EntityReference(id=service_entity.id, type="databaseService"), ) - database_schema = DatabaseSchema( + + yield database_request + + database_fqn = fqn.build( + self.metadata, + entity_type=Database, + service_name=service_name, + database_name=database_request.name.__root__, + ) + + database_object = self.metadata.get_by_name( + entity=Database, fqn=database_fqn + ) + + database_schema_request = CreateDatabaseSchemaRequest( name=table["schema"], - service=EntityReference(id=service_entity.id, type=service_type), - database=EntityReference(id=database.id.__root__, type="database"), + database=EntityReference(id=database_object.id, type="database"), + ) + yield database_schema_request + + database_schema_fqn = fqn.build( + self.metadata, + entity_type=DatabaseSchema, + service_name=service_name, + database_name=database_request.name.__root__, + schema_name=database_schema_request.name.__root__, + ) + + database_schema_object = self.metadata.get_by_name( + entity=DatabaseSchema, fqn=database_schema_fqn ) columns: List[Column] = [] @@ -291,20 +319,20 @@ class AmundsenSource(Source[Entity]): for tag in table["tags"] ] ) - table_entity = Table( - id=uuid.uuid4(), + table_request = CreateTableRequest( name=table["name"], tableType="Regular", description=table["description"], + databaseSchema=EntityReference( + id=database_schema_object.id, type="databaseSchema" + ), tags=tags, columns=columns, ) - table_and_db = OMetaDatabaseAndTable( - table=table_entity, database=database, database_schema=database_schema - ) + yield table_request + self.status.scanned(table["name"]) - yield table_and_db except Exception as e: logger.debug(traceback.format_exc()) logger.error(f"Failed to create table entity, due to {e}") diff --git a/ingestion/src/metadata/ingestion/source/metadata/atlas.py b/ingestion/src/metadata/ingestion/source/metadata/atlas.py index d34136a631c..7b029f74de1 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/atlas.py +++ b/ingestion/src/metadata/ingestion/source/metadata/atlas.py @@ -1,12 +1,16 @@ import traceback -import uuid -from dataclasses import dataclass, field +from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, Iterable, List import yaml from metadata.clients.atlas_client import AtlasClient +from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest +from metadata.generated.schema.api.data.createDatabaseSchema import ( + CreateDatabaseSchemaRequest, +) +from metadata.generated.schema.api.data.createTable import CreateTableRequest from metadata.generated.schema.api.data.createTopic import CreateTopicRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database @@ -36,10 +40,9 @@ from metadata.utils.logger import ingestion_logger logger = ingestion_logger() -@dataclass class AtlasSourceStatus(SourceStatus): - tables_scanned: List[str] = field(default_factory=list) - filtered: List[str] = field(default_factory=list) + tables_scanned: List[str] = list() + filtered: List[str] = list() def table_scanned(self, table: str) -> None: self.tables_scanned.append(table) @@ -160,41 +163,55 @@ class AtlasSource(Source): db_entity = tbl_entity["relationshipAttributes"][ self.service_connection.entityTypes["Table"][name]["db"] ] - db = self.get_database_entity(db_entity["displayText"]) + database_request = self.get_database_entity( + db_entity["displayText"] + ) table_name = tbl_attrs["name"] tbl_description = tbl_attrs["description"] + yield database_request + + database_fqn = fqn.build( + self.metadata, + entity_type=Database, + service_name=self.service_connection.dbService, + database_name=database_request.name.__root__, + ) + + database_object = self.metadata.get_by_name( + entity=Database, fqn=database_fqn + ) tbl_attrs = tbl_entity["attributes"] db_entity = tbl_entity["relationshipAttributes"]["db"] - fqn_obj = fqn.build( - self.metadata, - entity_type=Table, - service_name=self.config.serviceName, - database_name=db.name.__root__, - schema_name=db_entity["displayText"], - table_name=table_name, - ) - om_table_entity = Table( - id=uuid.uuid4(), - name=table_name, - description=tbl_description, - fullyQualifiedName=fqn_obj, - columns=tbl_columns, - ) - database_schema = DatabaseSchema( + + database_schema_request = CreateDatabaseSchemaRequest( name=db_entity["displayText"], - service=EntityReference( - id=self.service.id, type=self.service_connection.serviceType + database=EntityReference( + id=database_object.id, type="database" ), - database=EntityReference(id=db.id.__root__, type="database"), + ) + yield database_schema_request + + database_schema_fqn = fqn.build( + self.metadata, + entity_type=DatabaseSchema, + service_name=self.config.serviceConnection.__root__.config.dbService, + database_name=database_request.name.__root__, + schema_name=database_schema_request.name.__root__, + ) + database_schema_object = self.metadata.get_by_name( + entity=DatabaseSchema, fqn=database_schema_fqn ) - table_and_db = OMetaDatabaseAndTable( - table=om_table_entity, - database=db, - database_schema=database_schema, + table_request = CreateTableRequest( + name=table_name, + databaseSchema=EntityReference( + id=database_schema_object.id, type="databaseSchema" + ), + description=tbl_description, + columns=tbl_columns, ) - yield table_and_db + yield table_request yield from self.ingest_lineage(tbl_entity["guid"], name) @@ -233,12 +250,9 @@ class AtlasSource(Source): return om_cols def get_database_entity(self, database_name: str) -> Database: - return Database( - id=uuid.uuid4(), + return CreateDatabaseRequest( name=database_name, - service=EntityReference( - id=self.service.id, type=self.service_connection.serviceType - ), + service=EntityReference(id=self.service.id, type="databaseService"), ) def ingest_lineage(self, source_guid, name) -> Iterable[AddLineageRequest]: diff --git a/ingestion/src/metadata/ingestion/source/metadata/metadata.py b/ingestion/src/metadata/ingestion/source/metadata/metadata.py index 6706d5ddf57..a9d5e6ec96e 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/metadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata/metadata.py @@ -10,7 +10,6 @@ # limitations under the License. """Metadata source module""" -from dataclasses import dataclass, field from typing import Iterable, List from metadata.generated.schema.entity.data.dashboard import Dashboard @@ -41,12 +40,11 @@ from metadata.utils.logger import ingestion_logger logger = ingestion_logger() -@dataclass class MetadataSourceStatus(SourceStatus): - success: List[str] = field(default_factory=list) - failures: List[str] = field(default_factory=list) - warnings: List[str] = field(default_factory=list) + success: List[str] = list() + failures: List[str] = list() + warnings: List[str] = list() def scanned_entity(self, entity_class_name: str, entity_name: str) -> None: self.success.append(entity_name)