From 1d93d0e6506f7e3d8fa41230d974e5299463b754 Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Wed, 9 Nov 2022 13:00:22 +0530 Subject: [PATCH] Fix #8570: Parsing Workflow Gracefully on Deploy (#8585) * Fix #8570: Parsing Workflow Gracefully on Deploy * use entity_class * py format --- .../src/metadata/ingestion/api/parser.py | 10 +- .../openmetadata_managed_apis/utils/logger.py | 5 + .../openmetadata_managed_apis/utils/parser.py | 106 ++++++++++++++++++ .../workflows/ingestion/common.py | 84 ++++++++++---- 4 files changed, 185 insertions(+), 20 deletions(-) create mode 100644 openmetadata-airflow-apis/openmetadata_managed_apis/utils/parser.py diff --git a/ingestion/src/metadata/ingestion/api/parser.py b/ingestion/src/metadata/ingestion/api/parser.py index 1bf84734f62..07947ea6540 100644 --- a/ingestion/src/metadata/ingestion/api/parser.py +++ b/ingestion/src/metadata/ingestion/api/parser.py @@ -216,7 +216,15 @@ def _parse_validation_err(validation_error: ValidationError) -> str: if err.get("type") == "value_error.missing" ] - return "\t - " + "\n\t - ".join(missing_fields + extra_fields) + invalid_fields = [ + f"Invalid parameter value for '{err.get('loc')[0]}'" + if len(err.get("loc")) == 1 + else f"Invalid parameter value for {err.get('loc')}" + for err in validation_error.errors() + if err.get("type") not in ("value_error.missing", "value_error.extra") + ] + + return "\t - " + "\n\t - ".join(missing_fields + extra_fields + invalid_fields) def _unsafe_parse_config(config: dict, cls: T, message: str) -> None: diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/utils/logger.py b/openmetadata-airflow-apis/openmetadata_managed_apis/utils/logger.py index 429a28d9dc1..2e8d6220475 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/utils/logger.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/utils/logger.py @@ -14,6 +14,7 @@ class Loggers(Enum): API = "AirflowAPI" OPERATIONS = 
"AirflowOperations" WORKFLOW = "AirflowWorkflow" + UTILS = "AirflowUtils" def build_logger(logger_name: str) -> logging.Logger: @@ -44,3 +45,7 @@ def operations_logger(): def workflow_logger(): return build_logger(Loggers.WORKFLOW.value) + + +def utils_logger(): + return build_logger(Loggers.UTILS.value) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/utils/parser.py b/openmetadata-airflow-apis/openmetadata_managed_apis/utils/parser.py new file mode 100644 index 00000000000..76356701e3a --- /dev/null +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/utils/parser.py @@ -0,0 +1,106 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Module to parse source connection config, to handle validation error +""" + +from openmetadata_managed_apis.utils.logger import utils_logger +from pydantic import ValidationError + +from metadata.ingestion.api.parser import ( + HAS_INNER_CONNECTION, + InvalidWorkflowException, + _unsafe_parse_config, + get_connection_class, + get_service_type, +) + +logger = utils_logger() + + +def parse_validation_err(validation_error: ValidationError) -> str: + """ + Convert the validation error into a message to log + """ + extra_fields = [ + f"Extra parameter '{err.get('loc')[0]}'" + if len(err.get("loc")) == 1 + else f"Extra parameter in {err.get('loc')}" + for err in validation_error.errors() + if err.get("type") == "value_error.extra" + ] + + missing_fields = [ + f"Missing parameter '{err.get('loc')[0]}'" + if len(err.get("loc")) == 1 + else f"Missing parameter in {err.get('loc')}" + for err in validation_error.errors() + if err.get("type") == "value_error.missing" + ] + + invalid_fields = [ + f"Invalid parameter value for '{err.get('loc')[0]}'" + if len(err.get("loc")) == 1 + else f"Invalid parameter value for {err.get('loc')}" + for err in validation_error.errors() + if err.get("type") not in ("value_error.missing", "value_error.extra") + ] + + return "\n".join(extra_fields + missing_fields + invalid_fields) + + +def _parse_inner_connection(connection_dict: dict, source_type: str) -> None: + """ + Parse the inner connection of the flagged connectors + + :param connection_dict: JSON configuration + :param source_type: source type name, e.g., Airflow. 
+ """ + inner_source_type = connection_dict["connection"]["config"]["connection"]["type"] + inner_service_type = get_service_type(inner_source_type) + inner_connection_class = get_connection_class(inner_source_type, inner_service_type) + _unsafe_parse_config( + config=connection_dict["connection"]["config"]["connection"], + cls=inner_connection_class, + message=f"Error parsing the inner service connection for {source_type}", + ) + + +def parse_service_connection(connection_dict: dict) -> None: + """ + Parse the service connection and raise any scoped + errors during the validation process + + :param connection_dict: JSON configuration + """ + # Unsafe access to the keys. Allow a KeyError if the config is not well formatted + source_type = connection_dict["connection"]["config"].get("type") + if source_type is None: + raise InvalidWorkflowException("Missing type in the serviceConnection config") + + logger.debug( + f"Error parsing the Workflow Configuration for {source_type} ingestion" + ) + + service_type = get_service_type(source_type) + connection_class = get_connection_class(source_type, service_type) + + if source_type in HAS_INNER_CONNECTION: + # We will first parse the inner `connection` configuration + _parse_inner_connection(connection_dict, source_type) + + # Parse the service connection dictionary with the scoped class + _unsafe_parse_config( + config=connection_dict["connection"]["config"], + cls=connection_class, + message="Error parsing the service connection", + ) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py index cca8c00646e..5567f62a745 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py @@ -18,6 +18,8 @@ from typing import Callable, Optional, Union import airflow from airflow import DAG from 
openmetadata_managed_apis.api.utils import clean_dag_id +from pydantic import ValidationError +from requests.utils import quote from metadata.data_insight.api.workflow import DataInsightWorkflow from metadata.generated.schema.entity.services.dashboardService import DashboardService @@ -38,6 +40,11 @@ try: except ModuleNotFoundError: from airflow.operators.python_operator import PythonOperator +from openmetadata_managed_apis.utils.logger import workflow_logger +from openmetadata_managed_apis.utils.parser import ( + parse_service_connection, + parse_validation_err, +) from openmetadata_managed_apis.workflows.ingestion.credentials_builder import ( build_secrets_manager_credentials, ) @@ -54,7 +61,14 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig +from metadata.ingestion.api.parser import ( + InvalidWorkflowException, + ParsingConfigurationError, +) from metadata.ingestion.api.workflow import Workflow +from metadata.ingestion.ometa.utils import model_str + +logger = workflow_logger() class InvalidServiceException(Exception): @@ -115,25 +129,57 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource: sourceConfig=ingestion_pipeline.sourceConfig, ) - if service_type == "databaseService": - service: DatabaseService = metadata.get_by_name( - entity=DatabaseService, fqn=ingestion_pipeline.service.name - ) - elif service_type == "pipelineService": - service: PipelineService = metadata.get_by_name( - entity=PipelineService, fqn=ingestion_pipeline.service.name - ) - elif service_type == "dashboardService": - service: DashboardService = metadata.get_by_name( - entity=DashboardService, fqn=ingestion_pipeline.service.name - ) - elif service_type == "messagingService": - service: MessagingService = metadata.get_by_name( - entity=MessagingService, fqn=ingestion_pipeline.service.name - ) - elif service_type == "mlmodelService": - 
service: MlModelService = metadata.get_by_name( - entity=MlModelService, fqn=ingestion_pipeline.service.name + entity_class = None + try: + if service_type == "databaseService": + entity_class = DatabaseService + service: DatabaseService = metadata.get_by_name( + entity=entity_class, fqn=ingestion_pipeline.service.name + ) + elif service_type == "pipelineService": + entity_class = PipelineService + service: PipelineService = metadata.get_by_name( + entity=entity_class, fqn=ingestion_pipeline.service.name + ) + elif service_type == "dashboardService": + entity_class = DashboardService + service: DashboardService = metadata.get_by_name( + entity=entity_class, fqn=ingestion_pipeline.service.name + ) + elif service_type == "messagingService": + entity_class = MessagingService + service: MessagingService = metadata.get_by_name( + entity=entity_class, fqn=ingestion_pipeline.service.name + ) + elif service_type == "mlmodelService": + entity_class = MlModelService + service: MlModelService = metadata.get_by_name( + entity=entity_class, fqn=ingestion_pipeline.service.name + ) + else: + raise InvalidServiceException(f"Invalid Service Type: {service_type}") + except ValidationError as original_error: + try: + resp = metadata.client.get( + f"{metadata.get_suffix(entity_class)}/name/{quote(model_str(ingestion_pipeline.service.name), safe='')}" + ) + parse_service_connection(resp) + except (ValidationError, InvalidWorkflowException) as scoped_error: + if isinstance(scoped_error, ValidationError): + # Let's catch validations of internal Workflow models, not the Workflow itself + object_error = ( + scoped_error.model.__name__ + if scoped_error.model is not None + else "workflow" + ) + raise ParsingConfigurationError( + f"We encountered an error parsing the configuration of your {object_error}.\n" + f"{parse_validation_err(scoped_error)}" + ) + raise scoped_error + raise ParsingConfigurationError( + f"We encountered an error parsing the configuration of your workflow.\n" + 
f"{parse_validation_err(original_error)}" ) if not service: