mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-03 12:16:10 +00:00
fix(snowflake): fixes deduplication and fingerprint requirements for Hex (#13121)
This commit is contained in:
parent
75894399f0
commit
e7d8f2913c
@ -515,7 +515,10 @@ class SnowflakeQueriesExtractor(SnowflakeStructuredReportMixin, Closeable):
|
||||
# job at eliminating redundant / repetitive queries. As such, we include the fast fingerprint
|
||||
# here
|
||||
query_id=get_query_fingerprint(
|
||||
res["query_text"], self.identifiers.platform, fast=True
|
||||
res["query_text"],
|
||||
self.identifiers.platform,
|
||||
fast=True,
|
||||
secondary_id=res["query_secondary_fingerprint"],
|
||||
),
|
||||
query_text=res["query_text"],
|
||||
upstreams=upstreams,
|
||||
@ -654,7 +657,17 @@ WITH
|
||||
fingerprinted_queries as (
|
||||
SELECT *,
|
||||
-- TODO: Generate better fingerprints for each query by pushing down regex logic.
|
||||
query_history.query_parameterized_hash as query_fingerprint
|
||||
query_history.query_parameterized_hash as query_fingerprint,
|
||||
-- Optional and additional hash to be used for query deduplication and final query identity
|
||||
CASE
|
||||
WHEN CONTAINS(query_history.query_text, '-- Hex query metadata:')
|
||||
-- Extract project id and hash it
|
||||
THEN CAST(HASH(
|
||||
REGEXP_SUBSTR(query_history.query_text, '"project_id"\\\\s*:\\\\s*"([^"]+)"', 1, 1, 'e', 1),
|
||||
REGEXP_SUBSTR(query_history.query_text, '"context"\\\\s*:\\\\s*"([^"]+)"', 1, 1, 'e', 1)
|
||||
) AS VARCHAR)
|
||||
ELSE NULL
|
||||
END as query_secondary_fingerprint
|
||||
FROM
|
||||
snowflake.account_usage.query_history
|
||||
WHERE
|
||||
@ -670,11 +683,11 @@ fingerprinted_queries as (
|
||||
{time_bucket_size},
|
||||
CONVERT_TIMEZONE('UTC', start_time)
|
||||
) AS bucket_start_time,
|
||||
COUNT(*) OVER (PARTITION BY bucket_start_time, query_fingerprint) AS query_count,
|
||||
COUNT(*) OVER (PARTITION BY bucket_start_time, query_fingerprint, query_secondary_fingerprint) AS query_count,
|
||||
FROM
|
||||
fingerprinted_queries
|
||||
QUALIFY
|
||||
ROW_NUMBER() OVER (PARTITION BY bucket_start_time, query_fingerprint ORDER BY start_time DESC) = 1
|
||||
ROW_NUMBER() OVER (PARTITION BY bucket_start_time, query_fingerprint, query_secondary_fingerprint ORDER BY start_time DESC) = 1
|
||||
)
|
||||
, raw_access_history AS (
|
||||
SELECT
|
||||
@ -714,6 +727,7 @@ fingerprinted_queries as (
|
||||
q.bucket_start_time,
|
||||
q.query_id,
|
||||
q.query_fingerprint,
|
||||
q.query_secondary_fingerprint,
|
||||
q.query_count,
|
||||
q.session_id AS "SESSION_ID",
|
||||
q.start_time AS "QUERY_START_TIME",
|
||||
|
||||
@ -257,7 +257,10 @@ def generate_hash(text: str) -> str:
|
||||
|
||||
|
||||
def get_query_fingerprint_debug(
|
||||
expression: sqlglot.exp.ExpOrStr, platform: DialectOrStr, fast: bool = False
|
||||
expression: sqlglot.exp.ExpOrStr,
|
||||
platform: DialectOrStr,
|
||||
fast: bool = False,
|
||||
secondary_id: Optional[str] = None,
|
||||
) -> Tuple[str, Optional[str]]:
|
||||
try:
|
||||
if not fast:
|
||||
@ -272,16 +275,18 @@ def get_query_fingerprint_debug(
|
||||
logger.debug("Failed to generalize query for fingerprinting: %s", e)
|
||||
expression_sql = None
|
||||
|
||||
fingerprint = generate_hash(
|
||||
expression_sql
|
||||
if expression_sql is not None
|
||||
else _expression_to_string(expression, platform=platform)
|
||||
)
|
||||
text = expression_sql or _expression_to_string(expression, platform=platform)
|
||||
if secondary_id:
|
||||
text = text + " -- " + secondary_id
|
||||
fingerprint = generate_hash(text=text)
|
||||
return fingerprint, expression_sql
|
||||
|
||||
|
||||
def get_query_fingerprint(
|
||||
expression: sqlglot.exp.ExpOrStr, platform: DialectOrStr, fast: bool = False
|
||||
expression: sqlglot.exp.ExpOrStr,
|
||||
platform: DialectOrStr,
|
||||
fast: bool = False,
|
||||
secondary_id: Optional[str] = None,
|
||||
) -> str:
|
||||
"""Get a fingerprint for a SQL query.
|
||||
|
||||
@ -298,12 +303,15 @@ def get_query_fingerprint(
|
||||
Args:
|
||||
expression: The SQL query to fingerprint.
|
||||
platform: The SQL dialect to use.
|
||||
secondary_id: An optional additional id string to included in the final fingerprint.
|
||||
|
||||
Returns:
|
||||
The fingerprint for the SQL query.
|
||||
"""
|
||||
|
||||
return get_query_fingerprint_debug(expression, platform, fast=fast)[0]
|
||||
return get_query_fingerprint_debug(
|
||||
expression=expression, platform=platform, fast=fast, secondary_id=secondary_id
|
||||
)[0]
|
||||
|
||||
|
||||
@functools.lru_cache(maxsize=FORMAT_QUERY_CACHE_SIZE)
|
||||
|
||||
@ -198,3 +198,33 @@ def test_redshift_query_fingerprint():
|
||||
assert get_query_fingerprint(query1, "redshift", True) != get_query_fingerprint(
|
||||
query2, "redshift", True
|
||||
)
|
||||
|
||||
|
||||
def test_query_fingerprint_with_secondary_id():
|
||||
query = "SELECT * FROM users WHERE id = 123"
|
||||
|
||||
fingerprint1 = get_query_fingerprint(query, "snowflake")
|
||||
|
||||
fingerprint2 = get_query_fingerprint(
|
||||
query, "snowflake", secondary_id="project_id_123"
|
||||
)
|
||||
|
||||
fingerprint3 = get_query_fingerprint(
|
||||
query, "snowflake", secondary_id="project_id_456"
|
||||
)
|
||||
|
||||
assert fingerprint1 and fingerprint2 and fingerprint3, (
|
||||
"Fingerprint should not be None"
|
||||
)
|
||||
assert fingerprint1 != fingerprint2, "Fingerprint should change with secondary_id"
|
||||
assert fingerprint2 != fingerprint3, (
|
||||
"Different secondary_id should yield different fingerprints"
|
||||
)
|
||||
|
||||
fingerprint4 = get_query_fingerprint(
|
||||
query, "snowflake", secondary_id="project_id_456"
|
||||
)
|
||||
|
||||
assert fingerprint3 == fingerprint4, (
|
||||
"Fingerprints are deterministic for the same secondary_id"
|
||||
)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user