Fix #5847: Handled Exception For Source Config (#6386)

* Fix #5847: Handled Exception For Source Config

* Comment Fix

* Added Test Cases
This commit is contained in:
Mayur Singal 2022-07-27 23:51:16 +05:30 committed by GitHub
parent 1e3ddfc111
commit 9c7d8f7d8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 173 additions and 0 deletions

View File

@ -43,8 +43,37 @@ from metadata.generated.schema.entity.services.pipelineService import (
PipelineConnection,
PipelineServiceType,
)
from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipeline import (
DashboardMetadataConfigType,
DashboardServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseMetadataConfigType,
DatabaseServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
ProfilerConfigType,
)
from metadata.generated.schema.metadataIngestion.databaseServiceQueryUsagePipeline import (
DatabaseServiceQueryUsagePipeline,
DatabaseUsageConfigType,
)
from metadata.generated.schema.metadataIngestion.messagingServiceMetadataPipeline import (
MessagingMetadataConfigType,
MessagingServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.mlmodelServiceMetadataPipeline import (
MlModelMetadataConfigType,
MlModelServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import (
PipelineMetadataConfigType,
PipelineServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
SourceConfig,
WorkflowConfig,
)
from metadata.ingestion.ometa.provider_registry import PROVIDER_CLASS_MAP
@ -84,6 +113,39 @@ def get_service_type(
raise ValueError(f"Cannot find the service type of {source_type}")
def get_source_config_class(
source_config_type: str,
) -> Union[
Type[DatabaseMetadataConfigType],
Type[ProfilerConfigType],
Type[DatabaseUsageConfigType],
Type[DashboardMetadataConfigType],
Type[MessagingMetadataConfigType],
Type[MlModelMetadataConfigType],
Type[PipelineMetadataConfigType],
]:
"""
Return the source config type for a source string
:param source_config_type: source config type string
:return: source config class
"""
if source_config_type == DashboardMetadataConfigType.DashboardMetadata.value:
return DashboardServiceMetadataPipeline
if source_config_type == ProfilerConfigType.Profiler.value:
return DatabaseServiceProfilerPipeline
if source_config_type == DatabaseUsageConfigType.DatabaseUsage.value:
return DatabaseServiceQueryUsagePipeline
if source_config_type == MessagingMetadataConfigType.MessagingMetadata.value:
return MessagingServiceMetadataPipeline
if source_config_type == PipelineMetadataConfigType.PipelineMetadata.value:
return PipelineServiceMetadataPipeline
if source_config_type == MlModelMetadataConfigType.MlModelMetadata.value:
return MlModelServiceMetadataPipeline
if source_config_type == DatabaseMetadataConfigType.DatabaseMetadata.value:
return DatabaseServiceMetadataPipeline
raise ValueError(f"Cannot find the service type of {source_config_type}")
def get_connection_class(
source_type: str,
service_type: Union[
@ -138,6 +200,11 @@ def parse_workflow_source(config_dict: dict) -> None:
# Parse the dictionary with the scoped class
connection_class.parse_obj(config_dict["source"]["serviceConnection"]["config"])
# Parse the source config
source_config_type = config_dict["source"]["sourceConfig"]["config"]["type"]
source_config_class = get_source_config_class(source_config_type)
source_config_class.parse_obj(config_dict["source"]["sourceConfig"]["config"])
def parse_server_config(config_dict: dict) -> None:
"""

View File

@ -39,9 +39,22 @@ from metadata.generated.schema.entity.services.messagingService import (
MessagingConnection,
)
from metadata.generated.schema.entity.services.metadataService import MetadataConnection
from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipeline import (
DashboardServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
)
from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import (
PipelineServiceMetadataPipeline,
)
from metadata.ingestion.api.parser import (
get_connection_class,
get_service_type,
get_source_config_class,
parse_test_connection_request_gracefully,
parse_workflow_config_gracefully,
)
@ -94,6 +107,26 @@ class TestWorkflowParse(TestCase):
connection = get_connection_class(source_type, get_service_type(source_type))
self.assertEqual(connection, PulsarConnection)
def test_get_source_config_class(self):
"""
Check that we can correctly build the connection module ingredients
"""
source_config_type = "Profiler"
connection = get_source_config_class(source_config_type)
self.assertEqual(connection, DatabaseServiceProfilerPipeline)
source_config_type = "DatabaseMetadata"
connection = get_source_config_class(source_config_type)
self.assertEqual(connection, DatabaseServiceMetadataPipeline)
source_config_type = "PipelineMetadata"
connection = get_source_config_class(source_config_type)
self.assertEqual(connection, PipelineServiceMetadataPipeline)
source_config_type = "DashboardMetadata"
connection = get_source_config_class(source_config_type)
self.assertEqual(connection, DashboardServiceMetadataPipeline)
def test_parsing_ok(self):
"""
Test MSSQL JSON Config parsing OK
@ -162,6 +195,46 @@ class TestWorkflowParse(TestCase):
self.assertIn("1 validation error for MssqlConnection", str(err.exception))
def test_parsing_ko_mssql_source_config(self):
"""
Test MSSQL JSON Config parsing KO
"""
config_dict = {
"source": {
"type": "mssql",
"serviceName": "test_mssql",
"serviceConnection": {
"config": {
"type": "Mssql",
"database": "master",
"username": "sa",
"password": "MY%password",
"hostPort": "localhost:1433",
}
},
"sourceConfig": {
"config": {"type": "DatabaseMetadata", "random": "extra"}
},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"loggerLevel": "WARN",
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "no-auth",
},
},
}
with self.assertRaises(ValidationError) as err:
parse_workflow_config_gracefully(config_dict)
self.assertIn(
"1 validation error for DatabaseServiceMetadataPipeline\nrandom\n extra fields not permitted (type=value_error.extra)",
str(err.exception),
)
def test_parsing_ko_glue(self):
"""
Test Glue JSON Config parsing OK
@ -202,6 +275,39 @@ class TestWorkflowParse(TestCase):
str(err.exception),
)
def test_parsing_ko_airbyte(self):
"""
Test Glue JSON Config parsing OK
"""
config_dict = {
"source": {
"type": "airbyte",
"serviceName": "local_airbyte",
"serviceConnection": {
"config": {"type": "Airbyte", "hostPort": "http://localhost:8000"}
},
"sourceConfig": {
"config": {"type": "PipelineMetadata", "random": "extra"}
},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "no-auth",
}
},
}
with self.assertRaises(ValidationError) as err:
parse_workflow_config_gracefully(config_dict)
self.assertIn(
"1 validation error for PipelineServiceMetadataPipeline\nrandom\n extra fields not permitted (type=value_error.extra)",
str(err.exception),
)
def test_test_connection_mysql(self):
"""
Test the TestConnection for MySQL