diff --git a/ingestion/src/metadata/ingestion/api/parser.py b/ingestion/src/metadata/ingestion/api/parser.py index b9887bf2582..ddf43e2c4d5 100644 --- a/ingestion/src/metadata/ingestion/api/parser.py +++ b/ingestion/src/metadata/ingestion/api/parser.py @@ -43,8 +43,37 @@ from metadata.generated.schema.entity.services.pipelineService import ( PipelineConnection, PipelineServiceType, ) +from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipeline import ( + DashboardMetadataConfigType, + DashboardServiceMetadataPipeline, +) +from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( + DatabaseMetadataConfigType, + DatabaseServiceMetadataPipeline, +) +from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import ( + DatabaseServiceProfilerPipeline, + ProfilerConfigType, +) +from metadata.generated.schema.metadataIngestion.databaseServiceQueryUsagePipeline import ( + DatabaseServiceQueryUsagePipeline, + DatabaseUsageConfigType, +) +from metadata.generated.schema.metadataIngestion.messagingServiceMetadataPipeline import ( + MessagingMetadataConfigType, + MessagingServiceMetadataPipeline, +) +from metadata.generated.schema.metadataIngestion.mlmodelServiceMetadataPipeline import ( + MlModelMetadataConfigType, + MlModelServiceMetadataPipeline, +) +from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import ( + PipelineMetadataConfigType, + PipelineServiceMetadataPipeline, +) from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, + SourceConfig, WorkflowConfig, ) from metadata.ingestion.ometa.provider_registry import PROVIDER_CLASS_MAP @@ -84,6 +113,39 @@ def get_service_type( raise ValueError(f"Cannot find the service type of {source_type}") +def get_source_config_class( + source_config_type: str, +) -> Union[ + Type[DatabaseServiceMetadataPipeline], + Type[DatabaseServiceProfilerPipeline], + Type[DatabaseServiceQueryUsagePipeline], + 
Type[DashboardServiceMetadataPipeline], + Type[MessagingServiceMetadataPipeline], + Type[MlModelServiceMetadataPipeline], + Type[PipelineServiceMetadataPipeline], +]: + """ + Return the source config class for a source config type string + :param source_config_type: source config type string + :return: source config class + """ + if source_config_type == DashboardMetadataConfigType.DashboardMetadata.value: + return DashboardServiceMetadataPipeline + if source_config_type == ProfilerConfigType.Profiler.value: + return DatabaseServiceProfilerPipeline + if source_config_type == DatabaseUsageConfigType.DatabaseUsage.value: + return DatabaseServiceQueryUsagePipeline + if source_config_type == MessagingMetadataConfigType.MessagingMetadata.value: + return MessagingServiceMetadataPipeline + if source_config_type == PipelineMetadataConfigType.PipelineMetadata.value: + return PipelineServiceMetadataPipeline + if source_config_type == MlModelMetadataConfigType.MlModelMetadata.value: + return MlModelServiceMetadataPipeline + if source_config_type == DatabaseMetadataConfigType.DatabaseMetadata.value: + return DatabaseServiceMetadataPipeline + raise ValueError(f"Cannot find the source config type of {source_config_type}") + + def get_connection_class( source_type: str, service_type: Union[ @@ -138,6 +200,11 @@ def parse_workflow_source(config_dict: dict) -> None: # Parse the dictionary with the scoped class connection_class.parse_obj(config_dict["source"]["serviceConnection"]["config"]) + # Parse the source config + source_config_type = config_dict["source"]["sourceConfig"]["config"]["type"] + source_config_class = get_source_config_class(source_config_type) + source_config_class.parse_obj(config_dict["source"]["sourceConfig"]["config"]) + def parse_server_config(config_dict: dict) -> None: """ diff --git a/ingestion/tests/unit/test_workflow_parse.py b/ingestion/tests/unit/test_workflow_parse.py index 5ec7ada9ecf..81fd6fe0dbf 100644 --- a/ingestion/tests/unit/test_workflow_parse.py +++ 
b/ingestion/tests/unit/test_workflow_parse.py @@ -39,9 +39,22 @@ from metadata.generated.schema.entity.services.messagingService import ( MessagingConnection, ) from metadata.generated.schema.entity.services.metadataService import MetadataConnection +from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipeline import ( + DashboardServiceMetadataPipeline, +) +from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( + DatabaseServiceMetadataPipeline, +) +from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import ( + DatabaseServiceProfilerPipeline, +) +from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import ( + PipelineServiceMetadataPipeline, +) from metadata.ingestion.api.parser import ( get_connection_class, get_service_type, + get_source_config_class, parse_test_connection_request_gracefully, parse_workflow_config_gracefully, ) @@ -94,6 +107,26 @@ class TestWorkflowParse(TestCase): connection = get_connection_class(source_type, get_service_type(source_type)) self.assertEqual(connection, PulsarConnection) + def test_get_source_config_class(self): + """ + Check that we can correctly resolve the source config class from its type string + """ + source_config_type = "Profiler" + connection = get_source_config_class(source_config_type) + self.assertEqual(connection, DatabaseServiceProfilerPipeline) + + source_config_type = "DatabaseMetadata" + connection = get_source_config_class(source_config_type) + self.assertEqual(connection, DatabaseServiceMetadataPipeline) + + source_config_type = "PipelineMetadata" + connection = get_source_config_class(source_config_type) + self.assertEqual(connection, PipelineServiceMetadataPipeline) + + source_config_type = "DashboardMetadata" + connection = get_source_config_class(source_config_type) + self.assertEqual(connection, DashboardServiceMetadataPipeline) + def test_parsing_ok(self): """ Test MSSQL JSON Config parsing OK @@ -162,6 
+195,46 @@ class TestWorkflowParse(TestCase): self.assertIn("1 validation error for MssqlConnection", str(err.exception)) + def test_parsing_ko_mssql_source_config(self): + """ + Test MSSQL JSON Config parsing KO + """ + + config_dict = { + "source": { + "type": "mssql", + "serviceName": "test_mssql", + "serviceConnection": { + "config": { + "type": "Mssql", + "database": "master", + "username": "sa", + "password": "MY%password", + "hostPort": "localhost:1433", + } + }, + "sourceConfig": { + "config": {"type": "DatabaseMetadata", "random": "extra"} + }, + }, + "sink": {"type": "metadata-rest", "config": {}}, + "workflowConfig": { + "loggerLevel": "WARN", + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth", + }, + }, + } + + with self.assertRaises(ValidationError) as err: + parse_workflow_config_gracefully(config_dict) + + self.assertIn( + "1 validation error for DatabaseServiceMetadataPipeline\nrandom\n extra fields not permitted (type=value_error.extra)", + str(err.exception), + ) + def test_parsing_ko_glue(self): """ Test Glue JSON Config parsing OK @@ -202,6 +275,39 @@ class TestWorkflowParse(TestCase): str(err.exception), ) + def test_parsing_ko_airbyte(self): + """ + Test Airbyte JSON Config parsing KO + """ + + config_dict = { + "source": { + "type": "airbyte", + "serviceName": "local_airbyte", + "serviceConnection": { + "config": {"type": "Airbyte", "hostPort": "http://localhost:8000"} + }, + "sourceConfig": { + "config": {"type": "PipelineMetadata", "random": "extra"} + }, + }, + "sink": {"type": "metadata-rest", "config": {}}, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth", + } + }, + } + + with self.assertRaises(ValidationError) as err: + parse_workflow_config_gracefully(config_dict) + + self.assertIn( + "1 validation error for PipelineServiceMetadataPipeline\nrandom\n extra fields not permitted (type=value_error.extra)", + 
str(err.exception), + ) + def test_test_connection_mysql(self): """ Test the TestConnection for MySQL