Fix #4722 - Validate workflowConfig when parsing error (#4807)

* Update parser

* Check if tests in config

* Add provider class map

* Validate server config
This commit is contained in:
Pere Miquel Brull 2022-05-10 07:48:52 +02:00 committed by GitHub
parent 6fdd1c4691
commit eee71223b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 87 additions and 15 deletions

View File

@ -34,7 +34,9 @@ from metadata.generated.schema.entity.services.metadataService import (
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
WorkflowConfig,
)
from metadata.ingestion.ometa.provider_registry import PROVIDER_CLASS_MAP
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@ -99,6 +101,57 @@ def get_connection_class(
return connection_class
def parse_workflow_source(config_dict: dict) -> None:
"""
Validate the parsing of the source in the config dict.
This is our first stop as most issues happen when
passing the source information.
:param config_dict: JSON configuration
"""
# Unsafe access to the keys. Allow a KeyError if the config is not well formatted
source_type = config_dict["source"]["serviceConnection"]["config"]["type"]
logger.error(
f"Error parsing the Workflow Configuration for {source_type} ingestion"
)
service_type = get_service_type(source_type)
connection_class = get_connection_class(source_type, service_type)
# Parse the dictionary with the scoped class
connection_class.parse_obj(config_dict["source"]["serviceConnection"]["config"])
def parse_server_config(config_dict: dict) -> None:
"""
Validate the parsing of openMetadataServerConfig.
This is valuable to make sure there are no issues
when setting up auth providers.
:param config_dict: JSON configuration
"""
# Unsafe access to the keys. Allow a KeyError if the config is not well formatted
auth_provider = config_dict["workflowConfig"]["openMetadataServerConfig"][
"authProvider"
]
logger.error(
f"Error parsing the Workflow Server Configuration with {auth_provider} auth provider"
)
# If the error comes from the security config:
auth_class = PROVIDER_CLASS_MAP.get(auth_provider)
security_config = (
config_dict.get("workflowConfig")
.get("openMetadataServerConfig")
.get("securityConfig")
)
if auth_class and security_config:
auth_class.parse_obj(security_config)
# If the security config is properly configured, let's raise the ValidationError of the whole WorkflowConfig
WorkflowConfig.parse_obj(config_dict["workflowConfig"])
def parse_workflow_config_gracefully(
config_dict: dict,
) -> Optional[OpenMetadataWorkflowConfig]:
@ -113,19 +166,8 @@ def parse_workflow_config_gracefully(
try:
workflow_config = OpenMetadataWorkflowConfig.parse_obj(config_dict)
return workflow_config
except ValidationError:
# Unsafe access to the keys. Allow a KeyError if the config is not well formatted
source_type = config_dict["source"]["serviceConnection"]["config"]["type"]
logger.error(
f"Error parsing the Workflow Configuration for {source_type} ingestion"
)
service_type = get_service_type(source_type)
connection_class = get_connection_class(source_type, service_type)
# Parse the dictionary with the scoped class
connection_class.parse_obj(config_dict["source"]["serviceConnection"]["config"])
parse_workflow_source(config_dict)
parse_server_config(config_dict)

View File

@ -15,6 +15,21 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
AuthProvider,
OpenMetadataConnection,
)
from metadata.generated.schema.security.client.auth0SSOClientConfig import (
Auth0SSOClientConfig,
)
from metadata.generated.schema.security.client.azureSSOClientConfig import (
AzureSSOClientConfig,
)
from metadata.generated.schema.security.client.customOidcSSOClientConfig import (
CustomOIDCSSOClientConfig,
)
from metadata.generated.schema.security.client.googleSSOClientConfig import (
GoogleSSOClientConfig,
)
from metadata.generated.schema.security.client.oktaSSOClientConfig import (
OktaSSOClientConfig,
)
from metadata.ingestion.ometa.auth_provider import (
Auth0AuthenticationProvider,
AuthenticationProvider,
@ -24,6 +39,7 @@ from metadata.ingestion.ometa.auth_provider import (
NoOpAuthenticationProvider,
OktaAuthenticationProvider,
OpenMetadataAuthenticationProvider,
OpenMetadataJWTClientConfig,
)
from metadata.utils.dispatch import enum_register
@ -71,3 +87,14 @@ def custom_oidc_auth_init(config: OpenMetadataConnection) -> AuthenticationProvi
@auth_provider_registry.add(AuthProvider.openmetadata.value)
def om_auth_init(config: OpenMetadataConnection) -> AuthenticationProvider:
return OpenMetadataAuthenticationProvider.create(config)
PROVIDER_CLASS_MAP = {
AuthProvider.no_auth.value: None,
AuthProvider.google.value: GoogleSSOClientConfig,
AuthProvider.azure.value: AzureSSOClientConfig,
AuthProvider.okta.value: OktaSSOClientConfig,
AuthProvider.auth0.value: Auth0SSOClientConfig,
AuthProvider.custom_oidc.value: CustomOIDCSSOClientConfig,
AuthProvider.openmetadata.value: OpenMetadataJWTClientConfig,
}

View File

@ -320,7 +320,10 @@ class OrmProfilerProcessor(Processor[Table]):
return None
# Compute all validations against the profiler results
for table_test in my_record_tests.table_tests:
table_tests = my_record_tests.table_tests or []
column_tests = my_record_tests.column_tests or []
for table_test in table_tests:
test_case_result = self.run_table_test(
table=table,
orm_table=orm_table,
@ -330,7 +333,7 @@ class OrmProfilerProcessor(Processor[Table]):
if test_case_result:
table_test.result = test_case_result
for column_test in my_record_tests.column_tests:
for column_test in column_tests:
test_case_result = self.run_column_test(
table=table,
orm_table=orm_table,