From 4ed088f55e13db2ff028d454ddf520f8bc1088b4 Mon Sep 17 00:00:00 2001
From: NiharDoshi99 <51595473+NiharDoshi99@users.noreply.github.com>
Date: Wed, 9 Nov 2022 14:21:44 +0530
Subject: [PATCH] WIP: Fixed amundsen ingestion (#8544)

---
 .../ingestion/source/metadata/amundsen.py    |  36 ++-
 .../src/metadata/utils/amundsen_helper.py    |  89 ++++++++
 .../unit/topology/metadata/test_amundsen.py  | 208 ++++++++++++++++++
 3 files changed, 326 insertions(+), 7 deletions(-)
 create mode 100644 ingestion/src/metadata/utils/amundsen_helper.py
 create mode 100644 ingestion/tests/unit/topology/metadata/test_amundsen.py

diff --git a/ingestion/src/metadata/ingestion/source/metadata/amundsen.py b/ingestion/src/metadata/ingestion/source/metadata/amundsen.py
index 1fe2a7a5656..248f89db93e 100644
--- a/ingestion/src/metadata/ingestion/source/metadata/amundsen.py
+++ b/ingestion/src/metadata/ingestion/source/metadata/amundsen.py
@@ -28,6 +28,9 @@ from metadata.generated.schema.api.data.createDatabaseSchema import (
     CreateDatabaseSchemaRequest,
 )
 from metadata.generated.schema.api.data.createTable import CreateTableRequest
+from metadata.generated.schema.api.services.createDatabaseService import (
+    CreateDatabaseServiceRequest,
+)
 from metadata.generated.schema.api.tags.createTag import CreateTagRequest
 from metadata.generated.schema.api.tags.createTagCategory import (
     CreateTagCategoryRequest,
 )
@@ -64,6 +67,7 @@ from metadata.ingestion.ometa.client_utils import get_chart_entities_from_id
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
 from metadata.utils import fqn
+from metadata.utils.amundsen_helper import SERVICE_TYPE_MAPPER
 from metadata.utils.helpers import get_standard_chart_type
 from metadata.utils.logger import ingestion_logger
 from metadata.utils.sql_queries import (
@@ -259,21 +263,24 @@ class AmundsenSource(Source[Entity]):
     def _yield_create_database(self, table):
         try:
             service_entity = self.get_database_service(table["database"])
+            table_name = ""
+            if hasattr(service_entity.connection.config, "supportsDatabase"):
+                table_name = table["cluster"]
+            else:
+                table_name = "default"
             database_request = CreateDatabaseRequest(
-                name=table["cluster"]
+                name=table_name
                 if hasattr(service_entity.connection.config, "supportsDatabase")
                 else "default",
                 service=EntityReference(id=service_entity.id, type="databaseService"),
             )
-
             yield database_request
-
             database_fqn = fqn.build(
                 self.metadata,
                 entity_type=Database,
                 service_name=table["database"],
-                database_name=table["cluster"],
+                database_name=table_name,
             )
 
             self.database_object = self.metadata.get_by_name(
@@ -285,17 +292,17 @@ class AmundsenSource(Source[Entity]):
 
     def _yield_create_database_schema(self, table):
         try:
+
             database_schema_request = CreateDatabaseSchemaRequest(
                 name=table["schema"],
                 database=EntityReference(id=self.database_object.id, type="database"),
             )
             yield database_schema_request
-
             database_schema_fqn = fqn.build(
                 self.metadata,
                 entity_type=DatabaseSchema,
                 service_name=table["database"],
-                database_name=table["cluster"],
+                database_name=self.database_object.name.__root__,
                 schema_name=database_schema_request.name.__root__,
             )
 
@@ -495,7 +502,22 @@ class AmundsenSource(Source[Entity]):
         return data_type
 
     def get_database_service(self, service_name: str) -> DatabaseService:
-        service = self.metadata.get_by_name(entity=DatabaseService, fqn=service_name)
+        """
+        Method to get and create Database Service
+        """
+        service = self.metadata.create_or_update(
+            CreateDatabaseServiceRequest(
+                name=service_name,
+                displayName=service_name,
+                connection=SERVICE_TYPE_MAPPER.get(
+                    service_name, SERVICE_TYPE_MAPPER["mysql"]
+                )["connection"],
+                serviceType=SERVICE_TYPE_MAPPER.get(
+                    service_name, SERVICE_TYPE_MAPPER["mysql"]
+                )["service_name"],
+            ),
+        )
+
         if service is not None:
             return service
         logger.error(f"Please create a service with name {service_name}")
diff --git a/ingestion/src/metadata/utils/amundsen_helper.py b/ingestion/src/metadata/utils/amundsen_helper.py
new file mode 100644
index 00000000000..a72b4b875ac
--- /dev/null
+++ b/ingestion/src/metadata/utils/amundsen_helper.py
@@ -0,0 +1,89 @@
+"""
+Amundsen helper file for service mapping
+"""
+
+SERVICE_TYPE_MAPPER = {
+    "hive": {
+        "service_name": "Hive",
+        "connection": {"config": {"hostPort": "http://nohost:6000", "type": "Hive"}},
+    },
+    "delta": {
+        "service_name": "DeltaLake",
+        "connection": {
+            "config": {
+                "metastoreConnection": {"metastoreHostPort": "http://localhost:9083"}
+            }
+        },
+    },
+    "dynamo": {
+        "service_name": "DynamoDB",
+        "connection": {
+            "config": {"awsConfig": {"awsRegion": "aws_region"}, "type": "DynamoDB"}
+        },
+    },
+    "mysql": {
+        "service_name": "Mysql",
+        "connection": {
+            "config": {"hostPort": "http://nohost:6000", "username": "randomName"}
+        },
+    },
+    "athena": {
+        "service_name": "Athena",
+        "connection": {
+            "config": {
+                "s3StagingDir": "s3 staging dir",
+                "awsConfig": "aws_config",
+                "workgroup": "work_group",
+            }
+        },
+    },
+    "bigquery": {
+        "service_name": "BigQuery",
+        "connection": {"config": {"credentials": "credentials"}},
+    },
+    "db2": {
+        "service_name": "Db2",
+        "connection": {
+            "config": {"hostPort": "http://nohost:6000", "username": "username"}
+        },
+    },
+    "druid": {
+        "service_name": "Druid",
+        "connection": {"config": {"hostPort": "http://nohost:6000"}},
+    },
+    "salesforce": {
+        "service_name": "Salesforce",
+        "connection": {"config": {"username": "randomName"}},
+    },
+    "oracle": {
+        "service_name": "Oracle",
+        "connection": {
+            "config": {
+                "hostPort": "http://nohost:6000",
+                "username": "randomName",
+                "oracleConnectionType": {
+                    "oracleServiceName": {"title": "oracle_ser_name"}
+                },
+            }
+        },
+    },
+    "glue": {
+        "service_name": "Glue",
+        "connection": {
+            "config": {
+                "awsConfig": "aws_config",
+                "storageServiceName": "glue_storage_name",
+            }
+        },
+    },
+    "snowflake": {
+        "service_name": "Snowflake",
+        "connection": {
+            "config": {
+                "username": "randomName",
+                "account": "snow_fl_acco",
+                "warehouse": "compute",
+            }
+        },
+    },
+}
diff --git a/ingestion/tests/unit/topology/metadata/test_amundsen.py b/ingestion/tests/unit/topology/metadata/test_amundsen.py
new file mode 100644
index 00000000000..6f09389e8c7
--- /dev/null
+++ b/ingestion/tests/unit/topology/metadata/test_amundsen.py
@@ -0,0 +1,208 @@
+"""
+TestCase for Amundsen using the topology
+"""
+
+import datetime
+from unittest import TestCase
+from unittest.mock import patch
+
+from pydantic import AnyUrl
+
+from metadata.generated.schema.entity.services.connections.database.deltaLakeConnection import (
+    DeltaLakeConnection,
+    MetastoreHostPortConnection,
+)
+from metadata.generated.schema.entity.services.connections.database.dynamoDBConnection import (
+    DynamoDBConnection,
+)
+from metadata.generated.schema.entity.services.connections.database.hiveConnection import (
+    HiveConnection,
+)
+from metadata.generated.schema.entity.services.databaseService import (
+    DatabaseConnection,
+    DatabaseService,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    OpenMetadataWorkflowConfig,
+)
+from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials
+from metadata.generated.schema.type.basic import Href
+from metadata.ingestion.source.metadata.amundsen import AmundsenSource
+
+mock_amundsen_config = {
+    "source": {
+        "type": "amundsen",
+        "serviceName": "local_amundsen",
+        "serviceConnection": {
+            "config": {
+                "type": "Amundsen",
+                "username": "neo4j",
+                "password": "test",
+                "hostPort": "bolt://192.168.1.8:7687",
+            }
+        },
+        "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
+    },
+    "sink": {"type": "metadata-rest", "config": {}},
+    "workflowConfig": {
+        "openMetadataServerConfig": {
+            "hostPort": "http://localhost:8585/api",
+            "authProvider": "openmetadata",
+            "securityConfig": {
+                "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGc"
+                "iOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE"
+                "2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXB"
+                "iEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fN"
+                "r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u"
+                "d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
+            },
+        }
+    },
+}
+
+EXPECTED_SERVICE = [
+    DatabaseService(
+        id="05f98ea5-1a30-480c-9bfc-55d1eabc45c7",
+        name="hive",
+        fullyQualifiedName="hive",
+        displayName="hive",
+        serviceType="Hive",
+        description=None,
+        connection=DatabaseConnection(
+            config=HiveConnection(
+                type="Hive",
+                scheme="hive",
+                username=None,
+                password=None,
+                hostPort="http://nohost:6000",
+                databaseSchema=None,
+                authOptions=None,
+                connectionOptions=None,
+                connectionArguments=None,
+                supportsMetadataExtraction=True,
+                supportsProfiler=True,
+            )
+        ),
+        pipelines=None,
+        version=2.5,
+        updatedAt=1667892646744,
+        updatedBy="admin",
+        owner=None,
+        href=Href(
+            __root__=AnyUrl(
+                "http://localhost:8585/api/v1/services/databaseServices/05f98ea5-1a30-480c-9bfc-55d1eabc45c7",
+                scheme="http",
+                host="localhost",
+                host_type="int_domain",
+                port="8585",
+                path="/api/v1/services/databaseServices/05f98ea5-1a30-480c-9bfc-55d1eabc45c7",
+            )
+        ),
+        changeDescription=None,
+        deleted=False,
+    ),
+    DatabaseService(
+        id="e856d239-4e74-4a7d-844b-d61c3e73b81d",
+        name="delta",
+        fullyQualifiedName="delta",
+        displayName="delta",
+        serviceType="DeltaLake",
+        description=None,
+        connection=DatabaseConnection(
+            config=DeltaLakeConnection(
+                type="DeltaLake",
+                metastoreConnection=MetastoreHostPortConnection(
+                    metastoreHostPort="http://localhost:9083"
+                ),
+                connectionArguments=None,
+                supportsMetadataExtraction=True,
+            )
+        ),
+        pipelines=None,
+        version=2.5,
+        updatedAt=1667892646744,
+        updatedBy="admin",
+        owner=None,
+        href=Href(
+            __root__=AnyUrl(
+                "http://localhost:8585/api/v1/services/databaseServices/e856d239-4e74-4a7d-844b-d61c3e73b81d",
+                scheme="http",
+                host="localhost",
+                host_type="int_domain",
+                port="8585",
+                path="/api/v1/services/databaseServices/e856d239-4e74-4a7d-844b-d61c3e73b81d",
+            )
+        ),
+        changeDescription=None,
+        deleted=False,
+    ),
+    DatabaseService(
+        id="836ff98d-a241-4d06-832d-745f96ac88fc",
+        name="dynamo",
+        fullyQualifiedName="dynamo",
+        displayName="dynamo",
+        serviceType="DynamoDB",
+        description=None,
+        connection=DatabaseConnection(
+            config=DynamoDBConnection(
type="DynamoDB", + awsConfig=AWSCredentials(awsRegion="aws_region"), + connectionArguments=None, + supportsMetadataExtraction=True, + supportsProfiler=True, + ) + ), + pipelines=None, + version=2.5, + updatedAt=1667892646744, + updatedBy="admin", + owner=None, + href=Href( + __root__=AnyUrl( + "http://localhost:8585/api/v1/services/databaseServices/836ff98d-a241-4d06-832d-745f96ac88fc", + scheme="http", + host="localhost", + host_type="int_domain", + port="8585", + path="/api/v1/services/databaseServices/836ff98d-a241-4d06-832d-745f96ac88fc", + ) + ), + changeDescription=None, + deleted=False, + ), +] + +SERVICE_NAME = ["hive", "delta", "dynamo"] + + +class AmundsenUnitTest(TestCase): + """ + Implements the necessary methods to extract + Amundsen Unit Test + """ + + @patch("metadata.ingestion.source.pipeline.pipeline_service.test_connection") + def __init__(self, methodName, test_connection) -> None: + super().__init__(methodName) + test_connection.return_value = False + self.config = OpenMetadataWorkflowConfig.parse_obj(mock_amundsen_config) + self.amundsen = AmundsenSource.create( + mock_amundsen_config["source"], + self.config.workflowConfig.openMetadataServerConfig, + ) + + def test_database_service(self): + database_service_list = [] + for service_name in SERVICE_NAME: + service_entity = self.amundsen.get_database_service(service_name) + database_service_list.append(service_entity) + + for _, (expected, original) in enumerate( + zip(EXPECTED_SERVICE, database_service_list) + ): + original.id = expected.id = "836ff98d-a241-4d06-832d-745f96ac88fc" + original.href = expected.href = None + original.updatedAt = expected.updatedAt = datetime.datetime.now() + original.version = expected.version = 2.5 + original.changeDescription = None + self.assertEqual(expected, original)