diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py index 39370b93b5..f7fad574f7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py @@ -4,6 +4,12 @@ from typing import List redshift_datetime_format = "%Y-%m-%d %H:%M:%S" +# See https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl-statementtext +# for why we need to limit the size of the query text. +# We use 290 instead of the standard 320, because escape characters can add to the length. +_QUERY_SEQUENCE_LIMIT = 290 + + class RedshiftCommonQuery: CREATE_TEMP_TABLE_CLAUSE = "create temp table" CREATE_TEMPORARY_TABLE_CLAUSE = "create temporary table" @@ -487,71 +493,70 @@ class RedshiftProvisionedQuery(RedshiftCommonQuery): def list_insert_create_queries_sql( db_name: str, start_time: datetime, end_time: datetime ) -> str: - return """ - with query_txt as - ( - select - query, - pid, - LISTAGG(case - when LEN(RTRIM(text)) = 0 then text - else RTRIM(text) - end) within group ( - order by - sequence) as ddl - from - ( - select - query, - pid, - text, - sequence - from - STL_QUERYTEXT - where - sequence < 320 - order by - sequence - ) - group by - query, - pid - ) - select - distinct tbl as target_table_id, - sti.schema as target_schema, - sti.table as target_table, - sti.database as cluster, - usename as username, - ddl, - sq.query as query_id, - min(si.starttime) as timestamp, - ANY_VALUE(pid) as session_id - from - stl_insert as si - left join SVV_TABLE_INFO sti on - sti.table_id = tbl - left join svl_user_info sui on - si.userid = sui.usesysid - left join query_txt sq on - si.query = sq.query - left join stl_load_commits slc on - slc.query = si.query - where - sui.usename <> 'rdsdb' - and cluster = '{db_name}' - and slc.query IS NULL - and si.starttime >= '{start_time}' - and 
si.starttime < '{end_time}' - group by - target_table_id, - target_schema, - target_table, - cluster, - username, - ddl, - sq.query + return """\ +with query_txt as ( + select + query, + pid, + LISTAGG(case + when LEN(RTRIM(text)) = 0 then text + else RTRIM(text) + end) within group ( + order by sequence + ) as ddl + from ( + select + query, + pid, + text, + sequence + from + STL_QUERYTEXT + where + sequence < {_QUERY_SEQUENCE_LIMIT} + order by + sequence + ) + group by + query, + pid +) +select + distinct tbl as target_table_id, + sti.schema as target_schema, + sti.table as target_table, + sti.database as cluster, + usename as username, + ddl, + sq.query as query_id, + min(si.starttime) as timestamp, + ANY_VALUE(pid) as session_id +from + stl_insert as si +left join SVV_TABLE_INFO sti on + sti.table_id = tbl +left join svl_user_info sui on + si.userid = sui.usesysid +left join query_txt sq on + si.query = sq.query +left join stl_load_commits slc on + slc.query = si.query +where + sui.usename <> 'rdsdb' + and cluster = '{db_name}' + and slc.query IS NULL + and si.starttime >= '{start_time}' + and si.starttime < '{end_time}' +group by + target_table_id, + target_schema, + target_table, + cluster, + username, + ddl, + sq.query """.format( + _QUERY_SEQUENCE_LIMIT=_QUERY_SEQUENCE_LIMIT, # We need the original database name for filtering db_name=db_name, start_time=start_time.strftime(redshift_datetime_format), @@ -564,84 +569,82 @@ class RedshiftProvisionedQuery(RedshiftCommonQuery): end_time_str: str = end_time.strftime(redshift_datetime_format) - return rf"""-- DataHub Redshift Source temp table DDL query + return rf"""\ +-- DataHub Redshift Source temp table DDL query +select + * +from ( + select + session_id, + transaction_id, + start_time, + userid, + REGEXP_REPLACE(REGEXP_SUBSTR(REGEXP_REPLACE(query_text,'\\\\n','\\n'), '(CREATE(?:[\\n\\s\\t]+(?:temp|temporary))?(?:[\\n\\s\\t]+)table(?:[\\n\\s\\t]+)[^\\n\\s\\t()-]+)', 0, 1, 'ipe'),'[\\n\\s\\t]+',' ',1,'p') as 
create_command, + query_text, + row_number() over ( + partition by session_id, TRIM(query_text) + order by start_time desc + ) rn + from ( + select + pid as session_id, + xid as transaction_id, + starttime as start_time, + type, + query_text, + userid + from ( select - * + starttime, + pid, + xid, + type, + userid, + LISTAGG(case + when LEN(RTRIM(text)) = 0 then text + else RTRIM(text) + end, + '') within group ( + order by sequence + ) as query_text from - ( - select - session_id, - transaction_id, - start_time, - userid, - REGEXP_REPLACE(REGEXP_SUBSTR(REGEXP_REPLACE(query_text,'\\\\n','\\n'), '(CREATE(?:[\\n\\s\\t]+(?:temp|temporary))?(?:[\\n\\s\\t]+)table(?:[\\n\\s\\t]+)[^\\n\\s\\t()-]+)', 0, 1, 'ipe'),'[\\n\\s\\t]+',' ',1,'p') as create_command, - query_text, - row_number() over ( - partition by session_id, TRIM(query_text) - order by start_time desc - ) rn - from - ( - select - pid as session_id, - xid as transaction_id, - starttime as start_time, - type, - query_text, - userid - from - ( - select - starttime, - pid, - xid, - type, - userid, - LISTAGG(case - when LEN(RTRIM(text)) = 0 then text - else RTRIM(text) - end, - '') within group ( - order by sequence - ) as query_text - from - SVL_STATEMENTTEXT - where - type in ('DDL', 'QUERY') - AND starttime >= '{start_time_str}' - AND starttime < '{end_time_str}' - -- See https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl-statementtext - AND sequence < 320 - group by - starttime, - pid, - xid, - type, - userid - order by - starttime, - pid, - xid, - type, - userid - asc) - where - type in ('DDL', 'QUERY') - ) - where - (create_command ilike 'create temp table %' - or create_command ilike 'create temporary table %' - -- we want to get all the create table statements and not just temp tables if non temp table is created and dropped in the same transaction - or create_command ilike 'create table %') - -- Redshift creates temp tables with the following names: volt_tt_%. 
We need to filter them out. - and query_text not ilike 'CREATE TEMP TABLE volt_tt_%' - and create_command not like 'CREATE TEMP TABLE volt_tt_' - -- We need to filter out our query and it was not possible earlier when we did not have any comment in the query - and query_text not ilike '%https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl-statementtext%' - - ) + SVL_STATEMENTTEXT where - rn = 1 + type in ('DDL', 'QUERY') + AND starttime >= '{start_time_str}' + AND starttime < '{end_time_str}' + AND sequence < {_QUERY_SEQUENCE_LIMIT} + group by + starttime, + pid, + xid, + type, + userid + order by + starttime, + pid, + xid, + type, + userid + asc + ) + where + type in ('DDL', 'QUERY') + ) + where + (create_command ilike 'create temp table %' + or create_command ilike 'create temporary table %' + -- we want to get all the create table statements and not just temp tables if non temp table is created and dropped in the same transaction + or create_command ilike 'create table %') + -- Redshift creates temp tables with the following names: volt_tt_%. We need to filter them out. 
+ and query_text not ilike 'CREATE TEMP TABLE volt_tt_%' + and create_command not like 'CREATE TEMP TABLE volt_tt_' + -- We need to filter out our query and it was not possible earlier when we did not have any comment in the query + and query_text not ilike '%https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl-statementtext%' + +) +where + rn = 1 """ # Add this join to the sql query for more metrics on completed queries diff --git a/metadata-ingestion/tests/unit/redshift/redshift_query_mocker.py b/metadata-ingestion/tests/unit/redshift/redshift_query_mocker.py index ada76e6240..06b592d429 100644 --- a/metadata-ingestion/tests/unit/redshift/redshift_query_mocker.py +++ b/metadata-ingestion/tests/unit/redshift/redshift_query_mocker.py @@ -56,45 +56,7 @@ def mock_stl_insert_table_cursor(cursor: MagicMock) -> None: query_vs_cursor_mocker = { ( - "-- DataHub Redshift Source temp table DDL query\n select\n *\n " - "from\n (\n select\n session_id,\n " - " transaction_id,\n start_time,\n userid,\n " - " REGEXP_REPLACE(REGEXP_SUBSTR(REGEXP_REPLACE(query_text,'\\\\\\\\n','\\\\n'), '(CREATE(?:[" - "\\\\n\\\\s\\\\t]+(?:temp|temporary))?(?:[\\\\n\\\\s\\\\t]+)table(?:[\\\\n\\\\s\\\\t]+)[" - "^\\\\n\\\\s\\\\t()-]+)', 0, 1, 'ipe'),'[\\\\n\\\\s\\\\t]+',' ',1,'p') as create_command,\n " - " query_text,\n row_number() over (\n partition " - "by session_id, TRIM(query_text)\n order by start_time desc\n ) rn\n " - " from\n (\n select\n pid " - "as session_id,\n xid as transaction_id,\n starttime " - "as start_time,\n type,\n query_text,\n " - " userid\n from\n (\n " - "select\n starttime,\n pid,\n " - " xid,\n type,\n userid,\n " - " LISTAGG(case\n when LEN(RTRIM(text)) = 0 then text\n " - " else RTRIM(text)\n end,\n " - " '') within group (\n order by sequence\n " - " ) as query_text\n from\n " - "SVL_STATEMENTTEXT\n where\n type in ('DDL', " - "'QUERY')\n AND starttime >= '2024-01-01 12:00:00'\n " - " AND starttime < '2024-01-10 12:00:00'\n 
-- See " - "https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl" - "-statementtext\n AND sequence < 320\n group by\n " - " starttime,\n pid,\n " - "xid,\n type,\n userid\n " - " order by\n starttime,\n pid,\n " - " xid,\n type,\n userid\n " - " asc)\n where\n type in ('DDL', " - "'QUERY')\n )\n where\n (create_command ilike " - "'create temp table %'\n or create_command ilike 'create temporary table %'\n " - " -- we want to get all the create table statements and not just temp tables " - "if non temp table is created and dropped in the same transaction\n or " - "create_command ilike 'create table %')\n -- Redshift creates temp tables with " - "the following names: volt_tt_%. We need to filter them out.\n and query_text not " - "ilike 'CREATE TEMP TABLE volt_tt_%'\n and create_command not like 'CREATE TEMP " - "TABLE volt_tt_'\n -- We need to filter out our query and it was not possible " - "earlier when we did not have any comment in the query\n and query_text not ilike " - "'%https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl" - "-statementtext%'\n\n )\n where\n rn = 1\n " + "\\\n-- DataHub Redshift Source temp table DDL query\nselect\n *\nfrom (\n select\n session_id,\n transaction_id,\n start_time,\n userid,\n REGEXP_REPLACE(REGEXP_SUBSTR(REGEXP_REPLACE(query_text,'\\\\\\\\n','\\\\n'), '(CREATE(?:[\\\\n\\\\s\\\\t]+(?:temp|temporary))?(?:[\\\\n\\\\s\\\\t]+)table(?:[\\\\n\\\\s\\\\t]+)[^\\\\n\\\\s\\\\t()-]+)', 0, 1, 'ipe'),'[\\\\n\\\\s\\\\t]+',' ',1,'p') as create_command,\n query_text,\n row_number() over (\n partition by session_id, TRIM(query_text)\n order by start_time desc\n ) rn\n from (\n select\n pid as session_id,\n xid as transaction_id,\n starttime as start_time,\n type,\n query_text,\n userid\n from (\n select\n starttime,\n pid,\n xid,\n type,\n userid,\n LISTAGG(case\n when LEN(RTRIM(text)) = 0 then text\n else RTRIM(text)\n end,\n '') within group (\n order by 
sequence\n ) as query_text\n from\n SVL_STATEMENTTEXT\n where\n type in ('DDL', 'QUERY')\n AND starttime >= '2024-01-01 12:00:00'\n AND starttime < '2024-01-10 12:00:00'\n AND sequence < 290\n group by\n starttime,\n pid,\n xid,\n type,\n userid\n order by\n starttime,\n pid,\n xid,\n type,\n userid\n asc\n )\n where\n type in ('DDL', 'QUERY')\n )\n where\n (create_command ilike 'create temp table %'\n or create_command ilike 'create temporary table %'\n -- we want to get all the create table statements and not just temp tables if non temp table is created and dropped in the same transaction\n or create_command ilike 'create table %')\n -- Redshift creates temp tables with the following names: volt_tt_%. We need to filter them out.\n and query_text not ilike 'CREATE TEMP TABLE volt_tt_%'\n and create_command not like 'CREATE TEMP TABLE volt_tt_'\n -- We need to filter out our query and it was not possible earlier when we did not have any comment in the query\n and query_text not ilike '%https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl-statementtext%'\n\n)\nwhere\n rn = 1\n " ): mock_temp_table_cursor, "select * from test_collapse_temp_lineage": mock_stl_insert_table_cursor, }