Fixes #6476 -- Multiple db connections opened (#6478)

* Fixed scoped_session factory creation

* Fixed py format

* Added comment for GC + added explicit QueuePool definition
This commit is contained in:
Teddy 2022-08-01 13:42:35 +02:00 committed by GitHub
parent ce7c0180e2
commit a7fffc778c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 11 additions and 4 deletions

View File

@ -60,7 +60,8 @@ class SQAProfilerInterface(InterfaceProtocol):
self._runner = None
self._thread_count = thread_count
self.service_connection_config = service_connection_config
self.session: Session = self._session_factory()
self.session_factory = self._session_factory()
self.session: Session = self.session_factory()
@property
def sample(self):
@ -83,9 +84,11 @@ class SQAProfilerInterface(InterfaceProtocol):
return self._sampler
def _session_factory(self):
"""Create thread safe session"""
"""Create thread safe session that will be automatically
garbage collected once the application thread ends
"""
engine = get_connection(self.service_connection_config)
return create_and_bind_thread_safe_session(engine)()
return create_and_bind_thread_safe_session(engine)
def _create_thread_safe_sampler(
self, session, table, profile_sample, partition_details, profile_sample_query
@ -132,7 +135,8 @@ class SQAProfilerInterface(InterfaceProtocol):
logger.debug(
f"Running profiler for {table.__tablename__} on thread {threading.current_thread()}"
)
session = self._session_factory()
Session = self.session_factory
session = Session()
sampler = self._create_thread_safe_sampler(
session, table, profile_sample, partition_details, profile_sample_query
)

View File

@ -28,6 +28,7 @@ from sqlalchemy.event import listen
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.orm.session import Session
from sqlalchemy.pool import QueuePool
from metadata.generated.schema.entity.services.connections.connectionBasicType import (
ConnectionArguments,
@ -155,8 +156,10 @@ def create_generic_connection(connection, verbose: bool = False) -> Engine:
engine = create_engine(
get_connection_url(connection),
connect_args=get_connection_args(connection),
poolclass=QueuePool,
pool_reset_on_return=None, # https://docs.sqlalchemy.org/en/14/core/pooling.html#reset-on-return
echo=verbose,
max_overflow=-1,
)
listen(engine, "before_cursor_execute", inject_query_header, retval=True)