diff --git a/ingestion/src/metadata/ingestion/models/table_queries.py b/ingestion/src/metadata/ingestion/models/table_queries.py index 17e2cd03b95..c37bbc7902e 100644 --- a/ingestion/src/metadata/ingestion/models/table_queries.py +++ b/ingestion/src/metadata/ingestion/models/table_queries.py @@ -27,28 +27,20 @@ class TableQuery(JsonSerializable): def __init__( self, query: str, - label: str, - userid: int, - xid: int, - pid: int, + user_name: str, starttime: str, endtime: str, analysis_date: str, - duration: int, database: str, aborted: bool, sql: str, ) -> None: """ """ self.query = query - self.label = label - self.userid = userid - self.xid = xid - self.pid = pid + self.user_name = user_name self.starttime = starttime self.endtime = endtime self.analysis_date = analysis_date - self.duration = duration self.database = database self.aborted = aborted self.sql = sql diff --git a/ingestion/src/metadata/ingestion/processor/query_parser.py b/ingestion/src/metadata/ingestion/processor/query_parser.py index e74aedfbb17..c08819cd6d1 100644 --- a/ingestion/src/metadata/ingestion/processor/query_parser.py +++ b/ingestion/src/metadata/ingestion/processor/query_parser.py @@ -15,6 +15,7 @@ import datetime import logging +import traceback from typing import Optional from sql_metadata import Parser diff --git a/ingestion/src/metadata/ingestion/source/redshift_usage.py b/ingestion/src/metadata/ingestion/source/redshift_usage.py index 38458fe139d..98e6095f47b 100644 --- a/ingestion/src/metadata/ingestion/source/redshift_usage.py +++ b/ingestion/src/metadata/ingestion/source/redshift_usage.py @@ -33,29 +33,26 @@ logger = logging.getLogger(__name__) class RedshiftUsageSource(Source): # SELECT statement from mysql information_schema to extract table and column metadata SQL_STATEMENT = """ - WITH query_sql AS ( - SELECT - query, - LISTAGG(text) WITHIN GROUP (ORDER BY sequence) AS sql - FROM stl_querytext - GROUP BY 1 - ) - - SELECT - q.query, q.label, userid, xid, pid, starttime, endtime, - DATEDIFF(milliseconds, starttime, endtime) AS duration, - TRIM(database) AS database, - '{start_date}' as analysis_date, - (CASE aborted WHEN 1 THEN TRUE ELSE FALSE END) AS aborted, - sql - FROM - stl_query q JOIN query_sql qs ON (q.query = qs.query) - WHERE - endtime between '{start_date}' and '{end_date}' - {where_clause} - ORDER BY starttime; - """ - + SELECT DISTINCT ss.userid, + ss.query, + sui.usename, + ss.tbl, + sq.querytxt, + sti.database, + sti.schema, + sti.table, + sq.starttime, + sq.endtime, + sq.aborted + FROM stl_scan ss + JOIN svv_table_info sti ON ss.tbl = sti.table_id + JOIN stl_query sq ON ss.query = sq.query + JOIN svl_user_info sui ON sq.userid = sui.usesysid + WHERE ss.starttime >= '{start_time}' + AND ss.starttime < '{end_time}' + AND sq.aborted = 0 + ORDER BY ss.endtime DESC; + """ # CONFIG KEYS WHERE_CLAUSE_SUFFIX_KEY = "where_clause" CLUSTER_SOURCE = "cluster_source" @@ -69,8 +66,9 @@ class RedshiftUsageSource(Source): super().__init__(ctx) start, end = get_start_and_end(config.duration) self.sql_stmt = RedshiftUsageSource.SQL_STATEMENT.format( - where_clause=config.where_clause, start_date=start, end_date=end + start_time=start, end_time=end ) + self.analysis_date = start self.alchemy_helper = SQLAlchemyHelper( config, metadata_config, ctx, "Redshift", self.sql_stmt ) @@ -103,18 +101,14 @@ class RedshiftUsageSource(Source): """ for row in self._get_raw_extract_iter(): tq = TableQuery( - row["query"], - row["label"], - row["userid"], - row["xid"], - row["pid"], - str(row["starttime"]), - str(row["endtime"]), - str(row["analysis_date"]), - row["duration"], - row["database"], - row["aborted"], - row["sql"], + query=row["query"], + user_name=row["usename"], + starttime=str(row["starttime"]), + endtime=str(row["endtime"]), + analysis_date=str(self.analysis_date), + database=row["database"], + aborted=row["aborted"], + sql=row["querytxt"], ) yield tq