close-connection-after-each-batch-using-with (#6641)

This commit is contained in:
Abhishek Pandey 2022-08-09 11:15:56 +05:30 committed by GitHub
parent 555962dfd3
commit 77ad1b1c87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -49,7 +49,6 @@ class UsageSource(Source[TableQuery], ABC):
self.start, self.end = get_start_and_end(self.source_config.queryLogDuration) self.start, self.end = get_start_and_end(self.source_config.queryLogDuration)
self.analysis_date = self.end self.analysis_date = self.end
self.report = SQLSourceStatus() self.report = SQLSourceStatus()
self.engine = get_connection(self.connection)
def prepare(self): def prepare(self):
return super().prepare() return super().prepare()
@ -117,40 +116,41 @@ class UsageSource(Source[TableQuery], ABC):
f"Scanning query logs for {(self.start+timedelta(days=i)).date()} - {(self.start+timedelta(days=i+1)).date()}" f"Scanning query logs for {(self.start+timedelta(days=i)).date()} - {(self.start+timedelta(days=i+1)).date()}"
) )
try: try:
rows = self.engine.execute( with get_connection(self.connection).connect() as conn:
self.get_sql_statement( rows = conn.execute(
start_time=self.start + timedelta(days=i), self.get_sql_statement(
end_time=self.start + timedelta(days=i + 1), start_time=self.start + timedelta(days=i),
) end_time=self.start + timedelta(days=i + 1),
)
queries = []
for row in rows:
row = dict(row)
try:
if filter_by_database(
self.source_config.databaseFilterPattern,
self.get_database_name(row),
) or filter_by_schema(
self.source_config.schemaFilterPattern,
schema_name=row["schema_name"],
):
continue
queries.append(
TableQuery(
query=row["query_text"],
userName=row["user_name"],
startTime=str(row["start_time"]),
endTime=str(row["end_time"]),
analysisDate=row["start_time"],
aborted=self.get_aborted_status(row),
databaseName=self.get_database_name(row),
serviceName=self.config.serviceName,
databaseSchema=self.get_schema_name(row),
)
) )
except Exception as err: )
logger.debug(traceback.format_exc()) queries = []
logger.error(str(err)) for row in rows:
row = dict(row)
try:
if filter_by_database(
self.source_config.databaseFilterPattern,
self.get_database_name(row),
) or filter_by_schema(
self.source_config.schemaFilterPattern,
schema_name=row["schema_name"],
):
continue
queries.append(
TableQuery(
query=row["query_text"],
userName=row["user_name"],
startTime=str(row["start_time"]),
endTime=str(row["end_time"]),
analysisDate=row["start_time"],
aborted=self.get_aborted_status(row),
databaseName=self.get_database_name(row),
serviceName=self.config.serviceName,
databaseSchema=self.get_schema_name(row),
)
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(str(err))
yield TableQueries(queries=queries) yield TableQueries(queries=queries)
except Exception as err: except Exception as err:
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())