Minor: Kill active/idle connections after test connections (#17411)

* Minor: Kill active/idle connections after test connections

* fixed idle conn for multi db

* added exception handling
This commit is contained in:
Suman Maharana 2024-08-14 19:12:42 +05:30 committed by GitHub
parent 4a0c251e5c
commit de3a82eeb6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 30 additions and 1 deletions

View File

@ -38,6 +38,7 @@ from metadata.generated.schema.entity.services.connections.testConnectionResult
) )
from metadata.generated.schema.type.basic import Timestamp from metadata.generated.schema.type.basic import Timestamp
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import kill_active_connections
from metadata.profiler.orm.functions.conn_test import ConnTestFn from metadata.profiler.orm.functions.conn_test import ConnTestFn
from metadata.utils.logger import cli_logger from metadata.utils.logger import cli_logger
from metadata.utils.timeout import timeout from metadata.utils.timeout import timeout
@ -329,6 +330,8 @@ def test_connection_db_common(
timeout_seconds=timeout_seconds, timeout_seconds=timeout_seconds,
) )
kill_active_connections(engine)
def test_connection_db_schema_sources( def test_connection_db_schema_sources(
metadata: OpenMetadata, metadata: OpenMetadata,
@ -392,6 +395,8 @@ def test_connection_db_schema_sources(
automation_workflow=automation_workflow, automation_workflow=automation_workflow,
) )
kill_active_connections(engine)
def test_query(engine: Engine, statement: str): def test_query(engine: Engine, statement: str):
""" """

View File

@ -13,11 +13,16 @@
Main entrypoints to create and test connections Main entrypoints to create and test connections
for any source. for any source.
""" """
import traceback
from typing import Any, Callable from typing import Any, Callable
from pydantic import BaseModel from pydantic import BaseModel
from sqlalchemy.engine import Engine
from metadata.utils.importer import import_connection_fn from metadata.utils.importer import import_connection_fn
from metadata.utils.logger import cli_logger
logger = cli_logger()
GET_CONNECTION_FN_NAME = "get_connection" GET_CONNECTION_FN_NAME = "get_connection"
TEST_CONNECTION_FN_NAME = "test_connection" TEST_CONNECTION_FN_NAME = "test_connection"
@ -47,3 +52,17 @@ def get_connection(connection: BaseModel) -> Any:
a service connection pydantic model a service connection pydantic model
""" """
return get_connection_fn(connection)(connection) return get_connection_fn(connection)(connection)
def kill_active_connections(engine: Engine):
"""
Method to kill the active connections
as well as idle connections in the engine
"""
try:
active_conn = engine.pool.checkedout() + engine.pool.checkedin()
if active_conn:
engine.dispose()
except Exception as exc:
logger.warning(f"Error Killing the active connections {exc}")
logger.debug(traceback.format_exc())

View File

@ -61,7 +61,10 @@ from metadata.ingestion.connections.session import create_and_bind_thread_safe_s
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection from metadata.ingestion.source.connections import (
get_connection,
kill_active_connections,
)
from metadata.ingestion.source.database.database_service import DatabaseServiceSource from metadata.ingestion.source.database.database_service import DatabaseServiceSource
from metadata.ingestion.source.database.sql_column_handler import SqlColumnHandlerMixin from metadata.ingestion.source.database.sql_column_handler import SqlColumnHandlerMixin
from metadata.ingestion.source.database.sqlalchemy_source import SqlAlchemySource from metadata.ingestion.source.database.sqlalchemy_source import SqlAlchemySource
@ -142,6 +145,8 @@ class CommonDbSourceService(
to setup multiple inspectors. They can use this function. to setup multiple inspectors. They can use this function.
:param database_name: new database to set :param database_name: new database to set
""" """
kill_active_connections(self.engine)
logger.info(f"Ingesting from database: {database_name}") logger.info(f"Ingesting from database: {database_name}")
new_service_connection = deepcopy(self.service_connection) new_service_connection = deepcopy(self.service_connection)