Mirror of https://github.com/datahub-project/datahub.git (synced 2025-12-16 12:38:13 +00:00)

fix(ingest/bq): fix ordering of queries for use_queries_v2 (#11333)

parent fc92d23cc1
commit 837d00d391
@@ -250,14 +250,29 @@ class BigQueryQueriesExtractor:
         with self.report.audit_log_preprocessing_timer:
             # Preprocessing stage that deduplicates the queries using query hash per usage bucket
-            queries_deduped: FileBackedDict[Dict[int, ObservedQuery]]
+            # Using regular dictionary with
+            #   key: usage bucket
+            #   value: File backed dictionary with query hash as key and observed query as value
+            # This structure is chosen in order to maintain order of execution of queries
+
+            queries_deduped: Dict[int, FileBackedDict[ObservedQuery]]
             queries_deduped = self.deduplicate_queries(queries)
-            self.report.num_unique_queries = len(queries_deduped)
+            self.report.num_unique_queries = len(
+                set(
+                    query_hash
+                    for bucket in queries_deduped.values()
+                    for query_hash in bucket
+                )
+            )

         with self.report.audit_log_load_timer:
             i = 0
-            for query_instances in queries_deduped.values():
-                for _, query in query_instances.items():
+            for queries_in_bucket in queries_deduped.values():
+                # Ordering is essential for column-level lineage via temporary table
+                for row in queries_in_bucket.sql_query_iterator(
+                    "select value from data order by last_query_timestamp asc",
+                ):
+                    query = queries_in_bucket.deserializer(row["value"])
                     if i > 0 and i % 10000 == 0:
                         logger.info(f"Added {i} query log entries to SQL aggregator")
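
The new per-bucket FileBackedDict indexes each query's timestamp as an extra column, so replaying a bucket in execution order becomes a single SQL sort over the backing table. Below is a minimal sketch of that pattern, not the extractor code itself; it assumes FileBackedDict is importable from datahub.utilities.file_backed_collections, that its default serializer handles arbitrary picklable objects, and that the backing table is named "data" (as in the query used in the diff above). The FakeQuery stand-in and sample statements are illustrative.

# Minimal sketch of the ordered-replay pattern; assumptions noted in the lead-in.
from datetime import datetime, timedelta, timezone

from datahub.utilities.file_backed_collections import FileBackedDict


class FakeQuery:
    """Stand-in for ObservedQuery with only the fields this sketch needs."""

    def __init__(self, sql: str, timestamp: datetime) -> None:
        self.sql = sql
        self.timestamp = timestamp


base = datetime(2024, 9, 1, tzinfo=timezone.utc)

# Index the timestamp as an extra SQLite column so the bucket can be sorted on it.
bucket: FileBackedDict[FakeQuery] = FileBackedDict(
    extra_columns={"last_query_timestamp": lambda q: q.timestamp}
)
bucket["hash-insert"] = FakeQuery(
    "INSERT INTO prod.orders SELECT * FROM #tmp_orders", base + timedelta(minutes=5)
)
bucket["hash-create"] = FakeQuery(
    "CREATE TABLE #tmp_orders AS SELECT * FROM staging.orders", base
)

# Replay in execution order rather than insertion order: the CREATE comes back first.
for row in bucket.sql_query_iterator(
    "select value from data order by last_query_timestamp asc",
):
    query = bucket.deserializer(row["value"])
    print(query.timestamp, query.sql)
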
@@ -268,7 +283,7 @@ class BigQueryQueriesExtractor:

     def deduplicate_queries(
         self, queries: FileBackedList[ObservedQuery]
-    ) -> FileBackedDict[Dict[int, ObservedQuery]]:
+    ) -> Dict[int, FileBackedDict[ObservedQuery]]:

         # This fingerprint based deduplication is done here to reduce performance hit due to
         # repetitive sql parsing while adding observed query to aggregator that would otherwise
@@ -276,7 +291,7 @@ class BigQueryQueriesExtractor:
         # With current implementation, it is possible that "Operation"(e.g. INSERT) is reported
         # only once per day, although it may have happened multiple times throughout the day.

-        queries_deduped: FileBackedDict[Dict[int, ObservedQuery]] = FileBackedDict()
+        queries_deduped: Dict[int, FileBackedDict[ObservedQuery]] = dict()

         for i, query in enumerate(queries):
             if i > 0 and i % 10000 == 0:
@@ -295,14 +310,20 @@ class BigQueryQueriesExtractor:
                 query.query, self.identifiers.platform, fast=True
             )

-            query_instances = queries_deduped.setdefault(query.query_hash, {})
+            if time_bucket not in queries_deduped:
+                # TODO: Cleanup, etc as required for file backed dicts after use
+                queries_deduped[time_bucket] = FileBackedDict[ObservedQuery](
+                    extra_columns={"last_query_timestamp": lambda e: e.timestamp}
+                )

-            observed_query = query_instances.setdefault(time_bucket, query)
+            observed_query = queries_deduped[time_bucket].get(query.query_hash)

             # If the query already exists for this time bucket, update its attributes
-            if observed_query is not query:
+            if observed_query is not None:
                 observed_query.usage_multiplier += 1
                 observed_query.timestamp = query.timestamp
+            else:
+                queries_deduped[time_bucket][query.query_hash] = query

         return queries_deduped
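
For readers skimming the diff, the dedup bookkeeping above boils down to: bucket each query by its usage window, fingerprint within the bucket, and on a repeat bump usage_multiplier while refreshing the timestamp. Here is a simplified sketch of that flow with plain in-memory dicts standing in for FileBackedDict; the day-granularity bucket function and the whitespace-normalized "fingerprint" are illustrative stand-ins, not the extractor's real logic.

# Simplified sketch of deduplicate_queries' bookkeeping; plain dicts stand in for
# FileBackedDict, and bucket_of / the fake fingerprint are illustrative assumptions.
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, List


@dataclass
class Q:
    """Minimal stand-in for ObservedQuery."""

    query: str
    timestamp: datetime
    query_hash: str = ""
    usage_multiplier: int = 1


def bucket_of(ts: datetime) -> int:
    # Hypothetical bucketing: one bucket per UTC day.
    return int(ts.replace(hour=0, minute=0, second=0, microsecond=0).timestamp())


def dedupe(queries: List[Q]) -> Dict[int, Dict[str, Q]]:
    deduped: Dict[int, Dict[str, Q]] = {}
    for q in queries:
        q.query_hash = " ".join(q.query.lower().split())  # stand-in for the real fingerprint
        bucket = deduped.setdefault(bucket_of(q.timestamp), {})
        seen = bucket.get(q.query_hash)
        if seen is not None:
            # Same fingerprint within the same bucket: count it, keep the latest timestamp.
            seen.usage_multiplier += 1
            seen.timestamp = q.timestamp
        else:
            bucket[q.query_hash] = q
    return deduped


if __name__ == "__main__":
    ts = datetime(2024, 9, 1, 12, 0, tzinfo=timezone.utc)
    buckets = dedupe([Q("SELECT 1", ts), Q("select 1", ts)])
    (only_bucket,) = buckets.values()
    (only_query,) = only_bucket.values()
    assert only_query.usage_multiplier == 2
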
@@ -579,7 +579,9 @@ def test_create_table_query_mcps(pytestconfig: pytest.Config) -> None:


 @freeze_time(FROZEN_TIME)
-def test_lineage_via_temp_table_disordered_add(pytestconfig: pytest.Config) -> None:
+def test_table_lineage_via_temp_table_disordered_add(
+    pytestconfig: pytest.Config,
+) -> None:
     aggregator = SqlParsingAggregator(
         platform="redshift",
         generate_lineage=True,
@@ -607,7 +609,8 @@ def test_lineage_via_temp_table_disordered_add(pytestconfig: pytest.Config) -> N
     mce_helpers.check_goldens_stream(
         pytestconfig,
         outputs=mcps,
-        golden_path=RESOURCE_DIR / "test_lineage_via_temp_table_disordered_add.json",
+        golden_path=RESOURCE_DIR
+        / "test_table_lineage_via_temp_table_disordered_add.json",
     )
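
The renamed test exercises the same ordering guarantee from the aggregator's side: lineage through a temporary table only resolves if the statement that creates the temp table is parsed before the statement that reads from it. A purely illustrative scenario follows; the table names are made up and this is not the golden-file data.

# Illustrative only; not copied from the test or its golden file.
queries_in_arrival_order = [
    # Exported first from the audit log, but executed second.
    "INSERT INTO analytics.orders_summary SELECT order_id, total FROM #staging_orders",
    # Exported second, but executed first; defines the temp table the INSERT reads.
    "CREATE TABLE #staging_orders AS SELECT order_id, total FROM raw.orders",
]

# Replaying by timestamp (the fix in this commit) restores execution order, so the
# temp-table definition is parsed first and orders_summary's lineage reaches raw.orders.
queries_in_execution_order = list(reversed(queries_in_arrival_order))
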