From 77ad1b1c87320515d0357d0b9f6f19f88cb75358 Mon Sep 17 00:00:00 2001 From: Abhishek Pandey Date: Tue, 9 Aug 2022 11:15:56 +0530 Subject: [PATCH] close-connection-after-each-batch-using-with (#6641) --- .../ingestion/source/database/usage_source.py | 68 +++++++++---------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/usage_source.py b/ingestion/src/metadata/ingestion/source/database/usage_source.py index fbc1530d01f..15c0c23791e 100644 --- a/ingestion/src/metadata/ingestion/source/database/usage_source.py +++ b/ingestion/src/metadata/ingestion/source/database/usage_source.py @@ -49,7 +49,6 @@ class UsageSource(Source[TableQuery], ABC): self.start, self.end = get_start_and_end(self.source_config.queryLogDuration) self.analysis_date = self.end self.report = SQLSourceStatus() - self.engine = get_connection(self.connection) def prepare(self): return super().prepare() @@ -117,40 +116,41 @@ class UsageSource(Source[TableQuery], ABC): f"Scanning query logs for {(self.start+timedelta(days=i)).date()} - {(self.start+timedelta(days=i+1)).date()}" ) try: - rows = self.engine.execute( - self.get_sql_statement( - start_time=self.start + timedelta(days=i), - end_time=self.start + timedelta(days=i + 1), - ) - ) - queries = [] - for row in rows: - row = dict(row) - try: - if filter_by_database( - self.source_config.databaseFilterPattern, - self.get_database_name(row), - ) or filter_by_schema( - self.source_config.schemaFilterPattern, - schema_name=row["schema_name"], - ): - continue - queries.append( - TableQuery( - query=row["query_text"], - userName=row["user_name"], - startTime=str(row["start_time"]), - endTime=str(row["end_time"]), - analysisDate=row["start_time"], - aborted=self.get_aborted_status(row), - databaseName=self.get_database_name(row), - serviceName=self.config.serviceName, - databaseSchema=self.get_schema_name(row), - ) + with get_connection(self.connection).connect() as conn: + rows = conn.execute( + self.get_sql_statement( + start_time=self.start + timedelta(days=i), + end_time=self.start + timedelta(days=i + 1), ) - except Exception as err: - logger.debug(traceback.format_exc()) - logger.error(str(err)) + ) + queries = [] + for row in rows: + row = dict(row) + try: + if filter_by_database( + self.source_config.databaseFilterPattern, + self.get_database_name(row), + ) or filter_by_schema( + self.source_config.schemaFilterPattern, + schema_name=row["schema_name"], + ): + continue + queries.append( + TableQuery( + query=row["query_text"], + userName=row["user_name"], + startTime=str(row["start_time"]), + endTime=str(row["end_time"]), + analysisDate=row["start_time"], + aborted=self.get_aborted_status(row), + databaseName=self.get_database_name(row), + serviceName=self.config.serviceName, + databaseSchema=self.get_schema_name(row), + ) + ) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error(str(err)) yield TableQueries(queries=queries) except Exception as err: logger.debug(traceback.format_exc())