diff --git a/ingestion/src/metadata/ingestion/connections/test_connections.py b/ingestion/src/metadata/ingestion/connections/test_connections.py index 005605ad2da..e98b67dbe32 100644 --- a/ingestion/src/metadata/ingestion/connections/test_connections.py +++ b/ingestion/src/metadata/ingestion/connections/test_connections.py @@ -38,6 +38,7 @@ from metadata.generated.schema.entity.services.connections.testConnectionResult ) from metadata.generated.schema.type.basic import Timestamp from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.connections import kill_active_connections from metadata.profiler.orm.functions.conn_test import ConnTestFn from metadata.utils.logger import cli_logger from metadata.utils.timeout import timeout @@ -329,6 +330,8 @@ def test_connection_db_common( timeout_seconds=timeout_seconds, ) + kill_active_connections(engine) + def test_connection_db_schema_sources( metadata: OpenMetadata, @@ -392,6 +395,8 @@ def test_connection_db_schema_sources( automation_workflow=automation_workflow, ) + kill_active_connections(engine) + def test_query(engine: Engine, statement: str): """ diff --git a/ingestion/src/metadata/ingestion/source/connections.py b/ingestion/src/metadata/ingestion/source/connections.py index 4e34fb75826..d7f58234adc 100644 --- a/ingestion/src/metadata/ingestion/source/connections.py +++ b/ingestion/src/metadata/ingestion/source/connections.py @@ -13,11 +13,16 @@ Main entrypoints to create and test connections for any source. """ +import traceback from typing import Any, Callable from pydantic import BaseModel +from sqlalchemy.engine import Engine from metadata.utils.importer import import_connection_fn +from metadata.utils.logger import cli_logger + +logger = cli_logger() GET_CONNECTION_FN_NAME = "get_connection" TEST_CONNECTION_FN_NAME = "test_connection" @@ -47,3 +52,17 @@ def get_connection(connection: BaseModel) -> Any: a service connection pydantic model """ return get_connection_fn(connection)(connection) + + +def kill_active_connections(engine: Engine): + """ + Method to kill the active connections + as well as idle connections in the engine + """ + try: + active_conn = engine.pool.checkedout() + engine.pool.checkedin() + if active_conn: + engine.dispose() + except Exception as exc: + logger.warning(f"Error Killing the active connections {exc}") + logger.debug(traceback.format_exc()) diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index bff73b21d90..238168dcff8 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -61,7 +61,10 @@ from metadata.ingestion.connections.session import create_and_bind_thread_safe_s from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.ingestion.source.connections import get_connection +from metadata.ingestion.source.connections import ( + get_connection, + kill_active_connections, +) from metadata.ingestion.source.database.database_service import DatabaseServiceSource from metadata.ingestion.source.database.sql_column_handler import SqlColumnHandlerMixin from metadata.ingestion.source.database.sqlalchemy_source import SqlAlchemySource @@ -142,6 +145,8 @@ class CommonDbSourceService( to setup multiple inspectors. They can use this function. :param database_name: new database to set """ + + kill_active_connections(self.engine) logger.info(f"Ingesting from database: {database_name}") new_service_connection = deepcopy(self.service_connection)