WIP: Fixed amundsen ingestion (#8544)

NiharDoshi99 authored 2022-11-09 14:21:44 +05:30, committed via GitHub
parent 3c6e467829
commit 4ed088f55e
3 changed files with 326 additions and 7 deletions

metadata/ingestion/source/metadata/amundsen.py

@@ -28,6 +28,9 @@ from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.api.tags.createTag import CreateTagRequest
from metadata.generated.schema.api.tags.createTagCategory import (
CreateTagCategoryRequest,
@@ -64,6 +67,7 @@ from metadata.ingestion.ometa.client_utils import get_chart_entities_from_id
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.utils import fqn
from metadata.utils.amundsen_helper import SERVICE_TYPE_MAPPER
from metadata.utils.helpers import get_standard_chart_type
from metadata.utils.logger import ingestion_logger
from metadata.utils.sql_queries import (
@@ -259,21 +263,24 @@ class AmundsenSource(Source[Entity]):
def _yield_create_database(self, table):
try:
service_entity = self.get_database_service(table["database"])
table_name = ""
if hasattr(service_entity.connection.config, "supportsDatabase"):
table_name = table["cluster"]
else:
table_name = "default"
database_request = CreateDatabaseRequest(
name=table["cluster"]
name=table_name
if hasattr(service_entity.connection.config, "supportsDatabase")
else "default",
service=EntityReference(id=service_entity.id, type="databaseService"),
)
yield database_request
database_fqn = fqn.build(
self.metadata,
entity_type=Database,
service_name=table["database"],
database_name=table["cluster"],
database_name=table_name,
)
self.database_object = self.metadata.get_by_name(
@@ -285,17 +292,17 @@ class AmundsenSource(Source[Entity]):
def _yield_create_database_schema(self, table):
try:
database_schema_request = CreateDatabaseSchemaRequest(
name=table["schema"],
database=EntityReference(id=self.database_object.id, type="database"),
)
yield database_schema_request
database_schema_fqn = fqn.build(
self.metadata,
entity_type=DatabaseSchema,
service_name=table["database"],
database_name=table["cluster"],
database_name=self.database_object.name.__root__,
schema_name=database_schema_request.name.__root__,
)
@@ -495,7 +502,22 @@ class AmundsenSource(Source[Entity]):
return data_type
def get_database_service(self, service_name: str) -> DatabaseService:
-    service = self.metadata.get_by_name(entity=DatabaseService, fqn=service_name)
"""
Get or create the Database Service for the given service name
"""
service = self.metadata.create_or_update(
CreateDatabaseServiceRequest(
name=service_name,
displayName=service_name,
connection=SERVICE_TYPE_MAPPER.get(
service_name, SERVICE_TYPE_MAPPER["mysql"]["connection"]
)["connection"],
serviceType=SERVICE_TYPE_MAPPER.get(
service_name, SERVICE_TYPE_MAPPER["mysql"]["service_name"]
)["service_name"],
),
)
if service is not None:
return service
logger.error(f"Please create a service with name {service_name}")

metadata/utils/amundsen_helper.py (new file)

@@ -0,0 +1,89 @@
"""
Amundsen helper file for service mapping
"""
SERVICE_TYPE_MAPPER = {
"hive": {
"service_name": "Hive",
"connection": {"config": {"hostPort": "http://nohost:6000", "type": "Hive"}},
},
"delta": {
"service_name": "DeltaLake",
"connection": {
"config": {
"metastoreConnection": {"metastoreHostPort": "http://localhost:9083"}
}
},
},
"dynamo": {
"service_name": "DynamoDB",
"connection": {
"config": {"awsConfig": {"awsRegion": "aws_region"}, "type": "DynamoDB"}
},
},
"mysql": {
"service_name": "Mysql",
"connection": {
"config": {"hostPort": "http://nohost:6000", "username": "randomName"}
},
},
"athena": {
"service_name": "Athena",
"connection": {
"config": {
"s3StagingDir": "s3 staging dir",
"awsConfig": "aws_config",
"workgroup": "work_group",
}
},
},
"bigquery": {
"service_name": "BigQuery",
"connection": {"config": {"credentials": "credentials"}},
},
"db2": {
"service_name": "Db2",
"connection": {
"config": {"hostPort": "http://nohost:6000", "username": "username"}
},
},
"druid": {
"service_name": "Druid",
"connection": {"config": {"hostPort": "http://nohost:6000"}},
},
"salesforce": {
"service_name": "Salesforce",
"connection": {"config": {"username": "randomName"}},
},
"oracle": {
"service_name": "Oracle",
"connection": {
"config": {
"hostPort": "http://nohost:6000",
"username": "randomName",
"oracleConnectionType": {
"oracleServiceName": {"title": "orcale_ser_name"}
},
}
},
},
"glue": {
"service_name": "Glue",
"connection": {
"config": {
"awsConfig": "aws_config",
"storageServiceName": "glue_stroage_name",
}
},
},
"snowflake": {
"service_nmae": "Snowflake",
"connection": {
"config": {
"username": "randomName",
"account": "snow_fl_acco",
"warehouse": "compute",
}
},
},
}
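Because the source always reads the same two keys from each entry ("service_name" and "connection"), a small consistency check helps catch misspelled keys in this mapping. The helper below is an editorial sketch, not part of the commit.

# Editorial sketch: assert every mapper entry exposes the keys the source reads.
REQUIRED_KEYS = {"service_name", "connection"}

def validate_mapper(mapper: dict) -> None:
    for db_key, entry in mapper.items():
        missing = REQUIRED_KEYS - entry.keys()
        if missing:
            raise ValueError(f"SERVICE_TYPE_MAPPER[{db_key!r}] is missing {sorted(missing)}")

validate_mapper(SERVICE_TYPE_MAPPER)  # raises if an entry has a misspelled key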

Unit tests for the Amundsen source (new file)

@@ -0,0 +1,208 @@
"""
TestCase for Amundsen using the topology
"""
import datetime
from unittest import TestCase
from unittest.mock import patch
from pydantic import AnyUrl
from metadata.generated.schema.entity.services.connections.database.deltaLakeConnection import (
DeltaLakeConnection,
MetastoreHostPortConnection,
)
from metadata.generated.schema.entity.services.connections.database.dynamoDBConnection import (
DynamoDBConnection,
)
from metadata.generated.schema.entity.services.connections.database.hiveConnection import (
HiveConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials
from metadata.generated.schema.type.basic import Href
from metadata.ingestion.source.metadata.amundsen import AmundsenSource
mock_amundsen_config = {
"source": {
"type": "amundsen",
"serviceName": "local_amundsen",
"serviceConnection": {
"config": {
"type": "Amundsen",
"username": "neo4j",
"password": "test",
"hostPort": "bolt://192.168.1.8:7687",
}
},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGc"
"iOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE"
"2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXB"
"iEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fN"
"r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u"
"d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
}
},
}
EXPECTED_SERVICE = [
DatabaseService(
id="05f98ea5-1a30-480c-9bfc-55d1eabc45c7",
name="hive",
fullyQualifiedName="hive",
displayName="hive",
serviceType="Hive",
description=None,
connection=DatabaseConnection(
config=HiveConnection(
type="Hive",
scheme="hive",
username=None,
password=None,
hostPort="http://nohost:6000",
databaseSchema=None,
authOptions=None,
connectionOptions=None,
connectionArguments=None,
supportsMetadataExtraction=True,
supportsProfiler=True,
)
),
pipelines=None,
version=2.5,
updatedAt=1667892646744,
updatedBy="admin",
owner=None,
href=Href(
__root__=AnyUrl(
"http://localhost:8585/api/v1/services/databaseServices/05f98ea5-1a30-480c-9bfc-55d1eabc45c7",
scheme="http",
host="localhost",
host_type="int_domain",
port="8585",
path="/api/v1/services/databaseServices/05f98ea5-1a30-480c-9bfc-55d1eabc45c7",
)
),
changeDescription=None,
deleted=False,
),
DatabaseService(
id="e856d239-4e74-4a7d-844b-d61c3e73b81d",
name="delta",
fullyQualifiedName="delta",
displayName="delta",
serviceType="DeltaLake",
description=None,
connection=DatabaseConnection(
config=DeltaLakeConnection(
type="DeltaLake",
metastoreConnection=MetastoreHostPortConnection(
metastoreHostPort="http://localhost:9083"
),
connectionArguments=None,
supportsMetadataExtraction=True,
)
),
pipelines=None,
version=2.5,
updatedAt=1667892646744,
updatedBy="admin",
owner=None,
href=Href(
__root__=AnyUrl(
"http://localhost:8585/api/v1/services/databaseServices/e856d239-4e74-4a7d-844b-d61c3e73b81d",
scheme="http",
host="localhost",
host_type="int_domain",
port="8585",
path="/api/v1/services/databaseServices/e856d239-4e74-4a7d-844b-d61c3e73b81d",
)
),
changeDescription=None,
deleted=False,
),
DatabaseService(
id="836ff98d-a241-4d06-832d-745f96ac88fc",
name="dynamo",
fullyQualifiedName="dynamo",
displayName="dynamo",
serviceType="DynamoDB",
description=None,
connection=DatabaseConnection(
config=DynamoDBConnection(
type="DynamoDB",
awsConfig=AWSCredentials(awsRegion="aws_region"),
connectionArguments=None,
supportsMetadataExtraction=True,
supportsProfiler=True,
)
),
pipelines=None,
version=2.5,
updatedAt=1667892646744,
updatedBy="admin",
owner=None,
href=Href(
__root__=AnyUrl(
"http://localhost:8585/api/v1/services/databaseServices/836ff98d-a241-4d06-832d-745f96ac88fc",
scheme="http",
host="localhost",
host_type="int_domain",
port="8585",
path="/api/v1/services/databaseServices/836ff98d-a241-4d06-832d-745f96ac88fc",
)
),
changeDescription=None,
deleted=False,
),
]
SERVICE_NAME = ["hive", "delta", "dynamo"]
class AmundsenUnitTest(TestCase):
"""
Unit tests for the Amundsen source: validate database service creation
"""
@patch("metadata.ingestion.source.pipeline.pipeline_service.test_connection")
def __init__(self, methodName, test_connection) -> None:
super().__init__(methodName)
test_connection.return_value = False
self.config = OpenMetadataWorkflowConfig.parse_obj(mock_amundsen_config)
self.amundsen = AmundsenSource.create(
mock_amundsen_config["source"],
self.config.workflowConfig.openMetadataServerConfig,
)
def test_database_service(self):
database_service_list = []
for service_name in SERVICE_NAME:
service_entity = self.amundsen.get_database_service(service_name)
database_service_list.append(service_entity)
for expected, original in zip(EXPECTED_SERVICE, database_service_list):
original.id = expected.id = "836ff98d-a241-4d06-832d-745f96ac88fc"
original.href = expected.href = None
original.updatedAt = expected.updatedAt = datetime.datetime.now()
original.version = expected.version = 2.5
original.changeDescription = None
self.assertEqual(expected, original)
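The loop above repeats the same normalization before each comparison, overwriting server-assigned fields such as id, href, updatedAt and version. If that pattern grows, it could be factored into a helper along these lines; this is an editorial sketch, not code from the commit.

# Editorial sketch: align server-assigned fields so the structural comparison
# only exercises the fields the test actually cares about.
def normalise(original, expected):
    original.id = expected.id
    original.href = expected.href = None
    original.updatedAt = expected.updatedAt
    original.version = expected.version
    original.changeDescription = expected.changeDescription = None
    return original, expected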