From 1a67e4fb7ddbad23bd6e65edf5230b506980621f Mon Sep 17 00:00:00 2001 From: Keshav Mohta <68001229+keshavmohta09@users.noreply.github.com> Date: Thu, 18 Sep 2025 17:59:39 +0530 Subject: [PATCH] Feature: MariaDB Stored Procedures and Functions Support #23422 --- .../source/database/mariadb/metadata.py | 87 +++++++++- .../source/database/mariadb/models.py | 36 ++++ .../source/database/mariadb/queries.py | 43 +++++ .../unit/topology/database/test_mariadb.py | 157 ++++++++++++++++++ 4 files changed, 322 insertions(+), 1 deletion(-) create mode 100644 ingestion/src/metadata/ingestion/source/database/mariadb/models.py create mode 100644 ingestion/src/metadata/ingestion/source/database/mariadb/queries.py create mode 100644 ingestion/tests/unit/topology/database/test_mariadb.py diff --git a/ingestion/src/metadata/ingestion/source/database/mariadb/metadata.py b/ingestion/src/metadata/ingestion/source/database/mariadb/metadata.py index c4c096b576a..23992f93313 100644 --- a/ingestion/src/metadata/ingestion/source/database/mariadb/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/mariadb/metadata.py @@ -11,21 +11,44 @@ """ MariaDB source module """ -from typing import Optional +import traceback +from typing import Iterable, Optional from sqlalchemy.dialects.mysql.base import ischema_names from sqlalchemy.dialects.mysql.reflection import MySQLTableDefinitionParser +from metadata.generated.schema.api.data.createStoredProcedure import ( + CreateStoredProcedureRequest, +) +from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema +from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode from metadata.generated.schema.entity.services.connections.database.mariaDBConnection import ( MariaDBConnection, ) +from metadata.generated.schema.entity.services.ingestionPipelines.status import ( + StackTraceError, +) from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) +from metadata.generated.schema.type.basic import EntityName, Markdown +from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.common_db_source import CommonDbSourceService +from metadata.ingestion.source.database.mariadb.models import ( + ROUTINE_TYPE_MAP, + MariaDBStoredProcedure, +) +from metadata.ingestion.source.database.mariadb.queries import ( + MARIADB_GET_FUNCTIONS, + MARIADB_GET_STORED_PROCEDURES, +) from metadata.ingestion.source.database.mysql.utils import col_type_map, parse_column +from metadata.utils import fqn +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() ischema_names.update(col_type_map) @@ -52,3 +75,65 @@ class MariadbSource(CommonDbSourceService): f"Expected MariaDBConnection, but got {connection}" ) return cls(config, metadata) + + def _get_stored_procedures_internal( + self, query: str + ) -> Iterable[MariaDBStoredProcedure]: + results = self.engine.execute(query).all() + for row in results: + try: + yield MariaDBStoredProcedure.model_validate(dict(row._mapping)) + except Exception as exc: + error = f"Error parsing Stored Procedure payload: {exc}" + logger.error(error) + self.status.failed( + error=StackTraceError( + name=dict(row).get("procedure_name", "UNKNOWN"), + error=error, + stackTrace=traceback.format_exc(), + ) + ) + + def get_stored_procedures(self) -> Iterable[MariaDBStoredProcedure]: + """List stored procedures""" + if self.source_config.includeStoredProcedures: + for query in (MARIADB_GET_STORED_PROCEDURES, MARIADB_GET_FUNCTIONS): + yield from self._get_stored_procedures_internal( + query.format(schema_name=self.context.get().database_schema) + ) + + def yield_stored_procedure( + self, stored_procedure + ) -> Iterable[Either[CreateStoredProcedureRequest]]: + """Prepare the stored procedure payload""" + try: + stored_procedure_request = CreateStoredProcedureRequest( + name=EntityName(stored_procedure.name), + description=( + Markdown(stored_procedure.description) + if stored_procedure.description + else None + ), + storedProcedureCode=StoredProcedureCode( + language=stored_procedure.language, code=stored_procedure.definition + ), + databaseSchema=fqn.build( + metadata=self.metadata, + entity_type=DatabaseSchema, + service_name=self.context.get().database_service, + database_name=self.context.get().database, + schema_name=self.context.get().database_schema, + ), + storedProcedureType=ROUTINE_TYPE_MAP[stored_procedure.procedure_type], + ) + yield Either(right=stored_procedure_request) + self.register_record_stored_proc_request(stored_procedure_request) + + except Exception as exc: + yield Either( + left=StackTraceError( + name=stored_procedure.name, + error=f"Error yielding Stored Procedure [{stored_procedure.name}] due to [{exc}]", + stackTrace=traceback.format_exc(), + ) + ) diff --git a/ingestion/src/metadata/ingestion/source/database/mariadb/models.py b/ingestion/src/metadata/ingestion/source/database/mariadb/models.py new file mode 100644 index 00000000000..25e244d1b6d --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/mariadb/models.py @@ -0,0 +1,36 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +MariaDB models +""" +from typing import Optional + +from pydantic import BaseModel, Field + +from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureType + +ROUTINE_TYPE_MAP = { + "PROCEDURE": StoredProcedureType.StoredProcedure, + "FUNCTION": StoredProcedureType.Function, +} + + +class MariaDBStoredProcedure(BaseModel): + """ + MariaDB stored procedure list query results + """ + + name: str = Field(alias="procedure_name") + schema_name: str + definition: str + language: Optional[str] + procedure_type: Optional[str] + description: Optional[str] diff --git a/ingestion/src/metadata/ingestion/source/database/mariadb/queries.py b/ingestion/src/metadata/ingestion/source/database/mariadb/queries.py new file mode 100644 index 00000000000..dba39b357bf --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/mariadb/queries.py @@ -0,0 +1,43 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +SQL Queries used during MariaDB ingestion +""" + +MARIADB_GET_STORED_PROCEDURES = """ +SELECT + ROUTINE_NAME AS procedure_name, + ROUTINE_SCHEMA AS schema_name, + ROUTINE_DEFINITION AS definition, + ROUTINE_TYPE AS procedure_type, + ROUTINE_COMMENT AS description, + EXTERNAL_LANGUAGE AS language +FROM + information_schema.ROUTINES +WHERE + ROUTINE_SCHEMA = '{schema_name}' + AND ROUTINE_TYPE = 'PROCEDURE' +""" + +MARIADB_GET_FUNCTIONS = """ +SELECT + ROUTINE_NAME AS procedure_name, + ROUTINE_SCHEMA AS schema_name, + ROUTINE_DEFINITION AS definition, + ROUTINE_TYPE AS procedure_type, + ROUTINE_COMMENT AS description, + EXTERNAL_LANGUAGE AS language +FROM + information_schema.ROUTINES +WHERE + ROUTINE_SCHEMA = '{schema_name}' + AND ROUTINE_TYPE = 'FUNCTION' +""" diff --git a/ingestion/tests/unit/topology/database/test_mariadb.py b/ingestion/tests/unit/topology/database/test_mariadb.py new file mode 100644 index 00000000000..89db75eed4e --- /dev/null +++ b/ingestion/tests/unit/topology/database/test_mariadb.py @@ -0,0 +1,157 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test MariaDB using the topology +""" + +import types +from unittest import TestCase +from unittest.mock import Mock, patch + +from metadata.generated.schema.api.data.createStoredProcedure import ( + CreateStoredProcedureRequest, +) +from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureType +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.generated.schema.type.basic import EntityName, Markdown +from metadata.ingestion.api.models import Either +from metadata.ingestion.source.database.mariadb.metadata import MariadbSource +from metadata.ingestion.source.database.mariadb.models import MariaDBStoredProcedure + +mock_mariadb_config = { + "source": { + "type": "mariadb", + "serviceName": "local_mariadb", + "serviceConnection": { + "config": { + "username": "root", + "hostPort": "localhost:3306", + "authType": {"password": "test"}, + } + }, + "sourceConfig": { + "config": { + "type": "DatabaseMetadata", + "includeStoredProcedures": True, + } + }, + }, + "sink": { + "type": "metadata-rest", + "config": {}, + }, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "openmetadata", + "securityConfig": {"jwtToken": "mariadb"}, + } + }, +} + + +class MariaDBUnitTest(TestCase): + @patch( + "metadata.ingestion.source.database.common_db_source.CommonDbSourceService.test_connection" + ) + @patch("metadata.ingestion.source.database.mariadb.metadata.MariadbSource.__init__") + def __init__(self, methodName, mock_init, test_connection) -> None: + super().__init__(methodName) + test_connection.return_value = False + mock_init.return_value = None + + self.config = OpenMetadataWorkflowConfig.model_validate(mock_mariadb_config) + self.mariadb_source = MariadbSource.__new__(MariadbSource) + self.mariadb_source.source_config = self.config.source.sourceConfig.config + self.mariadb_source.metadata = Mock() + self.mariadb_source.status = Mock() + self.mariadb_source.engine = Mock() + self.mariadb_source.context = Mock() + self.mariadb_source.context.get = lambda: types.SimpleNamespace( + database_service="test_service", + database="test_db", + database_schema="test_schema", + ) + self.mariadb_source.register_record_stored_proc_request = Mock() + self.mariadb_source._connection = Mock() + self.mariadb_source._connection_map = {} + + def test_get_stored_procedures(self): + """Test getting both stored procedures and functions""" + mock_proc_row = Mock() + mock_proc_row._mapping = { + "procedure_name": "test_procedure", + "schema_name": "test_schema", + "definition": "BEGIN SELECT 1; END", + "language": "SQL", + "procedure_type": "PROCEDURE", + "description": "Test procedure", + } + + mock_func_row = Mock() + mock_func_row._mapping = { + "procedure_name": "test_function", + "schema_name": "test_schema", + "definition": "RETURN 1", + "language": "SQL", + "procedure_type": "FUNCTION", + "description": "Test function", + } + + with patch.object(self.mariadb_source, "engine") as mock_engine: + mock_results = [Mock(), Mock()] + mock_results[0].all.return_value = [mock_proc_row] + mock_results[1].all.return_value = [mock_func_row] + mock_engine.execute.side_effect = mock_results + + procedures = list(self.mariadb_source.get_stored_procedures()) + + self.assertEqual(len(procedures), 2) + self.assertEqual(procedures[0].name, "test_procedure") + self.assertEqual(procedures[0].procedure_type, "PROCEDURE") + self.assertEqual(procedures[1].name, "test_function") + self.assertEqual(procedures[1].procedure_type, "FUNCTION") + + def test_yield_stored_procedure(self): + """Test yielding stored procedure requests""" + stored_proc = MariaDBStoredProcedure( + procedure_name="test_procedure", + schema_name="test_schema", + definition="BEGIN SELECT 1; END", + language="SQL", + procedure_type="PROCEDURE", + description="Test procedure", + ) + + with ( + patch.object(self.mariadb_source, "metadata") as mock_metadata, + patch.object( + self.mariadb_source, "register_record_stored_proc_request" + ) as mock_register, + ): + results = list(self.mariadb_source.yield_stored_procedure(stored_proc)) + + self.assertEqual(len(results), 1) + self.assertIsInstance(results[0], Either) + self.assertIsNotNone(results[0].right) + self.assertIsInstance(results[0].right, CreateStoredProcedureRequest) + + request = results[0].right + self.assertEqual(request.name, EntityName("test_procedure")) + self.assertEqual(request.description, Markdown("Test procedure")) + self.assertEqual(request.storedProcedureCode.code, "BEGIN SELECT 1; END") + self.assertEqual( + request.storedProcedureType, StoredProcedureType.StoredProcedure + ) + mock_register.assert_called_once_with(request)