mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-11-04 04:29:13 +00:00 
			
		
		
		
	Improve Redshift Usage query (#881)
* Improve Redshift Usage query * Improve Redshift Usage query * Improve Redshift Usage query
This commit is contained in:
		
							parent
							
								
									39a7b3e8c6
								
							
						
					
					
						commit
						29c87f77d0
					
				@ -27,28 +27,20 @@ class TableQuery(JsonSerializable):
 | 
			
		||||
    def __init__(
 | 
			
		||||
        self,
 | 
			
		||||
        query: str,
 | 
			
		||||
        label: str,
 | 
			
		||||
        userid: int,
 | 
			
		||||
        xid: int,
 | 
			
		||||
        pid: int,
 | 
			
		||||
        user_name: str,
 | 
			
		||||
        starttime: str,
 | 
			
		||||
        endtime: str,
 | 
			
		||||
        analysis_date: str,
 | 
			
		||||
        duration: int,
 | 
			
		||||
        database: str,
 | 
			
		||||
        aborted: bool,
 | 
			
		||||
        sql: str,
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        """ """
 | 
			
		||||
        self.query = query
 | 
			
		||||
        self.label = label
 | 
			
		||||
        self.userid = userid
 | 
			
		||||
        self.xid = xid
 | 
			
		||||
        self.pid = pid
 | 
			
		||||
        self.user_name = user_name
 | 
			
		||||
        self.starttime = starttime
 | 
			
		||||
        self.endtime = endtime
 | 
			
		||||
        self.analysis_date = analysis_date
 | 
			
		||||
        self.duration = duration
 | 
			
		||||
        self.database = database
 | 
			
		||||
        self.aborted = aborted
 | 
			
		||||
        self.sql = sql
 | 
			
		||||
 | 
			
		||||
@ -15,6 +15,7 @@
 | 
			
		||||
 | 
			
		||||
import datetime
 | 
			
		||||
import logging
 | 
			
		||||
import traceback
 | 
			
		||||
from typing import Optional
 | 
			
		||||
 | 
			
		||||
from sql_metadata import Parser
 | 
			
		||||
 | 
			
		||||
@ -33,29 +33,26 @@ logger = logging.getLogger(__name__)
 | 
			
		||||
class RedshiftUsageSource(Source):
 | 
			
		||||
    # SELECT statement from mysql information_schema to extract table and column metadata
 | 
			
		||||
    SQL_STATEMENT = """
 | 
			
		||||
        WITH query_sql AS (
 | 
			
		||||
                 SELECT
 | 
			
		||||
                    query,
 | 
			
		||||
                    LISTAGG(text) WITHIN GROUP (ORDER BY sequence) AS sql
 | 
			
		||||
                FROM stl_querytext 
 | 
			
		||||
                GROUP BY 1
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        SELECT
 | 
			
		||||
            q.query,  q.label, userid,  xid,  pid,  starttime,  endtime,
 | 
			
		||||
            DATEDIFF(milliseconds, starttime, endtime) AS duration,
 | 
			
		||||
            TRIM(database) AS database,
 | 
			
		||||
            '{start_date}' as analysis_date,
 | 
			
		||||
            (CASE aborted WHEN 1 THEN TRUE ELSE FALSE END) AS aborted,
 | 
			
		||||
        sql
 | 
			
		||||
        FROM
 | 
			
		||||
            stl_query q JOIN query_sql qs ON (q.query = qs.query)
 | 
			
		||||
        WHERE
 | 
			
		||||
        endtime between '{start_date}' and '{end_date}'
 | 
			
		||||
        {where_clause}
 | 
			
		||||
        ORDER BY starttime;
 | 
			
		||||
        """
 | 
			
		||||
 | 
			
		||||
        SELECT DISTINCT ss.userid,
 | 
			
		||||
            ss.query,
 | 
			
		||||
            sui.usename,
 | 
			
		||||
            ss.tbl,
 | 
			
		||||
            sq.querytxt,
 | 
			
		||||
            sti.database,
 | 
			
		||||
            sti.schema,
 | 
			
		||||
            sti.table,
 | 
			
		||||
            sq.starttime,
 | 
			
		||||
            sq.endtime,
 | 
			
		||||
            sq.aborted
 | 
			
		||||
        FROM stl_scan ss
 | 
			
		||||
            JOIN svv_table_info sti ON ss.tbl = sti.table_id
 | 
			
		||||
            JOIN stl_query sq ON ss.query = sq.query
 | 
			
		||||
            JOIN svl_user_info sui ON sq.userid = sui.usesysid
 | 
			
		||||
        WHERE ss.starttime >= '{start_time}'
 | 
			
		||||
            AND ss.starttime < '{end_time}'
 | 
			
		||||
            AND sq.aborted = 0
 | 
			
		||||
        ORDER BY ss.endtime DESC;
 | 
			
		||||
    """
 | 
			
		||||
    # CONFIG KEYS
 | 
			
		||||
    WHERE_CLAUSE_SUFFIX_KEY = "where_clause"
 | 
			
		||||
    CLUSTER_SOURCE = "cluster_source"
 | 
			
		||||
@ -69,8 +66,9 @@ class RedshiftUsageSource(Source):
 | 
			
		||||
        super().__init__(ctx)
 | 
			
		||||
        start, end = get_start_and_end(config.duration)
 | 
			
		||||
        self.sql_stmt = RedshiftUsageSource.SQL_STATEMENT.format(
 | 
			
		||||
            where_clause=config.where_clause, start_date=start, end_date=end
 | 
			
		||||
            start_time=start, end_time=end
 | 
			
		||||
        )
 | 
			
		||||
        self.analysis_date = start
 | 
			
		||||
        self.alchemy_helper = SQLAlchemyHelper(
 | 
			
		||||
            config, metadata_config, ctx, "Redshift", self.sql_stmt
 | 
			
		||||
        )
 | 
			
		||||
@ -103,18 +101,14 @@ class RedshiftUsageSource(Source):
 | 
			
		||||
        """
 | 
			
		||||
        for row in self._get_raw_extract_iter():
 | 
			
		||||
            tq = TableQuery(
 | 
			
		||||
                row["query"],
 | 
			
		||||
                row["label"],
 | 
			
		||||
                row["userid"],
 | 
			
		||||
                row["xid"],
 | 
			
		||||
                row["pid"],
 | 
			
		||||
                str(row["starttime"]),
 | 
			
		||||
                str(row["endtime"]),
 | 
			
		||||
                str(row["analysis_date"]),
 | 
			
		||||
                row["duration"],
 | 
			
		||||
                row["database"],
 | 
			
		||||
                row["aborted"],
 | 
			
		||||
                row["sql"],
 | 
			
		||||
                query=row["query"],
 | 
			
		||||
                user_name=row["usename"],
 | 
			
		||||
                starttime=str(row["starttime"]),
 | 
			
		||||
                endtime=str(row["endtime"]),
 | 
			
		||||
                analysis_date=str(self.analysis_date),
 | 
			
		||||
                database=row["database"],
 | 
			
		||||
                aborted=row["aborted"],
 | 
			
		||||
                sql=row["querytxt"],
 | 
			
		||||
            )
 | 
			
		||||
            yield tq
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user