diff --git a/ingestion/src/metadata/ingestion/api/parser.py b/ingestion/src/metadata/ingestion/api/parser.py index aa4c0d33b43..128a87ce5ca 100644 --- a/ingestion/src/metadata/ingestion/api/parser.py +++ b/ingestion/src/metadata/ingestion/api/parser.py @@ -34,7 +34,9 @@ from metadata.generated.schema.entity.services.metadataService import ( ) from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, + WorkflowConfig, ) +from metadata.ingestion.ometa.provider_registry import PROVIDER_CLASS_MAP from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -99,6 +101,57 @@ def get_connection_class( return connection_class +def parse_workflow_source(config_dict: dict) -> None: + """ + Validate the parsing of the source in the config dict. + This is our first stop as most issues happen when + passing the source information. + + :param config_dict: JSON configuration + """ + # Unsafe access to the keys. Allow a KeyError if the config is not well formatted + source_type = config_dict["source"]["serviceConnection"]["config"]["type"] + logger.error( + f"Error parsing the Workflow Configuration for {source_type} ingestion" + ) + + service_type = get_service_type(source_type) + connection_class = get_connection_class(source_type, service_type) + + # Parse the dictionary with the scoped class + connection_class.parse_obj(config_dict["source"]["serviceConnection"]["config"]) + + +def parse_server_config(config_dict: dict) -> None: + """ + Validate the parsing of openMetadataServerConfig. + This is valuable to make sure there are no issues + when setting up auth providers. + + :param config_dict: JSON configuration + """ + # Unsafe access to the keys. Allow a KeyError if the config is not well formatted + auth_provider = config_dict["workflowConfig"]["openMetadataServerConfig"][ + "authProvider" + ] + logger.error( + f"Error parsing the Workflow Server Configuration with {auth_provider} auth provider" + ) + + # If the error comes from the security config: + auth_class = PROVIDER_CLASS_MAP.get(auth_provider) + security_config = ( + config_dict.get("workflowConfig") + .get("openMetadataServerConfig") + .get("securityConfig") + ) + if auth_class and security_config: + auth_class.parse_obj(security_config) + + # If the security config is properly configured, let's raise the ValidationError of the whole WorkflowConfig + WorkflowConfig.parse_obj(config_dict["workflowConfig"]) + + def parse_workflow_config_gracefully( config_dict: dict, ) -> Optional[OpenMetadataWorkflowConfig]: @@ -113,19 +166,8 @@ def parse_workflow_config_gracefully( try: workflow_config = OpenMetadataWorkflowConfig.parse_obj(config_dict) - return workflow_config except ValidationError: - - # Unsafe access to the keys. Allow a KeyError if the config is not well formatted - source_type = config_dict["source"]["serviceConnection"]["config"]["type"] - logger.error( - f"Error parsing the Workflow Configuration for {source_type} ingestion" - ) - - service_type = get_service_type(source_type) - connection_class = get_connection_class(source_type, service_type) - - # Parse the dictionary with the scoped class - connection_class.parse_obj(config_dict["source"]["serviceConnection"]["config"]) + parse_workflow_source(config_dict) + parse_server_config(config_dict) diff --git a/ingestion/src/metadata/ingestion/ometa/provider_registry.py b/ingestion/src/metadata/ingestion/ometa/provider_registry.py index e432067ed00..0db0cac7b3b 100644 --- a/ingestion/src/metadata/ingestion/ometa/provider_registry.py +++ b/ingestion/src/metadata/ingestion/ometa/provider_registry.py @@ -15,6 +15,21 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata AuthProvider, OpenMetadataConnection, ) +from metadata.generated.schema.security.client.auth0SSOClientConfig import ( + Auth0SSOClientConfig, +) +from metadata.generated.schema.security.client.azureSSOClientConfig import ( + AzureSSOClientConfig, +) +from metadata.generated.schema.security.client.customOidcSSOClientConfig import ( + CustomOIDCSSOClientConfig, +) +from metadata.generated.schema.security.client.googleSSOClientConfig import ( + GoogleSSOClientConfig, +) +from metadata.generated.schema.security.client.oktaSSOClientConfig import ( + OktaSSOClientConfig, +) from metadata.ingestion.ometa.auth_provider import ( Auth0AuthenticationProvider, AuthenticationProvider, @@ -24,6 +39,7 @@ from metadata.ingestion.ometa.auth_provider import ( NoOpAuthenticationProvider, OktaAuthenticationProvider, OpenMetadataAuthenticationProvider, + OpenMetadataJWTClientConfig, ) from metadata.utils.dispatch import enum_register @@ -71,3 +87,14 @@ def custom_oidc_auth_init(config: OpenMetadataConnection) -> AuthenticationProvi @auth_provider_registry.add(AuthProvider.openmetadata.value) def om_auth_init(config: OpenMetadataConnection) -> AuthenticationProvider: return OpenMetadataAuthenticationProvider.create(config) + + +PROVIDER_CLASS_MAP = { + AuthProvider.no_auth.value: None, + AuthProvider.google.value: GoogleSSOClientConfig, + AuthProvider.azure.value: AzureSSOClientConfig, + AuthProvider.okta.value: OktaSSOClientConfig, + AuthProvider.auth0.value: Auth0SSOClientConfig, + AuthProvider.custom_oidc.value: CustomOIDCSSOClientConfig, + AuthProvider.openmetadata.value: OpenMetadataJWTClientConfig, +} diff --git a/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py b/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py index e3ebc04db90..1badb6073b0 100644 --- a/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py +++ b/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py @@ -320,7 +320,10 @@ class OrmProfilerProcessor(Processor[Table]): return None # Compute all validations against the profiler results - for table_test in my_record_tests.table_tests: + table_tests = my_record_tests.table_tests or [] + column_tests = my_record_tests.column_tests or [] + + for table_test in table_tests: test_case_result = self.run_table_test( table=table, orm_table=orm_table, @@ -330,7 +333,7 @@ class OrmProfilerProcessor(Processor[Table]): if test_case_result: table_test.result = test_case_result - for column_test in my_record_tests.column_tests: + for column_test in column_tests: test_case_result = self.run_column_test( table=table, orm_table=orm_table,