From 79444f4a24c85bcca60d7876b4e247dfd9d55b01 Mon Sep 17 00:00:00 2001 From: Onkar Ravgan Date: Fri, 22 Dec 2023 09:56:39 +0530 Subject: [PATCH] Fix 13823: Validate and Parse gracefully IngestionPipeline (#14461) * fixed ingestion pipeline parsing * Added validation for automation workflow --------- Co-authored-by: Pere Miquel Brull --- .../src/metadata/ingestion/api/parser.py | 105 +++-- .../ometa/mixins/ingestion_pipeline_mixin.py | 5 +- ingestion/tests/unit/test_workflow_parse.py | 390 ++++++++++++++++-- .../api/routes/deploy.py | 8 +- .../api/routes/run_automation.py | 9 +- 5 files changed, 449 insertions(+), 68 deletions(-) diff --git a/ingestion/src/metadata/ingestion/api/parser.py b/ingestion/src/metadata/ingestion/api/parser.py index 50f2bc014ed..c05d4dbdb21 100644 --- a/ingestion/src/metadata/ingestion/api/parser.py +++ b/ingestion/src/metadata/ingestion/api/parser.py @@ -11,12 +11,12 @@ """ Helper to parse workflow configurations """ -from typing import Optional, Type, TypeVar, Union +from typing import Type, TypeVar, Union from pydantic import BaseModel, ValidationError -from metadata.generated.schema.entity.automations.testServiceConnection import ( - TestServiceConnectionRequest, +from metadata.generated.schema.entity.automations.workflow import ( + Workflow as AutomationWorkflow, ) from metadata.generated.schema.entity.services.dashboardService import ( DashboardConnection, @@ -26,6 +26,9 @@ from metadata.generated.schema.entity.services.databaseService import ( DatabaseConnection, DatabaseServiceType, ) +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + IngestionPipeline, +) from metadata.generated.schema.entity.services.messagingService import ( MessagingConnection, MessagingServiceType, @@ -270,13 +273,11 @@ def _parse_inner_connection(config_dict: dict, source_type: str) -> None: :param config_dict: JSON configuration :param source_type: source type name, e.g., Airflow. """ - inner_source_type = config_dict["source"]["serviceConnection"]["config"][ - "connection" - ]["type"] + inner_source_type = config_dict["type"] inner_service_type = get_service_type(inner_source_type) inner_connection_class = get_connection_class(inner_source_type, inner_service_type) _unsafe_parse_config( - config=config_dict["source"]["serviceConnection"]["config"]["connection"], + config=config_dict, cls=inner_connection_class, message=f"Error parsing the inner service connection for {source_type}", ) @@ -303,7 +304,12 @@ def parse_service_connection(config_dict: dict) -> None: if source_type in HAS_INNER_CONNECTION: # We will first parse the inner `connection` configuration - _parse_inner_connection(config_dict, source_type) + _parse_inner_connection( + config_dict["source"]["serviceConnection"]["config"]["connection"][ + "config" + ]["connection"], + source_type, + ) # Parse the service connection dictionary with the scoped class _unsafe_parse_config( @@ -400,37 +406,84 @@ def parse_workflow_config_gracefully( raise ParsingConfigurationError("Uncaught error when parsing the workflow!") -def parse_test_connection_request_gracefully( +def parse_ingestion_pipeline_config_gracefully( config_dict: dict, -) -> Optional[TestServiceConnectionRequest]: +) -> IngestionPipeline: """ - This function either correctly parses the pydantic class, - or throws a scoped error while fetching the required source - connection class + This function either correctly parses the pydantic class, or + throws a scoped error while fetching the required source connection + class. - :param config_dict: JSON workflow config - :return: TestServiceConnectionRequest or scoped error + :param config_dict: JSON ingestion pipeline config + :return:Ingestion Pipeline config or scoped error """ try: - test_service_connection = TestServiceConnectionRequest.parse_obj(config_dict) - return test_service_connection + ingestion_pipeline = IngestionPipeline.parse_obj(config_dict) + return ingestion_pipeline - except ValidationError as err: - # Unsafe access to the keys. Allow a KeyError if the config is not well formatted - source_type = config_dict["connection"]["config"]["type"] - logger.warning( - f"Error parsing the Workflow Configuration for {source_type} ingestion: {err}" + except ValidationError: + source_config_type = config_dict["sourceConfig"]["config"].get("type") + + if source_config_type is None: + raise InvalidWorkflowException("Missing type in the sourceConfig config") + + source_config_class = get_source_config_class(source_config_type) + + _unsafe_parse_config( + config=config_dict["sourceConfig"]["config"], + cls=source_config_class, + message="Error parsing the source config", + ) + + raise ParsingConfigurationError( + "Uncaught error when parsing the Ingestion Pipeline!" + ) + + +def parse_automation_workflow_gracefully( + config_dict: dict, +) -> AutomationWorkflow: + """ + This function either correctly parses the pydantic class, or + throws a scoped error while fetching the required source connection + class. + + :param config_dict: JSON AutomationWorkflow config + :return: AutomationWorkflow config or scoped error + """ + + try: + automation_workflow = AutomationWorkflow.parse_obj(config_dict) + return automation_workflow + + except ValidationError: + source_type = config_dict["request"]["connection"]["config"].get("type") + + if source_type is None: + raise InvalidWorkflowException("Missing type in the connection config") + + logger.debug( + f"Error parsing the Workflow Configuration for {source_type} ingestion" ) service_type = get_service_type(source_type) connection_class = get_connection_class(source_type, service_type) - # Parse the dictionary with the scoped class + if source_type in HAS_INNER_CONNECTION: + # We will first parse the inner `connection` configuration + _parse_inner_connection( + config_dict["request"]["connection"]["config"]["connection"], + source_type, + ) + + # Parse the service connection dictionary with the scoped class _unsafe_parse_config( - config=config_dict["connection"]["config"], + config=config_dict["request"]["connection"]["config"], cls=connection_class, - message="Error parsing the connection config", + message="Error parsing the service connection", ) - raise ParsingConfigurationError("Uncaught error when parsing the workflow!") + raise ParsingConfigurationError( + "Uncaught error when parsing the Ingestion Pipeline!" + ) diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py index dd025194af9..8116e6014c8 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py @@ -20,6 +20,7 @@ from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipel IngestionPipeline, PipelineStatus, ) +from metadata.ingestion.api.parser import parse_ingestion_pipeline_config_gracefully from metadata.ingestion.ometa.client import REST from metadata.utils.logger import ometa_logger @@ -79,7 +80,7 @@ class OMetaIngestionPipelineMixin: f"{self.get_suffix(IngestionPipeline)}/trigger/{ingestion_pipeline_id}" ) - return IngestionPipeline.parse_obj(resp) + return parse_ingestion_pipeline_config_gracefully(resp) def get_pipeline_status_between_ts( self, @@ -125,6 +126,6 @@ class OMetaIngestionPipelineMixin: ) if hasattr(resp, "sourceConfig"): - return IngestionPipeline.parse_obj(resp) + return parse_ingestion_pipeline_config_gracefully(resp) return None diff --git a/ingestion/tests/unit/test_workflow_parse.py b/ingestion/tests/unit/test_workflow_parse.py index 2a142d37ac8..106097ecc12 100644 --- a/ingestion/tests/unit/test_workflow_parse.py +++ b/ingestion/tests/unit/test_workflow_parse.py @@ -16,8 +16,8 @@ from unittest import TestCase from pydantic import ValidationError -from metadata.generated.schema.entity.automations.testServiceConnection import ( - TestServiceConnectionRequest, +from metadata.generated.schema.entity.automations.workflow import ( + Workflow as AutomationWorkflow, ) from metadata.generated.schema.entity.services.connections.dashboard.tableauConnection import ( TableauConnection, @@ -35,6 +35,9 @@ from metadata.generated.schema.entity.services.dashboardService import ( DashboardConnection, ) from metadata.generated.schema.entity.services.databaseService import DatabaseConnection +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + IngestionPipeline, +) from metadata.generated.schema.entity.services.messagingService import ( MessagingConnection, ) @@ -56,7 +59,8 @@ from metadata.ingestion.api.parser import ( get_connection_class, get_service_type, get_source_config_class, - parse_test_connection_request_gracefully, + parse_automation_workflow_gracefully, + parse_ingestion_pipeline_config_gracefully, parse_workflow_config_gracefully, ) @@ -312,61 +316,387 @@ class TestWorkflowParse(TestCase): str(err.exception), ) - def test_test_connection_mysql(self): + def test_parsing_ingestion_pipeline_mysql(self): """ - Test the TestConnection for MySQL + Test parsing of ingestion_pipeline for MYSQL """ config_dict = { - "connection": { + "id": "08868b3e-cd02-4257-a545-9080856371a0", + "name": "qwfef_metadata_SPWHTqVO", + "pipelineType": "metadata", + "sourceConfig": { "config": { - "type": "Mysql", - "username": "openmetadata_user", - "authType": {"password": "openmetadata_password"}, - "hostPort": "localhost:3306", + "type": "DatabaseMetadata", + "includeTags": True, + "includeViews": True, + "includeTables": True, + "queryLogDuration": 1, + "markDeletedTables": True, + "tableFilterPattern": {"excludes": [], "includes": []}, + "useFqnForFiltering": True, + "schemaFilterPattern": {"excludes": [], "includes": []}, + "databaseFilterPattern": {"excludes": [], "includes": []}, + "includeStoredProcedures": True, + "queryParsingTimeoutLimit": 300, + "markDeletedStoredProcedures": True, } }, - "connectionType": "Database", + "airflowConfig": { + "retries": 0, + "startDate": "2023-12-19T00:00:00.000000Z", + "retryDelay": 300, + "concurrency": 1, + "maxActiveRuns": 1, + "pausePipeline": False, + "pipelineCatchup": False, + "pipelineTimezone": "UTC", + "scheduleInterval": "0 * * * *", + "workflowDefaultView": "tree", + "workflowDefaultViewOrientation": "LR", + }, } self.assertIsInstance( - parse_test_connection_request_gracefully(config_dict), - TestServiceConnectionRequest, + parse_ingestion_pipeline_config_gracefully(config_dict), + IngestionPipeline, ) config_dict_ko = { - "connection": { + "id": "08868b3e-cd02-4257-a545-9080856371a0", + "name": "qwfef_metadata_SPWHTqVO", + "pipelineType": "metadata", + "sourceConfig": { "config": { - "type": "Mysql", - "username": "openmetadata_user", - "authType": {"password": "openmetadata_password"}, + "type": "DatabaseMetadata", + "includeTags": True, + "includeViews": True, + "includeTables": True, + "viewLogDuration": 1, + "markDeletedTables": True, + "tFilterPattern": {"excludes": [], "includes": []}, + "useFqnForFiltering": True, + "schemaFilterPattern": {"excludes": [], "includes": []}, + "databaseFilterPattern": {"excludes": [], "includes": []}, + "includeStoredProcedures": True, + "queryParsingTimeoutLimit": 300, + "markDeletedStoredProcedures": True, } }, - "connectionType": "Database", + "airflowConfig": { + "retries": 0, + "startDate": "2023-12-19T00:00:00.000000Z", + "retryDelay": 300, + "concurrency": 1, + "maxActiveRuns": 1, + "pausePipeline": False, + "pipelineCatchup": False, + "pipelineTimezone": "UTC", + "scheduleInterval": "0 * * * *", + "workflowDefaultView": "tree", + "workflowDefaultViewOrientation": "LR", + }, } with self.assertRaises(ValidationError) as err: - parse_test_connection_request_gracefully(config_dict_ko) + parse_ingestion_pipeline_config_gracefully(config_dict_ko) self.assertIn( - "1 validation error for MysqlConnection\nhostPort\n field required (type=value_error.missing)", + "2 validation errors for DatabaseServiceMetadataPipeline\ntFilterPattern\n extra fields not permitted (type=value_error.extra)\nviewLogDuration\n extra fields not permitted (type=value_error.extra)", str(err.exception), ) - config_dict_ko2 = { - "connection": { + def test_parsing_ingestion_pipeline_dagster(self): + """ + Test parsing of ingestion_pipeline for Dagster + """ + config_dict = { + "id": "da50179a-02c8-42d1-a8bd-3002a49649a6", + "name": "dagster_dev_metadata_G6pRkj7X", + "pipelineType": "metadata", + "sourceConfig": { "config": { - "type": "Mysql", - "username": "openmetadata_user", - "authType": {"password": "openmetadata_password"}, - "hostPort": "localhost:3306", - "random": "value", + "type": "PipelineMetadata", + "includeTags": True, + "includeOwners": True, + "dbServiceNames": ["dev"], + "includeLineage": True, + "markDeletedPipelines": True, + "pipelineFilterPattern": { + "excludes": [], + "includes": ["test_pipeline"], + }, } }, - "connectionType": "Database", + "airflowConfig": { + "retries": 0, + "startDate": "2023-12-19T00:00:00.000000Z", + "retryDelay": 300, + "concurrency": 1, + "maxActiveRuns": 1, + "pausePipeline": False, + "pipelineCatchup": False, + "pipelineTimezone": "UTC", + "scheduleInterval": "0 * * * *", + "workflowDefaultView": "tree", + "workflowDefaultViewOrientation": "LR", + }, + } + self.assertIsInstance( + parse_ingestion_pipeline_config_gracefully(config_dict), + IngestionPipeline, + ) + + config_dict_ko = { + "id": "da50179a-02c8-42d1-a8bd-3002a49649a6", + "name": "dagster_dev_metadata_G6pRkj7X", + "pipelineType": "metadata", + "sourceConfig": { + "config": { + "type": "PipelineMetadata", + "includeTags": True, + "includeOwners": True, + "dbServiceNames": ["dev"], + "includeViewLineage": True, + "markDeletedDbs": True, + "pipelineFilterPatterns": { + "excludes": [], + "includes": ["test_pipeline"], + }, + } + }, + "airflowConfig": { + "retries": 0, + "startDate": "2023-12-19T00:00:00.000000Z", + "retryDelay": 300, + "concurrency": 1, + "maxActiveRuns": 1, + "pausePipeline": False, + "pipelineCatchup": False, + "pipelineTimezone": "UTC", + "scheduleInterval": "0 * * * *", + "workflowDefaultView": "tree", + "workflowDefaultViewOrientation": "LR", + }, } with self.assertRaises(ValidationError) as err: - parse_test_connection_request_gracefully(config_dict_ko2) + parse_ingestion_pipeline_config_gracefully(config_dict_ko) self.assertIn( - "1 validation error for MysqlConnection\nrandom\n extra fields not permitted (type=value_error.extra)", + "3 validation errors for PipelineServiceMetadataPipeline\nincludeViewLineage\n extra fields not permitted (type=value_error.extra)\nmarkDeletedDbs\n extra fields not permitted (type=value_error.extra)\npipelineFilterPatterns\n extra fields not permitted (type=value_error.extra)", + str(err.exception), + ) + + def test_parsing_automation_workflow_airflow(self): + """ + Test parsing of automation workflow for airflow + """ + config_dict = { + "id": "8b735b2c-194e-41a4-b383-96253f936293", + "name": "test-connection-Airflow-WhCTUSXJ", + "deleted": False, + "request": { + "connection": { + "config": { + "type": "Airflow", + "hostPort": "https://localhost:8080", + "connection": { + "type": "Mysql", + "scheme": "mysql+pymysql", + "authType": {"password": "fernet:demo_password"}, + "hostPort": "mysql:3306", + "username": "admin@openmetadata.org", + "databaseName": "airflow_db", + "supportsProfiler": True, + "supportsQueryComment": True, + "supportsDBTExtraction": True, + "supportsMetadataExtraction": True, + }, + "numberOfStatus": 10, + "supportsMetadataExtraction": True, + } + }, + "serviceName": "airflow_test_two", + "serviceType": "Pipeline", + "connectionType": "Airflow", + "secretsManagerProvider": "db", + }, + "version": 0.1, + "updatedAt": 1703157653864, + "updatedBy": "admin", + "workflowType": "TEST_CONNECTION", + "fullyQualifiedName": "test-connection-Airflow-WhCTUSXJ", + } + self.assertIsInstance( + parse_automation_workflow_gracefully(config_dict), + AutomationWorkflow, + ) + + config_dict_ko = { + "id": "8b735b2c-194e-41a4-b383-96253f936293", + "name": "test-connection-Airflow-WhCTUSXJ", + "deleted": False, + "request": { + "connection": { + "config": { + "type": "Airflow", + "hostPort": "localhost:8080", + "connection": { + "type": "Mysql", + "scheme": "mysql+pymysql", + "authType": {"password": "fernet:demo_password"}, + "hostPort": "mysql:3306", + "username": "admin@openmetadata.org", + "databaseName": "airflow_db", + "supportsProfiler": True, + "supportsQueryComment": True, + "supportsDBTExtraction": True, + "supportsMetadataExtraction": True, + }, + "numberOfStatus": 10, + "supportsMetadataExtraction": True, + } + }, + "serviceName": "airflow_test_two", + "serviceType": "Pipeline", + "connectionType": "Airflow", + "secretsManagerProvider": "db", + }, + "version": 0.1, + "updatedAt": 1703157653864, + "updatedBy": "admin", + "workflowType": "TEST_CONNECTION", + "fullyQualifiedName": "test-connection-Airflow-WhCTUSXJ", + } + + with self.assertRaises(ValidationError) as err: + parse_automation_workflow_gracefully(config_dict_ko) + self.assertIn( + "1 validation error for AirflowConnection\nhostPort\n invalid or missing URL scheme (type=value_error.url.scheme)", + str(err.exception), + ) + + config_dict_ko_2 = { + "id": "8b735b2c-194e-41a4-b383-96253f936293", + "name": "test-connection-Airflow-WhCTUSXJ", + "deleted": False, + "request": { + "connection": { + "config": { + "type": "Airflow", + "hostPort": "https://localhost:8080", + "connection": { + "type": "Mysql", + "scheme": "mysql+pymysql", + "authType": {"password": "fernet:demo_password"}, + "hostPort": "mysql:3306", + "usernam": "admin@openmetadata.org", + "databaseName": "airflow_db", + "supportsProfile": True, + "supportsQueryComment": True, + "supportsDBTExtraction": True, + "supportsMetadataExtraction": True, + }, + "numberOfStatus": 10, + "supportsMetadataExtraction": True, + } + }, + "serviceName": "airflow_test_two", + "serviceType": "Pipeline", + "connectionType": "Airflow", + "secretsManagerProvider": "db", + }, + "version": 0.1, + "updatedAt": 1703157653864, + "updatedBy": "admin", + "workflowType": "TEST_CONNECTION", + "fullyQualifiedName": "test-connection-Airflow-WhCTUSXJ", + } + + with self.assertRaises(ValidationError) as err: + parse_automation_workflow_gracefully(config_dict_ko_2) + self.assertIn( + "3 validation errors for MysqlConnection\nusername\n field required (type=value_error.missing)\nsupportsProfile\n extra fields not permitted (type=value_error.extra)\nusernam\n extra fields not permitted (type=value_error.extra)", + str(err.exception), + ) + + def test_parsing_automation_workflow_athena(self): + """ + Test parsing of automation workflow for airflow + """ + config_dict = { + "id": "850b194c-3d1b-4f6f-95df-83e3df5ccb24", + "name": "test-connection-Athena-EHnc3Ral", + "deleted": False, + "request": { + "connection": { + "config": { + "type": "Athena", + "scheme": "awsathena+rest", + "awsConfig": { + "awsRegion": "us-east-2", + "assumeRoleSessionName": "OpenMetadataSession", + }, + "workgroup": "primary", + "s3StagingDir": "s3://athena-postgres/output/", + "supportsProfiler": True, + "supportsQueryComment": True, + "supportsDBTExtraction": True, + "supportsUsageExtraction": True, + "supportsLineageExtraction": True, + "supportsMetadataExtraction": True, + } + }, + "serviceType": "Database", + "connectionType": "Athena", + "secretsManagerProvider": "db", + }, + "version": 0.1, + "updatedAt": 1703173676044, + "updatedBy": "admin", + "workflowType": "TEST_CONNECTION", + "fullyQualifiedName": "test-connection-Athena-EHnc3Ral", + } + self.assertIsInstance( + parse_automation_workflow_gracefully(config_dict), + AutomationWorkflow, + ) + + config_dict_ko = { + "id": "850b194c-3d1b-4f6f-95df-83e3df5ccb24", + "name": "test-connection-Athena-EHnc3Ral", + "deleted": False, + "request": { + "connection": { + "config": { + "type": "Athena", + "scheme": "awsathena+rest", + "awsConfig": { + "awsRegion": "us-east-2", + "assumeRoleSessionName": "OpenMetadataSession", + }, + "workgroup": "primary", + "s3StagingDir": "athena-postgres/output/", + "supportsProfiler": True, + "supportsQueryComment": True, + "supportsDBTExtraction": True, + "supportsUsageExtraction": True, + "supportsLineageExtraction": True, + "supportsMetadataExtraction": True, + } + }, + "serviceType": "Database", + "connectionType": "Athena", + "secretsManagerProvider": "db", + }, + "version": 0.1, + "updatedAt": 1703173676044, + "updatedBy": "admin", + "workflowType": "TEST_CONNECTION", + "fullyQualifiedName": "test-connection-Athena-EHnc3Ral", + } + + with self.assertRaises(ValidationError) as err: + parse_automation_workflow_gracefully(config_dict_ko) + self.assertIn( + "1 validation error for AthenaConnection\ns3StagingDir\n invalid or missing URL scheme (type=value_error.url.scheme)", str(err.exception), ) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/deploy.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/deploy.py index b6993bf110e..60138de7822 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/deploy.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/deploy.py @@ -20,9 +20,7 @@ from openmetadata_managed_apis.operations.deploy import DagDeployer from openmetadata_managed_apis.utils.logger import routes_logger from pydantic import ValidationError -from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( - IngestionPipeline, -) +from metadata.ingestion.api.parser import parse_ingestion_pipeline_config_gracefully logger = routes_logger() @@ -61,7 +59,9 @@ def get_fn(blueprint: Blueprint) -> Callable: error=f"Did not receive any JSON request to deploy", ) - ingestion_pipeline = IngestionPipeline.parse_obj(json_request) + ingestion_pipeline = parse_ingestion_pipeline_config_gracefully( + json_request + ) deployer = DagDeployer(ingestion_pipeline) response = deployer.deploy() diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/run_automation.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/run_automation.py index 03eb6485721..ab79b75fbfa 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/run_automation.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/run_automation.py @@ -20,9 +20,7 @@ from openmetadata_managed_apis.utils.logger import routes_logger from pydantic import ValidationError from metadata.automations.runner import execute -from metadata.generated.schema.entity.automations.workflow import ( - Workflow as AutomationWorkflow, -) +from metadata.ingestion.api.parser import parse_automation_workflow_gracefully from metadata.utils.secrets.secrets_manager_factory import SecretsManagerFactory logger = routes_logger() @@ -53,9 +51,8 @@ def get_fn(blueprint: Blueprint) -> Callable: json_request = request.get_json(cache=False) try: - # TODO: Prepare `parse_automation_workflow_gracefully` - automation_workflow: AutomationWorkflow = AutomationWorkflow.parse_obj( - json_request + automation_workflow = parse_automation_workflow_gracefully( + config_dict=json_request ) # we need to instantiate the secret manager in case secrets are passed