mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-13 11:06:30 +00:00
fix(ingest/redshift): reduce sequence limit for LISTAGG (#11621)
Co-authored-by: treff7es <treff7es@gmail.com> Co-authored-by: Aseem Bansal <asmbansal2@gmail.com>
This commit is contained in:
parent
6b09346ca5
commit
68cd17b34e
@ -4,6 +4,12 @@ from typing import List
|
|||||||
redshift_datetime_format = "%Y-%m-%d %H:%M:%S"
|
redshift_datetime_format = "%Y-%m-%d %H:%M:%S"
|
||||||
|
|
||||||
|
|
||||||
|
# See https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl-statementtext
|
||||||
|
# for why we need to limit the size of the query text.
|
||||||
|
# We use 290 instead instead of the standard 320, because escape characters can add to the length.
|
||||||
|
_QUERY_SEQUENCE_LIMIT = 290
|
||||||
|
|
||||||
|
|
||||||
class RedshiftCommonQuery:
|
class RedshiftCommonQuery:
|
||||||
CREATE_TEMP_TABLE_CLAUSE = "create temp table"
|
CREATE_TEMP_TABLE_CLAUSE = "create temp table"
|
||||||
CREATE_TEMPORARY_TABLE_CLAUSE = "create temporary table"
|
CREATE_TEMPORARY_TABLE_CLAUSE = "create temporary table"
|
||||||
@ -487,71 +493,70 @@ class RedshiftProvisionedQuery(RedshiftCommonQuery):
|
|||||||
def list_insert_create_queries_sql(
|
def list_insert_create_queries_sql(
|
||||||
db_name: str, start_time: datetime, end_time: datetime
|
db_name: str, start_time: datetime, end_time: datetime
|
||||||
) -> str:
|
) -> str:
|
||||||
return """
|
return """\
|
||||||
with query_txt as
|
with query_txt as (
|
||||||
(
|
select
|
||||||
select
|
query,
|
||||||
query,
|
pid,
|
||||||
pid,
|
LISTAGG(case
|
||||||
LISTAGG(case
|
when LEN(RTRIM(text)) = 0 then text
|
||||||
when LEN(RTRIM(text)) = 0 then text
|
else RTRIM(text)
|
||||||
else RTRIM(text)
|
end) within group (
|
||||||
end) within group (
|
order by sequence
|
||||||
order by
|
) as ddl
|
||||||
sequence) as ddl
|
from (
|
||||||
from
|
select
|
||||||
(
|
query,
|
||||||
select
|
pid,
|
||||||
query,
|
text,
|
||||||
pid,
|
sequence
|
||||||
text,
|
from
|
||||||
sequence
|
STL_QUERYTEXT
|
||||||
from
|
where
|
||||||
STL_QUERYTEXT
|
sequence < {_QUERY_SEQUENCE_LIMIT}
|
||||||
where
|
order by
|
||||||
sequence < 320
|
sequence
|
||||||
order by
|
)
|
||||||
sequence
|
group by
|
||||||
)
|
query,
|
||||||
group by
|
pid
|
||||||
query,
|
)
|
||||||
pid
|
select
|
||||||
)
|
distinct tbl as target_table_id,
|
||||||
select
|
sti.schema as target_schema,
|
||||||
distinct tbl as target_table_id,
|
sti.table as target_table,
|
||||||
sti.schema as target_schema,
|
sti.database as cluster,
|
||||||
sti.table as target_table,
|
usename as username,
|
||||||
sti.database as cluster,
|
ddl,
|
||||||
usename as username,
|
sq.query as query_id,
|
||||||
ddl,
|
min(si.starttime) as timestamp,
|
||||||
sq.query as query_id,
|
ANY_VALUE(pid) as session_id
|
||||||
min(si.starttime) as timestamp,
|
from
|
||||||
ANY_VALUE(pid) as session_id
|
stl_insert as si
|
||||||
from
|
left join SVV_TABLE_INFO sti on
|
||||||
stl_insert as si
|
sti.table_id = tbl
|
||||||
left join SVV_TABLE_INFO sti on
|
left join svl_user_info sui on
|
||||||
sti.table_id = tbl
|
si.userid = sui.usesysid
|
||||||
left join svl_user_info sui on
|
left join query_txt sq on
|
||||||
si.userid = sui.usesysid
|
si.query = sq.query
|
||||||
left join query_txt sq on
|
left join stl_load_commits slc on
|
||||||
si.query = sq.query
|
slc.query = si.query
|
||||||
left join stl_load_commits slc on
|
where
|
||||||
slc.query = si.query
|
sui.usename <> 'rdsdb'
|
||||||
where
|
and cluster = '{db_name}'
|
||||||
sui.usename <> 'rdsdb'
|
and slc.query IS NULL
|
||||||
and cluster = '{db_name}'
|
and si.starttime >= '{start_time}'
|
||||||
and slc.query IS NULL
|
and si.starttime < '{end_time}'
|
||||||
and si.starttime >= '{start_time}'
|
group by
|
||||||
and si.starttime < '{end_time}'
|
target_table_id,
|
||||||
group by
|
target_schema,
|
||||||
target_table_id,
|
target_table,
|
||||||
target_schema,
|
cluster,
|
||||||
target_table,
|
username,
|
||||||
cluster,
|
ddl,
|
||||||
username,
|
sq.query
|
||||||
ddl,
|
|
||||||
sq.query
|
|
||||||
""".format(
|
""".format(
|
||||||
|
_QUERY_SEQUENCE_LIMIT=_QUERY_SEQUENCE_LIMIT,
|
||||||
# We need the original database name for filtering
|
# We need the original database name for filtering
|
||||||
db_name=db_name,
|
db_name=db_name,
|
||||||
start_time=start_time.strftime(redshift_datetime_format),
|
start_time=start_time.strftime(redshift_datetime_format),
|
||||||
@ -564,84 +569,82 @@ class RedshiftProvisionedQuery(RedshiftCommonQuery):
|
|||||||
|
|
||||||
end_time_str: str = end_time.strftime(redshift_datetime_format)
|
end_time_str: str = end_time.strftime(redshift_datetime_format)
|
||||||
|
|
||||||
return rf"""-- DataHub Redshift Source temp table DDL query
|
return rf"""\
|
||||||
|
-- DataHub Redshift Source temp table DDL query
|
||||||
|
select
|
||||||
|
*
|
||||||
|
from (
|
||||||
|
select
|
||||||
|
session_id,
|
||||||
|
transaction_id,
|
||||||
|
start_time,
|
||||||
|
userid,
|
||||||
|
REGEXP_REPLACE(REGEXP_SUBSTR(REGEXP_REPLACE(query_text,'\\\\n','\\n'), '(CREATE(?:[\\n\\s\\t]+(?:temp|temporary))?(?:[\\n\\s\\t]+)table(?:[\\n\\s\\t]+)[^\\n\\s\\t()-]+)', 0, 1, 'ipe'),'[\\n\\s\\t]+',' ',1,'p') as create_command,
|
||||||
|
query_text,
|
||||||
|
row_number() over (
|
||||||
|
partition by session_id, TRIM(query_text)
|
||||||
|
order by start_time desc
|
||||||
|
) rn
|
||||||
|
from (
|
||||||
|
select
|
||||||
|
pid as session_id,
|
||||||
|
xid as transaction_id,
|
||||||
|
starttime as start_time,
|
||||||
|
type,
|
||||||
|
query_text,
|
||||||
|
userid
|
||||||
|
from (
|
||||||
select
|
select
|
||||||
*
|
starttime,
|
||||||
|
pid,
|
||||||
|
xid,
|
||||||
|
type,
|
||||||
|
userid,
|
||||||
|
LISTAGG(case
|
||||||
|
when LEN(RTRIM(text)) = 0 then text
|
||||||
|
else RTRIM(text)
|
||||||
|
end,
|
||||||
|
'') within group (
|
||||||
|
order by sequence
|
||||||
|
) as query_text
|
||||||
from
|
from
|
||||||
(
|
SVL_STATEMENTTEXT
|
||||||
select
|
|
||||||
session_id,
|
|
||||||
transaction_id,
|
|
||||||
start_time,
|
|
||||||
userid,
|
|
||||||
REGEXP_REPLACE(REGEXP_SUBSTR(REGEXP_REPLACE(query_text,'\\\\n','\\n'), '(CREATE(?:[\\n\\s\\t]+(?:temp|temporary))?(?:[\\n\\s\\t]+)table(?:[\\n\\s\\t]+)[^\\n\\s\\t()-]+)', 0, 1, 'ipe'),'[\\n\\s\\t]+',' ',1,'p') as create_command,
|
|
||||||
query_text,
|
|
||||||
row_number() over (
|
|
||||||
partition by session_id, TRIM(query_text)
|
|
||||||
order by start_time desc
|
|
||||||
) rn
|
|
||||||
from
|
|
||||||
(
|
|
||||||
select
|
|
||||||
pid as session_id,
|
|
||||||
xid as transaction_id,
|
|
||||||
starttime as start_time,
|
|
||||||
type,
|
|
||||||
query_text,
|
|
||||||
userid
|
|
||||||
from
|
|
||||||
(
|
|
||||||
select
|
|
||||||
starttime,
|
|
||||||
pid,
|
|
||||||
xid,
|
|
||||||
type,
|
|
||||||
userid,
|
|
||||||
LISTAGG(case
|
|
||||||
when LEN(RTRIM(text)) = 0 then text
|
|
||||||
else RTRIM(text)
|
|
||||||
end,
|
|
||||||
'') within group (
|
|
||||||
order by sequence
|
|
||||||
) as query_text
|
|
||||||
from
|
|
||||||
SVL_STATEMENTTEXT
|
|
||||||
where
|
|
||||||
type in ('DDL', 'QUERY')
|
|
||||||
AND starttime >= '{start_time_str}'
|
|
||||||
AND starttime < '{end_time_str}'
|
|
||||||
-- See https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl-statementtext
|
|
||||||
AND sequence < 320
|
|
||||||
group by
|
|
||||||
starttime,
|
|
||||||
pid,
|
|
||||||
xid,
|
|
||||||
type,
|
|
||||||
userid
|
|
||||||
order by
|
|
||||||
starttime,
|
|
||||||
pid,
|
|
||||||
xid,
|
|
||||||
type,
|
|
||||||
userid
|
|
||||||
asc)
|
|
||||||
where
|
|
||||||
type in ('DDL', 'QUERY')
|
|
||||||
)
|
|
||||||
where
|
|
||||||
(create_command ilike 'create temp table %'
|
|
||||||
or create_command ilike 'create temporary table %'
|
|
||||||
-- we want to get all the create table statements and not just temp tables if non temp table is created and dropped in the same transaction
|
|
||||||
or create_command ilike 'create table %')
|
|
||||||
-- Redshift creates temp tables with the following names: volt_tt_%. We need to filter them out.
|
|
||||||
and query_text not ilike 'CREATE TEMP TABLE volt_tt_%'
|
|
||||||
and create_command not like 'CREATE TEMP TABLE volt_tt_'
|
|
||||||
-- We need to filter out our query and it was not possible earlier when we did not have any comment in the query
|
|
||||||
and query_text not ilike '%https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl-statementtext%'
|
|
||||||
|
|
||||||
)
|
|
||||||
where
|
where
|
||||||
rn = 1
|
type in ('DDL', 'QUERY')
|
||||||
|
AND starttime >= '{start_time_str}'
|
||||||
|
AND starttime < '{end_time_str}'
|
||||||
|
AND sequence < {_QUERY_SEQUENCE_LIMIT}
|
||||||
|
group by
|
||||||
|
starttime,
|
||||||
|
pid,
|
||||||
|
xid,
|
||||||
|
type,
|
||||||
|
userid
|
||||||
|
order by
|
||||||
|
starttime,
|
||||||
|
pid,
|
||||||
|
xid,
|
||||||
|
type,
|
||||||
|
userid
|
||||||
|
asc
|
||||||
|
)
|
||||||
|
where
|
||||||
|
type in ('DDL', 'QUERY')
|
||||||
|
)
|
||||||
|
where
|
||||||
|
(create_command ilike 'create temp table %'
|
||||||
|
or create_command ilike 'create temporary table %'
|
||||||
|
-- we want to get all the create table statements and not just temp tables if non temp table is created and dropped in the same transaction
|
||||||
|
or create_command ilike 'create table %')
|
||||||
|
-- Redshift creates temp tables with the following names: volt_tt_%. We need to filter them out.
|
||||||
|
and query_text not ilike 'CREATE TEMP TABLE volt_tt_%'
|
||||||
|
and create_command not like 'CREATE TEMP TABLE volt_tt_'
|
||||||
|
-- We need to filter out our query and it was not possible earlier when we did not have any comment in the query
|
||||||
|
and query_text not ilike '%https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl-statementtext%'
|
||||||
|
|
||||||
|
)
|
||||||
|
where
|
||||||
|
rn = 1
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Add this join to the sql query for more metrics on completed queries
|
# Add this join to the sql query for more metrics on completed queries
|
||||||
|
|||||||
@ -56,45 +56,7 @@ def mock_stl_insert_table_cursor(cursor: MagicMock) -> None:
|
|||||||
|
|
||||||
query_vs_cursor_mocker = {
|
query_vs_cursor_mocker = {
|
||||||
(
|
(
|
||||||
"-- DataHub Redshift Source temp table DDL query\n select\n *\n "
|
"\\\n-- DataHub Redshift Source temp table DDL query\nselect\n *\nfrom (\n select\n session_id,\n transaction_id,\n start_time,\n userid,\n REGEXP_REPLACE(REGEXP_SUBSTR(REGEXP_REPLACE(query_text,'\\\\\\\\n','\\\\n'), '(CREATE(?:[\\\\n\\\\s\\\\t]+(?:temp|temporary))?(?:[\\\\n\\\\s\\\\t]+)table(?:[\\\\n\\\\s\\\\t]+)[^\\\\n\\\\s\\\\t()-]+)', 0, 1, 'ipe'),'[\\\\n\\\\s\\\\t]+',' ',1,'p') as create_command,\n query_text,\n row_number() over (\n partition by session_id, TRIM(query_text)\n order by start_time desc\n ) rn\n from (\n select\n pid as session_id,\n xid as transaction_id,\n starttime as start_time,\n type,\n query_text,\n userid\n from (\n select\n starttime,\n pid,\n xid,\n type,\n userid,\n LISTAGG(case\n when LEN(RTRIM(text)) = 0 then text\n else RTRIM(text)\n end,\n '') within group (\n order by sequence\n ) as query_text\n from\n SVL_STATEMENTTEXT\n where\n type in ('DDL', 'QUERY')\n AND starttime >= '2024-01-01 12:00:00'\n AND starttime < '2024-01-10 12:00:00'\n AND sequence < 290\n group by\n starttime,\n pid,\n xid,\n type,\n userid\n order by\n starttime,\n pid,\n xid,\n type,\n userid\n asc\n )\n where\n type in ('DDL', 'QUERY')\n )\n where\n (create_command ilike 'create temp table %'\n or create_command ilike 'create temporary table %'\n -- we want to get all the create table statements and not just temp tables if non temp table is created and dropped in the same transaction\n or create_command ilike 'create table %')\n -- Redshift creates temp tables with the following names: volt_tt_%. We need to filter them out.\n and query_text not ilike 'CREATE TEMP TABLE volt_tt_%'\n and create_command not like 'CREATE TEMP TABLE volt_tt_'\n -- We need to filter out our query and it was not possible earlier when we did not have any comment in the query\n and query_text not ilike '%https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl-statementtext%'\n\n)\nwhere\n rn = 1\n "
|
||||||
"from\n (\n select\n session_id,\n "
|
|
||||||
" transaction_id,\n start_time,\n userid,\n "
|
|
||||||
" REGEXP_REPLACE(REGEXP_SUBSTR(REGEXP_REPLACE(query_text,'\\\\\\\\n','\\\\n'), '(CREATE(?:["
|
|
||||||
"\\\\n\\\\s\\\\t]+(?:temp|temporary))?(?:[\\\\n\\\\s\\\\t]+)table(?:[\\\\n\\\\s\\\\t]+)["
|
|
||||||
"^\\\\n\\\\s\\\\t()-]+)', 0, 1, 'ipe'),'[\\\\n\\\\s\\\\t]+',' ',1,'p') as create_command,\n "
|
|
||||||
" query_text,\n row_number() over (\n partition "
|
|
||||||
"by session_id, TRIM(query_text)\n order by start_time desc\n ) rn\n "
|
|
||||||
" from\n (\n select\n pid "
|
|
||||||
"as session_id,\n xid as transaction_id,\n starttime "
|
|
||||||
"as start_time,\n type,\n query_text,\n "
|
|
||||||
" userid\n from\n (\n "
|
|
||||||
"select\n starttime,\n pid,\n "
|
|
||||||
" xid,\n type,\n userid,\n "
|
|
||||||
" LISTAGG(case\n when LEN(RTRIM(text)) = 0 then text\n "
|
|
||||||
" else RTRIM(text)\n end,\n "
|
|
||||||
" '') within group (\n order by sequence\n "
|
|
||||||
" ) as query_text\n from\n "
|
|
||||||
"SVL_STATEMENTTEXT\n where\n type in ('DDL', "
|
|
||||||
"'QUERY')\n AND starttime >= '2024-01-01 12:00:00'\n "
|
|
||||||
" AND starttime < '2024-01-10 12:00:00'\n -- See "
|
|
||||||
"https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl"
|
|
||||||
"-statementtext\n AND sequence < 320\n group by\n "
|
|
||||||
" starttime,\n pid,\n "
|
|
||||||
"xid,\n type,\n userid\n "
|
|
||||||
" order by\n starttime,\n pid,\n "
|
|
||||||
" xid,\n type,\n userid\n "
|
|
||||||
" asc)\n where\n type in ('DDL', "
|
|
||||||
"'QUERY')\n )\n where\n (create_command ilike "
|
|
||||||
"'create temp table %'\n or create_command ilike 'create temporary table %'\n "
|
|
||||||
" -- we want to get all the create table statements and not just temp tables "
|
|
||||||
"if non temp table is created and dropped in the same transaction\n or "
|
|
||||||
"create_command ilike 'create table %')\n -- Redshift creates temp tables with "
|
|
||||||
"the following names: volt_tt_%. We need to filter them out.\n and query_text not "
|
|
||||||
"ilike 'CREATE TEMP TABLE volt_tt_%'\n and create_command not like 'CREATE TEMP "
|
|
||||||
"TABLE volt_tt_'\n -- We need to filter out our query and it was not possible "
|
|
||||||
"earlier when we did not have any comment in the query\n and query_text not ilike "
|
|
||||||
"'%https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl"
|
|
||||||
"-statementtext%'\n\n )\n where\n rn = 1\n "
|
|
||||||
): mock_temp_table_cursor,
|
): mock_temp_table_cursor,
|
||||||
"select * from test_collapse_temp_lineage": mock_stl_insert_table_cursor,
|
"select * from test_collapse_temp_lineage": mock_stl_insert_table_cursor,
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user