diff --git a/ingestion/src/metadata/orm_profiler/interfaces/sqa_profiler_interface.py b/ingestion/src/metadata/orm_profiler/interfaces/sqa_profiler_interface.py index a452496cdc8..ffd9e0efcdf 100644 --- a/ingestion/src/metadata/orm_profiler/interfaces/sqa_profiler_interface.py +++ b/ingestion/src/metadata/orm_profiler/interfaces/sqa_profiler_interface.py @@ -60,7 +60,8 @@ class SQAProfilerInterface(InterfaceProtocol): self._runner = None self._thread_count = thread_count self.service_connection_config = service_connection_config - self.session: Session = self._session_factory() + self.session_factory = self._session_factory() + self.session: Session = self.session_factory() @property def sample(self): @@ -83,9 +84,11 @@ class SQAProfilerInterface(InterfaceProtocol): return self._sampler def _session_factory(self): - """Create thread safe session""" + """Create thread safe session that will be automatically + garbage collected once the application thread ends + """ engine = get_connection(self.service_connection_config) - return create_and_bind_thread_safe_session(engine)() + return create_and_bind_thread_safe_session(engine) def _create_thread_safe_sampler( self, session, table, profile_sample, partition_details, profile_sample_query @@ -132,7 +135,8 @@ class SQAProfilerInterface(InterfaceProtocol): logger.debug( f"Running profiler for {table.__tablename__} on thread {threading.current_thread()}" ) - session = self._session_factory() + Session = self.session_factory + session = Session() sampler = self._create_thread_safe_sampler( session, table, profile_sample, partition_details, profile_sample_query ) diff --git a/ingestion/src/metadata/utils/connections.py b/ingestion/src/metadata/utils/connections.py index ae182c016ae..bab27b5678b 100644 --- a/ingestion/src/metadata/utils/connections.py +++ b/ingestion/src/metadata/utils/connections.py @@ -28,6 +28,7 @@ from sqlalchemy.event import listen from sqlalchemy.exc import OperationalError from sqlalchemy.orm import scoped_session, sessionmaker from sqlalchemy.orm.session import Session +from sqlalchemy.pool import QueuePool from metadata.generated.schema.entity.services.connections.connectionBasicType import ( ConnectionArguments, @@ -155,8 +156,10 @@ def create_generic_connection(connection, verbose: bool = False) -> Engine: engine = create_engine( get_connection_url(connection), connect_args=get_connection_args(connection), + poolclass=QueuePool, pool_reset_on_return=None, # https://docs.sqlalchemy.org/en/14/core/pooling.html#reset-on-return echo=verbose, + max_overflow=-1, ) listen(engine, "before_cursor_execute", inject_query_header, retval=True)