Fix 13823: Validate and Parse gracefully IngestionPipeline (#14461)

* Fixed ingestion pipeline parsing

* Added validation for automation workflow

---------

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
Onkar Ravgan 2023-12-22 09:56:39 +05:30 committed by GitHub
parent 0a3b03bdc3
commit 79444f4a24
5 changed files with 449 additions and 68 deletions
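
For orientation, the diff below replaces the old parse_test_connection_request_gracefully helper with two scoped entry points, parse_ingestion_pipeline_config_gracefully and parse_automation_workflow_gracefully. A minimal usage sketch follows (not part of this commit; "pipeline.json" and "workflow.json" are placeholder files holding payloads shaped like the ones in the unit tests of this diff):

import json

from metadata.ingestion.api.parser import (
    parse_automation_workflow_gracefully,
    parse_ingestion_pipeline_config_gracefully,
)

with open("pipeline.json", encoding="utf-8") as fp:
    # Raises a pydantic ValidationError scoped to the source config class
    # (e.g. DatabaseServiceMetadataPipeline) when that part of the payload is broken,
    # instead of a generic IngestionPipeline error.
    ingestion_pipeline = parse_ingestion_pipeline_config_gracefully(json.load(fp))

with open("workflow.json", encoding="utf-8") as fp:
    # Same idea for automation workflows: errors are scoped to the service connection
    # class (e.g. AirflowConnection), or to its inner connection for Airflow-backed sources.
    automation_workflow = parse_automation_workflow_gracefully(json.load(fp))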

View File

@@ -11,12 +11,12 @@
 """
 Helper to parse workflow configurations
 """
-from typing import Optional, Type, TypeVar, Union
+from typing import Type, TypeVar, Union
 
 from pydantic import BaseModel, ValidationError
 
-from metadata.generated.schema.entity.automations.testServiceConnection import (
-    TestServiceConnectionRequest,
+from metadata.generated.schema.entity.automations.workflow import (
+    Workflow as AutomationWorkflow,
 )
 from metadata.generated.schema.entity.services.dashboardService import (
     DashboardConnection,
@@ -26,6 +26,9 @@ from metadata.generated.schema.entity.services.databaseService import (
     DatabaseConnection,
     DatabaseServiceType,
 )
+from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
+    IngestionPipeline,
+)
 from metadata.generated.schema.entity.services.messagingService import (
     MessagingConnection,
     MessagingServiceType,
@@ -270,13 +273,11 @@ def _parse_inner_connection(config_dict: dict, source_type: str) -> None:
     :param config_dict: JSON configuration
     :param source_type: source type name, e.g., Airflow.
     """
-    inner_source_type = config_dict["source"]["serviceConnection"]["config"][
-        "connection"
-    ]["type"]
+    inner_source_type = config_dict["type"]
     inner_service_type = get_service_type(inner_source_type)
     inner_connection_class = get_connection_class(inner_source_type, inner_service_type)
     _unsafe_parse_config(
-        config=config_dict["source"]["serviceConnection"]["config"]["connection"],
+        config=config_dict,
         cls=inner_connection_class,
         message=f"Error parsing the inner service connection for {source_type}",
     )
@@ -303,7 +304,12 @@ def parse_service_connection(config_dict: dict) -> None:
 
     if source_type in HAS_INNER_CONNECTION:
         # We will first parse the inner `connection` configuration
-        _parse_inner_connection(config_dict, source_type)
+        _parse_inner_connection(
+            config_dict["source"]["serviceConnection"]["config"]["connection"][
+                "config"
+            ]["connection"],
+            source_type,
+        )
 
     # Parse the service connection dictionary with the scoped class
     _unsafe_parse_config(
@@ -400,37 +406,84 @@
     raise ParsingConfigurationError("Uncaught error when parsing the workflow!")
 
 
-def parse_test_connection_request_gracefully(
+def parse_ingestion_pipeline_config_gracefully(
     config_dict: dict,
-) -> Optional[TestServiceConnectionRequest]:
+) -> IngestionPipeline:
     """
-    This function either correctly parses the pydantic class,
-    or throws a scoped error while fetching the required source
-    connection class
-    :param config_dict: JSON workflow config
-    :return: TestServiceConnectionRequest or scoped error
+    This function either correctly parses the pydantic class, or
+    throws a scoped error while fetching the required source connection
+    class.
+    :param config_dict: JSON ingestion pipeline config
+    :return:Ingestion Pipeline config or scoped error
     """
     try:
-        test_service_connection = TestServiceConnectionRequest.parse_obj(config_dict)
-        return test_service_connection
-    except ValidationError as err:
-        # Unsafe access to the keys. Allow a KeyError if the config is not well formatted
-        source_type = config_dict["connection"]["config"]["type"]
-        logger.warning(
-            f"Error parsing the Workflow Configuration for {source_type} ingestion: {err}"
+        ingestion_pipeline = IngestionPipeline.parse_obj(config_dict)
+        return ingestion_pipeline
+    except ValidationError:
+        source_config_type = config_dict["sourceConfig"]["config"].get("type")
+
+        if source_config_type is None:
+            raise InvalidWorkflowException("Missing type in the sourceConfig config")
+
+        source_config_class = get_source_config_class(source_config_type)
+
+        _unsafe_parse_config(
+            config=config_dict["sourceConfig"]["config"],
+            cls=source_config_class,
+            message="Error parsing the source config",
+        )
+
+    raise ParsingConfigurationError(
+        "Uncaught error when parsing the Ingestion Pipeline!"
+    )
+
+
+def parse_automation_workflow_gracefully(
+    config_dict: dict,
+) -> AutomationWorkflow:
+    """
+    This function either correctly parses the pydantic class, or
+    throws a scoped error while fetching the required source connection
+    class.
+    :param config_dict: JSON AutomationWorkflow config
+    :return: AutomationWorkflow config or scoped error
+    """
+    try:
+        automation_workflow = AutomationWorkflow.parse_obj(config_dict)
+        return automation_workflow
+    except ValidationError:
+        source_type = config_dict["request"]["connection"]["config"].get("type")
+
+        if source_type is None:
+            raise InvalidWorkflowException("Missing type in the connection config")
+
+        logger.debug(
+            f"Error parsing the Workflow Configuration for {source_type} ingestion"
         )
         service_type = get_service_type(source_type)
         connection_class = get_connection_class(source_type, service_type)
-        # Parse the dictionary with the scoped class
-        _unsafe_parse_config(
-            config=config_dict["connection"]["config"],
-            cls=connection_class,
-            message="Error parsing the connection config",
+
+        if source_type in HAS_INNER_CONNECTION:
+            # We will first parse the inner `connection` configuration
+            _parse_inner_connection(
+                config_dict["request"]["connection"]["config"]["connection"],
+                source_type,
+            )
+
+        # Parse the service connection dictionary with the scoped class
+        _unsafe_parse_config(
+            config=config_dict["request"]["connection"]["config"],
+            cls=connection_class,
+            message="Error parsing the service connection",
         )
-    raise ParsingConfigurationError("Uncaught error when parsing the workflow!")
+
+    raise ParsingConfigurationError(
+        "Uncaught error when parsing the Ingestion Pipeline!"
+    )
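
As a reading aid for the two helpers above, a small sketch (not from this commit) of the failure modes they expose. The broad except Exception is used only because the import locations of InvalidWorkflowException and ParsingConfigurationError are not shown in this diff:

from pydantic import ValidationError

from metadata.ingestion.api.parser import parse_automation_workflow_gracefully


def describe_parse_failure(workflow_payload: dict) -> None:
    """Illustrative only: report which failure mode a bad payload hits."""
    try:
        parse_automation_workflow_gracefully(workflow_payload)
        print("payload parsed into an AutomationWorkflow")
    except ValidationError as scoped_error:
        # Scoped re-parse of request.connection.config (or of its inner connection
        # for sources listed in HAS_INNER_CONNECTION).
        print(f"connection-scoped error:\n{scoped_error}")
    except Exception as other:  # InvalidWorkflowException or ParsingConfigurationError
        print(f"{type(other).__name__}: {other}")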

View File

@@ -20,6 +20,7 @@ from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
     IngestionPipeline,
     PipelineStatus,
 )
+from metadata.ingestion.api.parser import parse_ingestion_pipeline_config_gracefully
 from metadata.ingestion.ometa.client import REST
 from metadata.utils.logger import ometa_logger
@@ -79,7 +80,7 @@ class OMetaIngestionPipelineMixin:
             f"{self.get_suffix(IngestionPipeline)}/trigger/{ingestion_pipeline_id}"
         )
 
-        return IngestionPipeline.parse_obj(resp)
+        return parse_ingestion_pipeline_config_gracefully(resp)
 
     def get_pipeline_status_between_ts(
         self,
@@ -125,6 +126,6 @@
         )
 
         if hasattr(resp, "sourceConfig"):
-            return IngestionPipeline.parse_obj(resp)
+            return parse_ingestion_pipeline_config_gracefully(resp)
 
         return None

View File

@@ -16,8 +16,8 @@ from unittest import TestCase
 
 from pydantic import ValidationError
 
-from metadata.generated.schema.entity.automations.testServiceConnection import (
-    TestServiceConnectionRequest,
+from metadata.generated.schema.entity.automations.workflow import (
+    Workflow as AutomationWorkflow,
 )
 from metadata.generated.schema.entity.services.connections.dashboard.tableauConnection import (
     TableauConnection,
@@ -35,6 +35,9 @@ from metadata.generated.schema.entity.services.dashboardService import (
     DashboardConnection,
 )
 from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
+from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
+    IngestionPipeline,
+)
 from metadata.generated.schema.entity.services.messagingService import (
     MessagingConnection,
 )
@@ -56,7 +59,8 @@ from metadata.ingestion.api.parser import (
     get_connection_class,
     get_service_type,
     get_source_config_class,
-    parse_test_connection_request_gracefully,
+    parse_automation_workflow_gracefully,
+    parse_ingestion_pipeline_config_gracefully,
     parse_workflow_config_gracefully,
 )
@@ -312,61 +316,387 @@ class TestWorkflowParse(TestCase):
             str(err.exception),
         )
 
-    def test_test_connection_mysql(self):
-        """
-        Test the TestConnection for MySQL
-        """
-        config_dict = {
-            "connection": {
-                "config": {
-                    "type": "Mysql",
-                    "username": "openmetadata_user",
-                    "authType": {"password": "openmetadata_password"},
-                    "hostPort": "localhost:3306",
-                }
-            },
-            "connectionType": "Database",
-        }
-        self.assertIsInstance(
-            parse_test_connection_request_gracefully(config_dict),
-            TestServiceConnectionRequest,
-        )
-
-        config_dict_ko = {
-            "connection": {
-                "config": {
-                    "type": "Mysql",
-                    "username": "openmetadata_user",
-                    "authType": {"password": "openmetadata_password"},
-                }
-            },
-            "connectionType": "Database",
-        }
-        with self.assertRaises(ValidationError) as err:
-            parse_test_connection_request_gracefully(config_dict_ko)
-        self.assertIn(
-            "1 validation error for MysqlConnection\nhostPort\n  field required (type=value_error.missing)",
-            str(err.exception),
-        )
-
-        config_dict_ko2 = {
-            "connection": {
-                "config": {
-                    "type": "Mysql",
-                    "username": "openmetadata_user",
-                    "authType": {"password": "openmetadata_password"},
-                    "hostPort": "localhost:3306",
-                    "random": "value",
-                }
-            },
-            "connectionType": "Database",
-        }
-        with self.assertRaises(ValidationError) as err:
-            parse_test_connection_request_gracefully(config_dict_ko2)
-        self.assertIn(
-            "1 validation error for MysqlConnection\nrandom\n  extra fields not permitted (type=value_error.extra)",
+    def test_parsing_ingestion_pipeline_mysql(self):
+        """
+        Test parsing of ingestion_pipeline for MYSQL
+        """
+        config_dict = {
+            "id": "08868b3e-cd02-4257-a545-9080856371a0",
+            "name": "qwfef_metadata_SPWHTqVO",
+            "pipelineType": "metadata",
+            "sourceConfig": {
+                "config": {
+                    "type": "DatabaseMetadata",
+                    "includeTags": True,
+                    "includeViews": True,
+                    "includeTables": True,
+                    "queryLogDuration": 1,
+                    "markDeletedTables": True,
+                    "tableFilterPattern": {"excludes": [], "includes": []},
+                    "useFqnForFiltering": True,
+                    "schemaFilterPattern": {"excludes": [], "includes": []},
+                    "databaseFilterPattern": {"excludes": [], "includes": []},
+                    "includeStoredProcedures": True,
+                    "queryParsingTimeoutLimit": 300,
+                    "markDeletedStoredProcedures": True,
+                }
+            },
+            "airflowConfig": {
+                "retries": 0,
+                "startDate": "2023-12-19T00:00:00.000000Z",
+                "retryDelay": 300,
+                "concurrency": 1,
+                "maxActiveRuns": 1,
+                "pausePipeline": False,
+                "pipelineCatchup": False,
+                "pipelineTimezone": "UTC",
+                "scheduleInterval": "0 * * * *",
+                "workflowDefaultView": "tree",
+                "workflowDefaultViewOrientation": "LR",
+            },
+        }
+        self.assertIsInstance(
+            parse_ingestion_pipeline_config_gracefully(config_dict),
+            IngestionPipeline,
+        )
+
+        config_dict_ko = {
+            "id": "08868b3e-cd02-4257-a545-9080856371a0",
+            "name": "qwfef_metadata_SPWHTqVO",
+            "pipelineType": "metadata",
+            "sourceConfig": {
+                "config": {
+                    "type": "DatabaseMetadata",
+                    "includeTags": True,
+                    "includeViews": True,
+                    "includeTables": True,
+                    "viewLogDuration": 1,
+                    "markDeletedTables": True,
+                    "tFilterPattern": {"excludes": [], "includes": []},
+                    "useFqnForFiltering": True,
+                    "schemaFilterPattern": {"excludes": [], "includes": []},
+                    "databaseFilterPattern": {"excludes": [], "includes": []},
+                    "includeStoredProcedures": True,
+                    "queryParsingTimeoutLimit": 300,
+                    "markDeletedStoredProcedures": True,
+                }
+            },
+            "airflowConfig": {
+                "retries": 0,
+                "startDate": "2023-12-19T00:00:00.000000Z",
+                "retryDelay": 300,
+                "concurrency": 1,
+                "maxActiveRuns": 1,
+                "pausePipeline": False,
+                "pipelineCatchup": False,
+                "pipelineTimezone": "UTC",
+                "scheduleInterval": "0 * * * *",
+                "workflowDefaultView": "tree",
+                "workflowDefaultViewOrientation": "LR",
+            },
+        }
+        with self.assertRaises(ValidationError) as err:
+            parse_ingestion_pipeline_config_gracefully(config_dict_ko)
+        self.assertIn(
+            "2 validation errors for DatabaseServiceMetadataPipeline\ntFilterPattern\n  extra fields not permitted (type=value_error.extra)\nviewLogDuration\n  extra fields not permitted (type=value_error.extra)",
+            str(err.exception),
+        )
+
+    def test_parsing_ingestion_pipeline_dagster(self):
+        """
+        Test parsing of ingestion_pipeline for Dagster
+        """
+        config_dict = {
+            "id": "da50179a-02c8-42d1-a8bd-3002a49649a6",
+            "name": "dagster_dev_metadata_G6pRkj7X",
+            "pipelineType": "metadata",
+            "sourceConfig": {
+                "config": {
+                    "type": "PipelineMetadata",
+                    "includeTags": True,
+                    "includeOwners": True,
+                    "dbServiceNames": ["dev"],
+                    "includeLineage": True,
+                    "markDeletedPipelines": True,
+                    "pipelineFilterPattern": {
+                        "excludes": [],
+                        "includes": ["test_pipeline"],
+                    },
+                }
+            },
+            "airflowConfig": {
+                "retries": 0,
+                "startDate": "2023-12-19T00:00:00.000000Z",
+                "retryDelay": 300,
+                "concurrency": 1,
+                "maxActiveRuns": 1,
+                "pausePipeline": False,
+                "pipelineCatchup": False,
+                "pipelineTimezone": "UTC",
+                "scheduleInterval": "0 * * * *",
+                "workflowDefaultView": "tree",
+                "workflowDefaultViewOrientation": "LR",
+            },
+        }
+        self.assertIsInstance(
+            parse_ingestion_pipeline_config_gracefully(config_dict),
+            IngestionPipeline,
+        )
+
+        config_dict_ko = {
+            "id": "da50179a-02c8-42d1-a8bd-3002a49649a6",
+            "name": "dagster_dev_metadata_G6pRkj7X",
+            "pipelineType": "metadata",
+            "sourceConfig": {
+                "config": {
+                    "type": "PipelineMetadata",
+                    "includeTags": True,
+                    "includeOwners": True,
+                    "dbServiceNames": ["dev"],
+                    "includeViewLineage": True,
+                    "markDeletedDbs": True,
+                    "pipelineFilterPatterns": {
+                        "excludes": [],
+                        "includes": ["test_pipeline"],
+                    },
+                }
+            },
+            "airflowConfig": {
+                "retries": 0,
+                "startDate": "2023-12-19T00:00:00.000000Z",
+                "retryDelay": 300,
+                "concurrency": 1,
+                "maxActiveRuns": 1,
+                "pausePipeline": False,
+                "pipelineCatchup": False,
+                "pipelineTimezone": "UTC",
+                "scheduleInterval": "0 * * * *",
+                "workflowDefaultView": "tree",
+                "workflowDefaultViewOrientation": "LR",
+            },
+        }
+        with self.assertRaises(ValidationError) as err:
+            parse_ingestion_pipeline_config_gracefully(config_dict_ko)
+        self.assertIn(
+            "3 validation errors for PipelineServiceMetadataPipeline\nincludeViewLineage\n  extra fields not permitted (type=value_error.extra)\nmarkDeletedDbs\n  extra fields not permitted (type=value_error.extra)\npipelineFilterPatterns\n  extra fields not permitted (type=value_error.extra)",
+            str(err.exception),
+        )
+
+    def test_parsing_automation_workflow_airflow(self):
+        """
+        Test parsing of automation workflow for airflow
+        """
+        config_dict = {
+            "id": "8b735b2c-194e-41a4-b383-96253f936293",
+            "name": "test-connection-Airflow-WhCTUSXJ",
+            "deleted": False,
+            "request": {
+                "connection": {
+                    "config": {
+                        "type": "Airflow",
+                        "hostPort": "https://localhost:8080",
+                        "connection": {
+                            "type": "Mysql",
+                            "scheme": "mysql+pymysql",
+                            "authType": {"password": "fernet:demo_password"},
+                            "hostPort": "mysql:3306",
+                            "username": "admin@openmetadata.org",
+                            "databaseName": "airflow_db",
+                            "supportsProfiler": True,
+                            "supportsQueryComment": True,
+                            "supportsDBTExtraction": True,
+                            "supportsMetadataExtraction": True,
+                        },
+                        "numberOfStatus": 10,
+                        "supportsMetadataExtraction": True,
+                    }
+                },
+                "serviceName": "airflow_test_two",
+                "serviceType": "Pipeline",
+                "connectionType": "Airflow",
+                "secretsManagerProvider": "db",
+            },
+            "version": 0.1,
+            "updatedAt": 1703157653864,
+            "updatedBy": "admin",
+            "workflowType": "TEST_CONNECTION",
+            "fullyQualifiedName": "test-connection-Airflow-WhCTUSXJ",
+        }
+        self.assertIsInstance(
+            parse_automation_workflow_gracefully(config_dict),
+            AutomationWorkflow,
+        )
+
+        config_dict_ko = {
+            "id": "8b735b2c-194e-41a4-b383-96253f936293",
+            "name": "test-connection-Airflow-WhCTUSXJ",
+            "deleted": False,
+            "request": {
+                "connection": {
+                    "config": {
+                        "type": "Airflow",
+                        "hostPort": "localhost:8080",
+                        "connection": {
+                            "type": "Mysql",
+                            "scheme": "mysql+pymysql",
+                            "authType": {"password": "fernet:demo_password"},
+                            "hostPort": "mysql:3306",
+                            "username": "admin@openmetadata.org",
+                            "databaseName": "airflow_db",
+                            "supportsProfiler": True,
+                            "supportsQueryComment": True,
+                            "supportsDBTExtraction": True,
+                            "supportsMetadataExtraction": True,
+                        },
+                        "numberOfStatus": 10,
+                        "supportsMetadataExtraction": True,
+                    }
+                },
+                "serviceName": "airflow_test_two",
+                "serviceType": "Pipeline",
+                "connectionType": "Airflow",
+                "secretsManagerProvider": "db",
+            },
+            "version": 0.1,
+            "updatedAt": 1703157653864,
+            "updatedBy": "admin",
+            "workflowType": "TEST_CONNECTION",
+            "fullyQualifiedName": "test-connection-Airflow-WhCTUSXJ",
+        }
+        with self.assertRaises(ValidationError) as err:
+            parse_automation_workflow_gracefully(config_dict_ko)
+        self.assertIn(
+            "1 validation error for AirflowConnection\nhostPort\n  invalid or missing URL scheme (type=value_error.url.scheme)",
+            str(err.exception),
+        )
+
+        config_dict_ko_2 = {
+            "id": "8b735b2c-194e-41a4-b383-96253f936293",
+            "name": "test-connection-Airflow-WhCTUSXJ",
+            "deleted": False,
+            "request": {
+                "connection": {
+                    "config": {
+                        "type": "Airflow",
+                        "hostPort": "https://localhost:8080",
+                        "connection": {
+                            "type": "Mysql",
+                            "scheme": "mysql+pymysql",
+                            "authType": {"password": "fernet:demo_password"},
+                            "hostPort": "mysql:3306",
+                            "usernam": "admin@openmetadata.org",
+                            "databaseName": "airflow_db",
+                            "supportsProfile": True,
+                            "supportsQueryComment": True,
+                            "supportsDBTExtraction": True,
+                            "supportsMetadataExtraction": True,
+                        },
+                        "numberOfStatus": 10,
+                        "supportsMetadataExtraction": True,
+                    }
+                },
+                "serviceName": "airflow_test_two",
+                "serviceType": "Pipeline",
+                "connectionType": "Airflow",
+                "secretsManagerProvider": "db",
+            },
+            "version": 0.1,
+            "updatedAt": 1703157653864,
+            "updatedBy": "admin",
+            "workflowType": "TEST_CONNECTION",
+            "fullyQualifiedName": "test-connection-Airflow-WhCTUSXJ",
+        }
+        with self.assertRaises(ValidationError) as err:
+            parse_automation_workflow_gracefully(config_dict_ko_2)
+        self.assertIn(
+            "3 validation errors for MysqlConnection\nusername\n  field required (type=value_error.missing)\nsupportsProfile\n  extra fields not permitted (type=value_error.extra)\nusernam\n  extra fields not permitted (type=value_error.extra)",
+            str(err.exception),
+        )
+
+    def test_parsing_automation_workflow_athena(self):
+        """
+        Test parsing of automation workflow for airflow
+        """
+        config_dict = {
+            "id": "850b194c-3d1b-4f6f-95df-83e3df5ccb24",
+            "name": "test-connection-Athena-EHnc3Ral",
+            "deleted": False,
+            "request": {
+                "connection": {
+                    "config": {
+                        "type": "Athena",
+                        "scheme": "awsathena+rest",
+                        "awsConfig": {
+                            "awsRegion": "us-east-2",
+                            "assumeRoleSessionName": "OpenMetadataSession",
+                        },
+                        "workgroup": "primary",
+                        "s3StagingDir": "s3://athena-postgres/output/",
+                        "supportsProfiler": True,
+                        "supportsQueryComment": True,
+                        "supportsDBTExtraction": True,
+                        "supportsUsageExtraction": True,
+                        "supportsLineageExtraction": True,
+                        "supportsMetadataExtraction": True,
+                    }
+                },
+                "serviceType": "Database",
+                "connectionType": "Athena",
+                "secretsManagerProvider": "db",
+            },
+            "version": 0.1,
+            "updatedAt": 1703173676044,
+            "updatedBy": "admin",
+            "workflowType": "TEST_CONNECTION",
+            "fullyQualifiedName": "test-connection-Athena-EHnc3Ral",
+        }
+        self.assertIsInstance(
+            parse_automation_workflow_gracefully(config_dict),
+            AutomationWorkflow,
+        )
+
+        config_dict_ko = {
+            "id": "850b194c-3d1b-4f6f-95df-83e3df5ccb24",
+            "name": "test-connection-Athena-EHnc3Ral",
+            "deleted": False,
+            "request": {
+                "connection": {
+                    "config": {
+                        "type": "Athena",
+                        "scheme": "awsathena+rest",
+                        "awsConfig": {
+                            "awsRegion": "us-east-2",
+                            "assumeRoleSessionName": "OpenMetadataSession",
+                        },
+                        "workgroup": "primary",
+                        "s3StagingDir": "athena-postgres/output/",
+                        "supportsProfiler": True,
+                        "supportsQueryComment": True,
+                        "supportsDBTExtraction": True,
+                        "supportsUsageExtraction": True,
+                        "supportsLineageExtraction": True,
+                        "supportsMetadataExtraction": True,
+                    }
+                },
+                "serviceType": "Database",
+                "connectionType": "Athena",
+                "secretsManagerProvider": "db",
+            },
+            "version": 0.1,
+            "updatedAt": 1703173676044,
+            "updatedBy": "admin",
+            "workflowType": "TEST_CONNECTION",
+            "fullyQualifiedName": "test-connection-Athena-EHnc3Ral",
+        }
+        with self.assertRaises(ValidationError) as err:
+            parse_automation_workflow_gracefully(config_dict_ko)
+        self.assertIn(
+            "1 validation error for AthenaConnection\ns3StagingDir\n  invalid or missing URL scheme (type=value_error.url.scheme)",
             str(err.exception),
         )

View File

@@ -20,9 +20,7 @@ from openmetadata_managed_apis.operations.deploy import DagDeployer
 from openmetadata_managed_apis.utils.logger import routes_logger
 from pydantic import ValidationError
 
-from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
-    IngestionPipeline,
-)
+from metadata.ingestion.api.parser import parse_ingestion_pipeline_config_gracefully
 
 logger = routes_logger()
@@ -61,7 +59,9 @@ def get_fn(blueprint: Blueprint) -> Callable:
                 error=f"Did not receive any JSON request to deploy",
             )
 
-        ingestion_pipeline = IngestionPipeline.parse_obj(json_request)
+        ingestion_pipeline = parse_ingestion_pipeline_config_gracefully(
+            json_request
+        )
 
         deployer = DagDeployer(ingestion_pipeline)
         response = deployer.deploy()

View File

@@ -20,9 +20,7 @@ from openmetadata_managed_apis.utils.logger import routes_logger
 from pydantic import ValidationError
 
 from metadata.automations.runner import execute
-from metadata.generated.schema.entity.automations.workflow import (
-    Workflow as AutomationWorkflow,
-)
+from metadata.ingestion.api.parser import parse_automation_workflow_gracefully
 from metadata.utils.secrets.secrets_manager_factory import SecretsManagerFactory
 
 logger = routes_logger()
@@ -53,9 +51,8 @@ def get_fn(blueprint: Blueprint) -> Callable:
         json_request = request.get_json(cache=False)
 
         try:
-            # TODO: Prepare `parse_automation_workflow_gracefully`
-            automation_workflow: AutomationWorkflow = AutomationWorkflow.parse_obj(
-                json_request
+            automation_workflow = parse_automation_workflow_gracefully(
+                config_dict=json_request
             )
 
             # we need to instantiate the secret manager in case secrets are passed