fix(ingest): followup on bigquery queries v2 ordering (#11353)

Mayuri Nehate 2024-09-11 23:17:21 +05:30 committed by GitHub
parent fd6d4c88ec
commit 325a608cc5

@@ -247,32 +247,21 @@ class BigQueryQueriesExtractor:
                     self.report.num_queries_by_project[project.id] += 1
                     queries.append(entry)
             self.report.num_total_queries = len(queries)
             logger.info(f"Found {self.report.num_total_queries} total queries")
 
         with self.report.audit_log_preprocessing_timer:
             # Preprocessing stage that deduplicates the queries using query hash per usage bucket
-            # Using regular dictionary with
-            #   key: usage bucket
-            #   value: File backed dictionary with query hash as key and observed query as value
-            # This structure is chosen in order to maintain order of execution of queries
-            queries_deduped: Dict[int, FileBackedDict[ObservedQuery]]
+            # Note: FileBackedDict is an ordered dictionary, so the order of execution of
+            # queries is inherently maintained
+            queries_deduped: FileBackedDict[Dict[int, ObservedQuery]]
             queries_deduped = self.deduplicate_queries(queries)
-            self.report.num_unique_queries = len(
-                set(
-                    query_hash
-                    for bucket in queries_deduped.values()
-                    for query_hash in bucket
-                )
-            )
+            self.report.num_unique_queries = len(queries_deduped)
             logger.info(f"Found {self.report.num_unique_queries} unique queries")
 
         with self.report.audit_log_load_timer:
             i = 0
-            for queries_in_bucket in queries_deduped.values():
-                # Ordering is essential for column-level lineage via temporary table
-                for row in queries_in_bucket.sql_query_iterator(
-                    "select value from data order by last_query_timestamp asc",
-                ):
-                    query = queries_in_bucket.deserializer(row["value"])
+            for _, query_instances in queries_deduped.items():
+                for query in query_instances.values():
                     if i > 0 and i % 10000 == 0:
                         logger.info(f"Added {i} query log entries to SQL aggregator")
@@ -283,7 +272,7 @@ class BigQueryQueriesExtractor:
     def deduplicate_queries(
         self, queries: FileBackedList[ObservedQuery]
-    ) -> Dict[int, FileBackedDict[ObservedQuery]]:
+    ) -> FileBackedDict[Dict[int, ObservedQuery]]:
 
         # This fingerprint based deduplication is done here to reduce performance hit due to
         # repetitive sql parsing while adding observed query to aggregator that would otherwise
@ -291,7 +280,7 @@ class BigQueryQueriesExtractor:
# With current implementation, it is possible that "Operation"(e.g. INSERT) is reported
# only once per day, although it may have happened multiple times throughout the day.
queries_deduped: Dict[int, FileBackedDict[ObservedQuery]] = dict()
queries_deduped: FileBackedDict[Dict[int, ObservedQuery]] = FileBackedDict()
for i, query in enumerate(queries):
if i > 0 and i % 10000 == 0:
@@ -310,20 +299,14 @@ class BigQueryQueriesExtractor:
                     query.query, self.identifiers.platform, fast=True
                 )
 
-            if time_bucket not in queries_deduped:
-                # TODO: Cleanup, etc as required for file backed dicts after use
-                queries_deduped[time_bucket] = FileBackedDict[ObservedQuery](
-                    extra_columns={"last_query_timestamp": lambda e: e.timestamp}
-                )
+            query_instances = queries_deduped.setdefault(query.query_hash, {})
 
-            observed_query = queries_deduped[time_bucket].get(query.query_hash)
+            observed_query = query_instances.setdefault(time_bucket, query)
 
             # If the query already exists for this time bucket, update its attributes
-            if observed_query is not None:
+            if observed_query is not query:
                 observed_query.usage_multiplier += 1
                 observed_query.timestamp = query.timestamp
-            else:
-                queries_deduped[time_bucket][query.query_hash] = query
 
         return queries_deduped
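
A minimal, self-contained sketch of the new deduplication step, with a hypothetical ObservedQuerySketch standing in for ObservedQuery and plain dicts in place of FileBackedDict; the field names mirror the diff, but everything else here is illustrative:

from dataclasses import dataclass
from typing import Dict


@dataclass
class ObservedQuerySketch:
    query_hash: str
    timestamp: int
    usage_multiplier: int = 1


queries_deduped: Dict[str, Dict[int, ObservedQuerySketch]] = {}


def add(query: ObservedQuerySketch, time_bucket: int) -> None:
    # One inner dict per fingerprint, keyed by usage time bucket.
    query_instances = queries_deduped.setdefault(query.query_hash, {})
    # setdefault returns the existing entry if the bucket is already present,
    # otherwise it inserts `query` and returns it.
    observed_query = query_instances.setdefault(time_bucket, query)
    if observed_query is not query:
        # Repeat observation within the same bucket: bump the multiplier and
        # keep the latest timestamp instead of storing a duplicate entry.
        observed_query.usage_multiplier += 1
        observed_query.timestamp = query.timestamp


add(ObservedQuerySketch("h1", timestamp=10), time_bucket=0)
add(ObservedQuerySketch("h1", timestamp=20), time_bucket=0)
add(ObservedQuerySketch("h1", timestamp=30), time_bucket=1)
assert queries_deduped["h1"][0].usage_multiplier == 2
assert queries_deduped["h1"][0].timestamp == 20
assert len(queries_deduped) == 1  # one unique fingerprint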