mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-27 18:07:57 +00:00
fix(ingest/redshift): Fix query sequence duplication for serverless mode (#12353)
This commit is contained in:
parent
7eab2eb8d9
commit
68218768d3
@ -797,61 +797,91 @@ class RedshiftServerlessQuery(RedshiftCommonQuery):
|
||||
db_name: str, start_time: datetime, end_time: datetime
|
||||
) -> str:
|
||||
return """
|
||||
WITH queries AS (
|
||||
SELECT
|
||||
sti.database as cluster,
|
||||
sti.schema AS "schema",
|
||||
sti.table AS "table",
|
||||
qs.table_id AS table_id,
|
||||
qs.query_id as query_id,
|
||||
qs.step_name as step_name,
|
||||
sui.user_name as username,
|
||||
source,
|
||||
MIN(qs.start_time) as "timestamp" -- multiple duplicate records with start_time increasing slightly by miliseconds
|
||||
FROM
|
||||
SYS_QUERY_DETAIL qs
|
||||
JOIN
|
||||
SVV_TABLE_INFO sti ON sti.table_id = qs.table_id
|
||||
LEFT JOIN
|
||||
SVV_USER_INFO sui ON qs.user_id = sui.user_id
|
||||
WHERE
|
||||
cluster = '{db_name}' AND
|
||||
qs.user_id <> 1 AND -- this is user 'rdsdb'
|
||||
qs.start_time >= '{start_time}' AND
|
||||
qs.start_time < '{end_time}'
|
||||
GROUP BY cluster, "schema", "table", qs.table_id, query_id, step_name, username, source -- to be sure we are not making duplicates ourselves the list of group by must match whatever we use in "group by" and "where" of subsequent queries ("cluster" is already set to single value in this query)
|
||||
),
|
||||
unique_query_text AS (
|
||||
SELECT
|
||||
query_id,
|
||||
sequence,
|
||||
text
|
||||
FROM (
|
||||
SELECT
|
||||
query_id,
|
||||
"sequence",
|
||||
text,
|
||||
ROW_NUMBER() OVER (
|
||||
PARTITION BY query_id, sequence
|
||||
) as rn
|
||||
FROM SYS_QUERY_TEXT
|
||||
)
|
||||
WHERE rn = 1
|
||||
),
|
||||
scan_queries AS (
|
||||
SELECT
|
||||
"schema" as source_schema,
|
||||
"table" as source_table,
|
||||
table_id as source_table_id,
|
||||
queries.query_id as query_id,
|
||||
username,
|
||||
LISTAGG(qt."text") WITHIN GROUP (ORDER BY sequence) AS query_text
|
||||
FROM
|
||||
"queries" LEFT JOIN
|
||||
unique_query_text qt ON qt.query_id = queries.query_id
|
||||
WHERE
|
||||
source = 'Redshift(local)' AND
|
||||
step_name = 'scan' AND
|
||||
qt.sequence < 16 -- truncating query to not exceed Redshift limit on LISTAGG function (each sequence has at most 4k characters, limit is 64k, divided by 4k gives 16, starts count from 0)
|
||||
GROUP BY source_schema, source_table, source_table_id, queries.query_id, username
|
||||
),
|
||||
insert_queries AS (
|
||||
SELECT
|
||||
"schema" as target_schema,
|
||||
"table" as target_table,
|
||||
table_id as target_table_id,
|
||||
query_id,
|
||||
cluster,
|
||||
min("timestamp") as "timestamp"
|
||||
FROM
|
||||
queries
|
||||
WHERE
|
||||
step_name = 'insert'
|
||||
GROUP BY cluster, target_schema, target_table, target_table_id, query_id
|
||||
)
|
||||
SELECT
|
||||
distinct cluster,
|
||||
cluster,
|
||||
target_schema,
|
||||
target_table,
|
||||
username,
|
||||
source_schema,
|
||||
source_table,
|
||||
query_text AS ddl,
|
||||
start_time AS timestamp
|
||||
FROM
|
||||
(
|
||||
SELECT
|
||||
sti.schema AS target_schema,
|
||||
sti.table AS target_table,
|
||||
sti.database AS cluster,
|
||||
qi.table_id AS target_table_id,
|
||||
qi.query_id AS query_id,
|
||||
qi.start_time AS start_time
|
||||
FROM
|
||||
SYS_QUERY_DETAIL qi
|
||||
JOIN
|
||||
SVV_TABLE_INFO sti on sti.table_id = qi.table_id
|
||||
WHERE
|
||||
start_time >= '{start_time}' and
|
||||
start_time < '{end_time}' and
|
||||
cluster = '{db_name}' and
|
||||
step_name = 'insert'
|
||||
) AS target_tables
|
||||
JOIN
|
||||
(
|
||||
SELECT
|
||||
sti.schema AS source_schema,
|
||||
sti.table AS source_table,
|
||||
qs.table_id AS source_table_id,
|
||||
qs.query_id AS query_id,
|
||||
sui.user_name AS username,
|
||||
LISTAGG(qt."text") WITHIN GROUP (ORDER BY sequence) AS query_text
|
||||
FROM
|
||||
SYS_QUERY_DETAIL qs
|
||||
JOIN
|
||||
SVV_TABLE_INFO sti ON sti.table_id = qs.table_id
|
||||
LEFT JOIN
|
||||
SYS_QUERY_TEXT qt ON qt.query_id = qs.query_id
|
||||
LEFT JOIN
|
||||
SVV_USER_INFO sui ON qs.user_id = sui.user_id
|
||||
WHERE
|
||||
qs.step_name = 'scan' AND
|
||||
qs.source = 'Redshift(local)' AND
|
||||
qt.sequence < 16 AND -- See https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl-statementtext
|
||||
sti.database = '{db_name}' AND -- this was required to not retrieve some internal redshift tables, try removing to see what happens
|
||||
sui.user_name <> 'rdsdb' -- not entirely sure about this filter
|
||||
GROUP BY sti.schema, sti.table, qs.table_id, qs.query_id, sui.user_name
|
||||
) AS source_tables ON target_tables.query_id = source_tables.query_id
|
||||
WHERE source_tables.source_table_id <> target_tables.target_table_id
|
||||
ORDER BY cluster, target_schema, target_table, start_time ASC
|
||||
"timestamp"
|
||||
FROM scan_queries
|
||||
JOIN insert_queries on insert_queries.query_id = scan_queries.query_id
|
||||
WHERE source_table_id <> target_table_id
|
||||
ORDER BY cluster, target_schema, target_table, "timestamp" ASC;
|
||||
""".format(
|
||||
# We need the original database name for filtering
|
||||
db_name=db_name,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user