diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py index 247545622b7..f098bcf01e8 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py @@ -25,6 +25,7 @@ from metadata.generated.schema.api.data.createTableProfile import ( from metadata.generated.schema.entity.data.table import ( ColumnProfile, DataModel, + SystemProfile, Table, TableData, TableJoins, @@ -252,13 +253,19 @@ class OMetaTableMixin: url_after = f"&after={after}" if after else "" profile_type_url = profile_type.__name__[0].lower() + profile_type.__name__[1:] + + # system profile uses milliseconds + if profile_type is not SystemProfile: + start_ts = start_ts // 1000 + end_ts = end_ts // 1000 + resp = self.client.get( f"{self.get_suffix(Table)}/{fqn}/{profile_type_url}?limit={limit}{url_after}", - data={"startTs": start_ts // 1000, "endTs": end_ts // 1000}, + data={"startTs": start_ts, "endTs": end_ts}, ) - if profile_type is TableProfile: - data: List[T] = [TableProfile(**datum) for datum in resp["data"]] # type: ignore + if profile_type in (TableProfile, SystemProfile): + data: List[T] = [profile_type(**datum) for datum in resp["data"]] # type: ignore elif profile_type is ColumnProfile: split_fqn = fqn.split(".") if len(split_fqn) < 5: diff --git a/ingestion/src/metadata/profiler/metrics/system/system.py b/ingestion/src/metadata/profiler/metrics/system/system.py index 795e24440ea..f3beef856df 100644 --- a/ingestion/src/metadata/profiler/metrics/system/system.py +++ b/ingestion/src/metadata/profiler/metrics/system/system.py @@ -14,7 +14,7 @@ System Metric """ import traceback -from collections import namedtuple +from collections import defaultdict, namedtuple from enum import Enum from textwrap import dedent from typing import Dict, List, Optional @@ -22,7 +22,6 @@ from typing import Dict, List, Optional import sqlparse from sqlalchemy import text from sqlalchemy.orm import DeclarativeMeta, Session -from sqlparse.sql import Identifier from metadata.generated.schema.entity.data.table import DmlOperationType from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import ( @@ -31,10 +30,19 @@ from metadata.generated.schema.entity.services.connections.database.bigQueryConn from metadata.profiler.metrics.core import SystemMetric from metadata.profiler.orm.registry import Dialects from metadata.utils.dispatch import valuedispatch +from metadata.utils.helpers import deep_size_of_dict from metadata.utils.logger import profiler_logger +from metadata.utils.profiler_utils import clean_up_query, get_snowflake_system_queries logger = profiler_logger() +MAX_SIZE_IN_BYTES = 2 * 1024**3 # 2GB + + +def recursive_dic(): + """recursive default dict""" + return defaultdict(recursive_dic) + class DatabaseDMLOperations(Enum): """enum of supported DML operation on database engine side""" @@ -52,7 +60,7 @@ DML_OPERATION_MAP = { DatabaseDMLOperations.DELETE.value: DmlOperationType.DELETE.value, } -SYSTEM_QUERY_RESULT_CACHE = {} +SYSTEM_QUERY_RESULT_CACHE = recursive_dic() @valuedispatch @@ -105,6 +113,8 @@ def _( "deleted_row_count": DatabaseDMLOperations.DELETE.value, "updated_row_count": DatabaseDMLOperations.UPDATE.value, } + project_id = session.get_bind().url.host + dataset_id = table.__table_args__["schema"] jobs = dedent( f""" @@ -117,6 +127,8 @@ def _( `region-{conn_config.usageLocation}`.INFORMATION_SCHEMA.JOBS WHERE DATE(creation_time) >= 
CURRENT_DATE() - 1 AND + destination_table.dataset_id = '{dataset_id}' AND + destination_table.project_id = '{project_id}' AND statement_type IN ( '{DatabaseDMLOperations.INSERT.value}', '{DatabaseDMLOperations.DELETE.value}', @@ -133,12 +145,17 @@ def _( "query_type,timestamp,destination_table,dml_statistics", ) - try: + if ( + "query_results" + in SYSTEM_QUERY_RESULT_CACHE[Dialects.BigQuery][project_id][dataset_id] + ): # we'll try to get the cached data first - rows_jobs = kwargs["cache"][Dialects.BigQuery]["rows_jobs"] - except KeyError: + query_results = SYSTEM_QUERY_RESULT_CACHE[Dialects.BigQuery][project_id][ + dataset_id + ]["query_results"] + else: cursor_jobs = session.execute(text(jobs)) - rows_jobs = [ + query_results = [ QueryResult( row.statement_type, row.start_time, @@ -147,15 +164,12 @@ def _( ) for row in cursor_jobs.fetchall() ] - SYSTEM_QUERY_RESULT_CACHE[Dialects.BigQuery] = {"rows_jobs": rows_jobs} + SYSTEM_QUERY_RESULT_CACHE[Dialects.BigQuery][project_id][dataset_id][ + "query_results" + ] = query_results - for row_jobs in rows_jobs: - if ( - row_jobs.destination_table.get("project_id") == session.get_bind().url.host - and row_jobs.destination_table.get("dataset_id") - == table.__table_args__["schema"] - and row_jobs.destination_table.get("table_id") == table.__tablename__ - ): + for row_jobs in query_results: + if row_jobs.destination_table.get("table_id") == table.__tablename__: rows_affected = None try: if row_jobs.query_type == DatabaseDMLOperations.INSERT.value: @@ -169,7 +183,7 @@ def _( rows_affected = None if row_jobs.query_type == DatabaseDMLOperations.MERGE.value: - for i, key in enumerate(row_jobs.dml_statistics): + for indx, key in enumerate(row_jobs.dml_statistics): if row_jobs.dml_statistics[key] != 0: metric_results.append( { @@ -177,7 +191,7 @@ def _( # We are padding timestamps by 0,1,2 millisesond to avoid # duplicate timestamps "timestamp": int(row_jobs.timestamp.timestamp() * 1000) - + i, + + indx, "operation": dml_stat_to_dml_statement_mapping.get(key), "rowsAffected": row_jobs.dml_statistics[key], } @@ -214,6 +228,8 @@ def _( List[Dict]: """ logger.debug(f"Fetching system metrics for {dialect}") + database = session.get_bind().url.database + schema = table.__table_args__["schema"] stl_deleted = dedent( f""" @@ -229,9 +245,8 @@ def _( INNER JOIN pg_catalog.svv_table_info sti ON si.tbl = sti.table_id INNER JOIN pg_catalog.stl_querytext sq ON si.query = sq.query WHERE - sti."database" = '{session.get_bind().url.database}' AND - sti."schema" = '{table.__table_args__["schema"]}' AND - sti."table" = '{table.__tablename__}' AND + sti."database" = '{database}' AND + sti."schema" = '{schema}' AND "rows" != 0 AND DATE(starttime) >= CURRENT_DATE - 1 GROUP BY 2,3,4,5,6 @@ -253,9 +268,8 @@ def _( INNER JOIN pg_catalog.svv_table_info sti ON si.tbl = sti.table_id INNER JOIN pg_catalog.stl_querytext sq ON si.query = sq.query WHERE - sti."database" = '{session.get_bind().url.database}' AND - sti."schema" = '{table.__table_args__["schema"]}' AND - sti."table" = '{table.__tablename__}' AND + sti."database" = '{database}' AND + sti."schema" = '{schema}' AND "rows" != 0 AND DATE(starttime) >= CURRENT_DATE - 1 GROUP BY 2,3,4,5,6 @@ -269,72 +283,98 @@ def _( "database_name,schema_name,table_name,query_text,timestamp,rowsAffected", ) - cursor_insert = session.execute(text(stl_insert)) - rows_insert = [ - QueryResult( - row.database, - row.schema, - row.table, - sqlparse.parse(row.text)[0], - row.starttime, - row.rows, - ) - for row in cursor_insert.fetchall() - ] + if 
( + "query_results_inserted" + in SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][schema] + ): + # we'll try to get the cached data first + query_results_inserted = SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][ + schema + ]["query_results_inserted"] + else: + cursor_insert = session.execute(text(stl_insert)) + query_results_inserted = [ + QueryResult( + row.database, + row.schema, + row.table, + sqlparse.parse(clean_up_query(row.text))[0], + row.starttime, + row.rows, + ) + for row in cursor_insert.fetchall() + ] + SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][schema][ + "query_results_inserted" + ] = query_results_inserted - cursor_deleted = session.execute(text(stl_deleted)) - rows_deleted = [ - QueryResult( - row.database, - row.schema, - row.table, - sqlparse.parse(row.text)[0], - row.starttime, - row.rows, - ) - for row in cursor_deleted.fetchall() - ] + if ( + "query_results_deleted" + in SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][schema] + ): + # we'll try to get the cached data first + query_results_deleted = SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][ + schema + ]["query_results_deleted"] + else: + cursor_deleted = session.execute(text(stl_deleted)) + query_results_deleted = [ + QueryResult( + row.database, + row.schema, + row.table, + sqlparse.parse(clean_up_query(row.text))[0], + row.starttime, + row.rows, + ) + for row in cursor_deleted.fetchall() + ] + SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][schema][ + "query_results_deleted" + ] = query_results_deleted - for row_insert in rows_insert: - query_text = row_insert.query_text - operation = next( - ( - token.value.upper() - for token in query_text.tokens - if token.ttype is sqlparse.tokens.DML - and token.value.upper() - in DmlOperationType._member_names_ # pylint: disable=protected-access - ), - None, - ) - if operation: - metric_results.append( - { - "timestamp": int(row_insert.timestamp.timestamp() * 1000), - "operation": operation, - "rowsAffected": row_insert.rowsAffected, - } + for row_inserted in query_results_inserted: + if row_inserted.table_name == table.__tablename__: + query_text = row_inserted.query_text + operation = next( + ( + token.value.upper() + for token in query_text.tokens + if token.ttype is sqlparse.tokens.DML + and token.value.upper() + in DmlOperationType._member_names_ # pylint: disable=protected-access + ), + None, + ) + if operation: + metric_results.append( + { + "timestamp": int(row_inserted.timestamp.timestamp() * 1000), + "operation": operation, + "rowsAffected": row_inserted.rowsAffected, + } + ) + + for row_deleted in query_results_deleted: + if row_deleted.table_name == table.__tablename__: + query_text = row_deleted.query_text + operation = next( + ( + token.value.upper() + for token in query_text.tokens + if token.ttype is sqlparse.tokens.DML and token.value != "UPDATE" + ), + None, ) - for row_deleted in rows_deleted: - query_text = row_deleted.query_text - operation = next( - ( - token.value.upper() - for token in query_text.tokens - if token.ttype is sqlparse.tokens.DML and token.value != "UPDATE" - ), - None, - ) - - if operation: - metric_results.append( - { - "timestamp": int(row_deleted.timestamp.timestamp() * 1000), - "operation": operation, - "rowsAffected": row_deleted.rowsAffected, - } - ) + if operation: + metric_results.append( + { + "timestamp": int(row_deleted.timestamp.timestamp() * 1000), + "operation": operation, + "rowsAffected": row_deleted.rowsAffected, + } + ) return metric_results @@ -361,6 +401,8 @@ def _( 
Dict: system metric """ logger.debug(f"Fetching system metrics for {dialect}") + database = session.get_bind().url.database + schema = table.__table_args__["schema"] metric_results: List[Dict] = [] @@ -381,63 +423,27 @@ def _( FROM TABLE(RESULT_SCAN('{query_id}')); """ - QueryResult = namedtuple( - "QueryResult", - "query_id,database_name,schema_name,query_text,query_type,timestamp", - ) - - try: + if ( + "query_results" + in SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema] + ): # we'll try to get the cached data first - rows = kwargs["cache"][Dialects.Snowflake]["rows"] - query_results = kwargs["cache"][Dialects.Snowflake]["query_results"] - except KeyError: - rows = session.execute(text(information_schema_query_history)).fetchall() - query_results = [ - QueryResult( - row.query_id, - row.database_name.lower() if row.database_name else None, - row.schema_name.lower() if row.schema_name else None, - sqlparse.parse(row.query_text)[0], - row.query_type, - row.start_time, - ) - for row in rows + query_results = SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema][ + "query_results" ] - SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake] = { - "rows": rows, - "query_results": query_results, - } + else: + rows = session.execute(text(information_schema_query_history)) + query_results = [] + for row in rows: + result = get_snowflake_system_queries(row, database, schema) + if result: + query_results.append(result) + SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema][ + "query_results" + ] = query_results for query_result in query_results: - query_text = query_result.query_text - identifier = next( - ( - query_el - for query_el in query_text.tokens - if isinstance(query_el, Identifier) - ), - None, - ) - if not identifier: - continue - - values = identifier.value.split(".") - database_name, schema_name, table_name = ([None] * (3 - len(values))) + values - - database_name = ( - database_name.lower().strip('"') - if database_name - else query_result.database_name - ) - schema_name = ( - schema_name.lower().strip('"') if schema_name else query_result.schema_name - ) - - if ( - session.get_bind().url.database.lower() == database_name - and table.__table_args__["schema"].lower() == schema_name - and table.__tablename__ == table_name - ): + if table.__tablename__.lower() == query_result.table_name: cursor_for_result_scan = session.execute( text(dedent(result_scan.format(query_id=query_result.query_id))) ) @@ -483,6 +489,18 @@ class System(SystemMetric): def name(cls): return "system" + def _manage_cache(self, max_size_in_bytes: int = MAX_SIZE_IN_BYTES) -> None: + """manage cache and clears it if it exceeds the max size + + Args: + max_size_in_bytes (int, optional): max size of cache in bytes. Defaults to 2147483648. 
+ Returns: + None + """ + if deep_size_of_dict(SYSTEM_QUERY_RESULT_CACHE) > max_size_in_bytes: + logger.debug("Clearing system cache") + SYSTEM_QUERY_RESULT_CACHE.clear() + def sql(self, session: Session, **kwargs): """Implements the SQL logic to fetch system data""" if not hasattr(self, "table"): @@ -497,7 +515,6 @@ class System(SystemMetric): session=session, table=self.table, conn_config=conn_config, - cache=SYSTEM_QUERY_RESULT_CACHE, ) - + self._manage_cache() return system_metrics diff --git a/ingestion/src/metadata/utils/helpers.py b/ingestion/src/metadata/utils/helpers.py index b3c49075f2a..d9268da6fb1 100644 --- a/ingestion/src/metadata/utils/helpers.py +++ b/ingestion/src/metadata/utils/helpers.py @@ -15,7 +15,9 @@ Helpers module for ingestion related methods from __future__ import annotations +import itertools import re +import sys from datetime import datetime, timedelta from functools import wraps from math import floor, log @@ -352,3 +354,36 @@ def clean_uri(uri: str) -> str: make it http://localhost:9000 """ return uri[:-1] if uri.endswith("/") else uri + + +def deep_size_of_dict(obj: dict) -> int: + """Get deepsize of dict data structure + + Args: + obj (dict): dict data structure + Returns: + int: size of dict data structure + """ + # pylint: disable=unnecessary-lambda-assignment + dict_handler = lambda elmt: itertools.chain.from_iterable(elmt.items()) + handlers = { + dict: dict_handler, + list: iter, + } + + seen = set() + + def sizeof(obj) -> int: + if id(obj) in seen: + return 0 + + seen.add(id(obj)) + size = sys.getsizeof(obj, 0) + for type_, handler in handlers.items(): + if isinstance(obj, type_): + size += sum(map(sizeof, handler(obj))) + break + + return size + + return sizeof(obj) diff --git a/ingestion/src/metadata/utils/profiler_utils.py b/ingestion/src/metadata/utils/profiler_utils.py index 8920f16ec71..e759fd3d458 100644 --- a/ingestion/src/metadata/utils/profiler_utils.py +++ b/ingestion/src/metadata/utils/profiler_utils.py @@ -11,8 +11,14 @@ """Profiler utils class and functions""" +import re +from collections import namedtuple from typing import Optional +import sqlparse +from sqlalchemy.engine.row import Row +from sqlparse.sql import Identifier + from metadata.utils.sqa_utils import is_array @@ -44,3 +50,67 @@ class ColumnLike: return cls(is_array(kwargs), kwargs.pop("array_col")) except KeyError: return cls(False, None) + + +def clean_up_query(query: str) -> str: + """remove comments and newlines from query""" + return sqlparse.format(query, strip_comments=True).replace("\\n", "") + + +def get_snowflake_system_queries( + row: Row, database: str, schema: str +) -> Optional["QueryResult"]: + """get snowflake system queries for a specific database and schema. Parsing the query + is the only reliable way to get the DDL operation as fields in the table are not. 
+ + Args: + row (dict): row from the snowflake system queries table + database (str): database name + schema (str): schema name + Returns: + QueryResult: namedtuple with the query result + """ + + QueryResult = namedtuple( + "QueryResult", + "query_id,database_name,schema_name,table_name,query_text,query_type,timestamp", + ) + + try: + parsed_query = sqlparse.parse(clean_up_query(row.query_text))[0] + identifier = next( + ( + query_el + for query_el in parsed_query.tokens + if isinstance(query_el, Identifier) + ), + None, + ) + if not identifier: + return None + values = identifier.value.split(".") + database_name, schema_name, table_name = ([None] * (3 - len(values))) + values + + if not all([database_name, schema_name, table_name]): + return None + + # clean up table name + table_name = re.sub(r"\s.*", "", table_name).strip() + + if ( + database.lower() == database_name.lower() + and schema.lower() == schema_name.lower() + ): + return QueryResult( + row.query_id, + database_name.lower(), + schema_name.lower(), + table_name.lower(), + parsed_query, + row.query_type, + row.start_time, + ) + except Exception: + return None + + return None diff --git a/ingestion/tests/cli_e2e/base/test_cli_db.py b/ingestion/tests/cli_e2e/base/test_cli_db.py index 8a2e35c706a..3f94eb51b4d 100644 --- a/ingestion/tests/cli_e2e/base/test_cli_db.py +++ b/ingestion/tests/cli_e2e/base/test_cli_db.py @@ -209,6 +209,25 @@ class CliDBBase(TestCase): sink_status, ) + @pytest.mark.order(12) + def test_system_metrics(self) -> None: + if not any([self.delete_queries(), self.update_queries()]): + pytest.skip( + "System metrics test requires delete and update table rows queries" + ) + self.build_config_file() + self.run_command() + self.delete_table_and_view() + self.create_table_and_view() + self.build_config_file( + E2EType.PROFILER, {"includes": self.get_includes_schemas()} + ) + self.delete_table_rows() + self.update_table_row() + result = self.run_command("profile") + sink_status, source_status = self.retrieve_statuses(result) + self.assert_for_system_metrics(source_status, sink_status) + def retrieve_table(self, table_name_fqn: str) -> Table: return self.openmetadata.get_by_name(entity=Table, fqn=table_name_fqn) @@ -318,6 +337,22 @@ class CliDBBase(TestCase): def get_profiler_time_partition_results() -> Optional[dict]: return None + @staticmethod + def delete_queries() -> Optional[List[str]]: + return None + + @staticmethod + def update_queries() -> Optional[List[str]]: + return None + + @staticmethod + def delete_table_rows() -> None: + return None + + @staticmethod + def update_table_row() -> None: + return None + @staticmethod def get_test_type() -> str: return "database" diff --git a/ingestion/tests/cli_e2e/common/test_cli_db.py b/ingestion/tests/cli_e2e/common/test_cli_db.py index ed3332e544a..6ba1e9eafc6 100644 --- a/ingestion/tests/cli_e2e/common/test_cli_db.py +++ b/ingestion/tests/cli_e2e/common/test_cli_db.py @@ -14,11 +14,13 @@ Test database connectors which extend from `CommonDbSourceService` with CLI """ import json from abc import ABC, abstractmethod +from datetime import datetime, timedelta from pathlib import Path from typing import Optional from sqlalchemy.engine import Engine +from metadata.generated.schema.entity.data.table import SystemProfile from metadata.ingestion.api.sink import SinkStatus from metadata.ingestion.api.source import SourceStatus from metadata.ingestion.api.workflow import Workflow @@ -124,6 +126,27 @@ class CliCommonDB: == table_profile.get("rowCount") ) + def 
assert_for_system_metrics( + self, source_status: SourceStatus, sink_status: SinkStatus + ): + self.assertTrue(len(source_status.failures) == 0) + self.assertTrue(len(sink_status.failures) == 0) + + start_ts = int((datetime.now() - timedelta(days=1)).timestamp() * 1000) + end_ts = int((datetime.now() + timedelta(days=1)).timestamp() * 1000) + system_profile = self.openmetadata.get_profile_data( + self.fqn_deleted_table(), + start_ts, + end_ts, + profile_type=SystemProfile, + ) + + assert {profile.operation.value for profile in system_profile.entities} == { + "DELETE", + "INSERT", + "UPDATE", + } + def assert_for_delete_table_is_marked_as_deleted( self, source_status: SourceStatus, sink_status: SinkStatus ): diff --git a/ingestion/tests/cli_e2e/common_e2e_sqa_mixins.py b/ingestion/tests/cli_e2e/common_e2e_sqa_mixins.py index 0042e497403..ad5fd8d6da0 100644 --- a/ingestion/tests/cli_e2e/common_e2e_sqa_mixins.py +++ b/ingestion/tests/cli_e2e/common_e2e_sqa_mixins.py @@ -28,3 +28,15 @@ class SQACommonMethods: connection.execute(self.drop_view_query) connection.execute(self.drop_table_query) connection.close() + + def run_update_queries(self) -> None: + with self.engine.connect() as connection: + for update_query in self.update_queries(): + connection.execute(update_query) + connection.close() + + def run_delete_queries(self) -> None: + with self.engine.connect() as connection: + for drop_query in self.delete_queries(): + connection.execute(drop_query) + connection.close() diff --git a/ingestion/tests/cli_e2e/test_cli_bigquery.py b/ingestion/tests/cli_e2e/test_cli_bigquery.py index 46eb9ca8ffd..4396b762f06 100644 --- a/ingestion/tests/cli_e2e/test_cli_bigquery.py +++ b/ingestion/tests/cli_e2e/test_cli_bigquery.py @@ -51,6 +51,12 @@ class BigqueryCliTest(CliCommonDB.TestSuite, SQACommonMethods): def delete_table_and_view(self) -> None: SQACommonMethods.delete_table_and_view(self) + def delete_table_rows(self) -> None: + SQACommonMethods.run_delete_queries(self) + + def update_table_row(self) -> None: + SQACommonMethods.run_update_queries(self) + @staticmethod def get_connector_name() -> str: return "bigquery" @@ -104,3 +110,19 @@ class BigqueryCliTest(CliCommonDB.TestSuite, SQACommonMethods): @staticmethod def expected_filtered_mix() -> int: return 1 + + @staticmethod + def delete_queries() -> List[str]: + return [ + """ + DELETE FROM `open-metadata-beta.exclude_me`.orders WHERE id IN (1) + """, + ] + + @staticmethod + def update_queries() -> List[str]: + return [ + """ + UPDATE `open-metadata-beta.exclude_me`.orders SET order_name = 'NINTENDO' WHERE id = 2 + """, + ] diff --git a/ingestion/tests/cli_e2e/test_cli_redshift.py b/ingestion/tests/cli_e2e/test_cli_redshift.py index 08368f65108..15f90e3bb4f 100644 --- a/ingestion/tests/cli_e2e/test_cli_redshift.py +++ b/ingestion/tests/cli_e2e/test_cli_redshift.py @@ -68,6 +68,12 @@ class RedshiftCliTest(CliCommonDB.TestSuite, SQACommonMethods): def delete_table_and_view(self) -> None: SQACommonMethods.delete_table_and_view(self) + def delete_table_rows(self) -> None: + SQACommonMethods.run_delete_queries(self) + + def update_table_row(self) -> None: + SQACommonMethods.run_update_queries(self) + @staticmethod def get_connector_name() -> str: return "redshift" @@ -188,3 +194,19 @@ class RedshiftCliTest(CliCommonDB.TestSuite, SQACommonMethods): } ], } + + @staticmethod + def delete_queries() -> List[str]: + return [ + """ + DELETE FROM e2e_cli_tests.dbt_jaffle.persons WHERE person_id IN (1,2) + """, + ] + + @staticmethod + def update_queries() -> 
List[str]: + return [ + """ + UPDATE e2e_cli_tests.dbt_jaffle.persons SET full_name = 'Bruce Wayne' WHERE person_id = 3 + """, + ] diff --git a/ingestion/tests/cli_e2e/test_cli_snowflake.py b/ingestion/tests/cli_e2e/test_cli_snowflake.py index 83f0e472bf5..de890fd9a38 100644 --- a/ingestion/tests/cli_e2e/test_cli_snowflake.py +++ b/ingestion/tests/cli_e2e/test_cli_snowflake.py @@ -21,9 +21,10 @@ from metadata.ingestion.api.source import SourceStatus from .base.e2e_types import E2EType from .common.test_cli_db import CliCommonDB +from .common_e2e_sqa_mixins import SQACommonMethods -class SnowflakeCliTest(CliCommonDB.TestSuite): +class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods): """ Snowflake CLI Tests """ @@ -102,6 +103,12 @@ class SnowflakeCliTest(CliCommonDB.TestSuite): connection.execute(self.drop_table_query) connection.close() + def delete_table_rows(self) -> None: + SQACommonMethods.run_delete_queries(self) + + def update_table_row(self) -> None: + SQACommonMethods.run_update_queries(self) + @pytest.mark.order(2) def test_create_table_with_profiler(self) -> None: # delete table in case it exists @@ -123,6 +130,21 @@ class SnowflakeCliTest(CliCommonDB.TestSuite): sink_status, source_status = self.retrieve_statuses(result) self.assert_for_table_with_profiler(source_status, sink_status) + @pytest.mark.order(12) + def test_system_metrics(self) -> None: + self.delete_table_and_view() + self.create_table_and_view() + self.build_config_file() + self.run_command() + self.build_config_file( + E2EType.PROFILER, {"includes": self.get_includes_schemas()} + ) + self.delete_table_rows() + self.update_table_row() + result = self.run_command("profile") + sink_status, source_status = self.retrieve_statuses(result) + self.assert_for_system_metrics(source_status, sink_status) + @staticmethod def expected_tables() -> int: return 7 @@ -168,3 +190,19 @@ class SnowflakeCliTest(CliCommonDB.TestSuite): @staticmethod def expected_filtered_mix() -> int: return 6 + + @staticmethod + def delete_queries() -> List[str]: + return [ + """ + DELETE FROM E2E_DB.E2E_TEST.PERSONS WHERE full_name = 'Peter Parker' + """, + ] + + @staticmethod + def update_queries() -> List[str]: + return [ + """ + UPDATE E2E_DB.E2E_TEST.PERSONS SET full_name = 'Bruce Wayne' WHERE full_name = 'Clark Kent' + """, + ] diff --git a/ingestion/tests/unit/profiler/conftest.py b/ingestion/tests/unit/profiler/conftest.py index f231791b5e6..05afb377fd2 100644 --- a/ingestion/tests/unit/profiler/conftest.py +++ b/ingestion/tests/unit/profiler/conftest.py @@ -55,3 +55,17 @@ def base_table(): ), ], ) + + +class Row: + def __init__( + self, + query_id, + query_type, + start_time, + query_text, + ): + self.query_id = query_id + self.query_type = query_type + self.start_time = start_time + self.query_text = query_text diff --git a/ingestion/tests/unit/profiler/test_utils.py b/ingestion/tests/unit/profiler/test_utils.py index 2c5434cc10c..3f575780f70 100644 --- a/ingestion/tests/unit/profiler/test_utils.py +++ b/ingestion/tests/unit/profiler/test_utils.py @@ -14,6 +14,7 @@ Tests utils function for the profiler """ import os +from datetime import datetime from unittest import TestCase from sqlalchemy import Column, create_engine @@ -21,9 +22,11 @@ from sqlalchemy.orm import declarative_base, sessionmaker from sqlalchemy.sql.sqltypes import Integer, String from metadata.profiler.metrics.hybrid.histogram import Histogram -from metadata.utils.profiler_utils import ColumnLike +from metadata.utils.profiler_utils import ColumnLike, 
get_snowflake_system_queries from metadata.utils.sqa_utils import handle_array, is_array +from .conftest import Row + Base = declarative_base() @@ -137,3 +140,32 @@ def test_column_like_object(): assert not kwargs assert column_like._is_array is False assert column_like._array_col is None + + +def test_get_snowflake_system_queries(): + """Test get snowflake system queries""" + row = Row( + query_id=1, + query_type="INSERT", + start_time=datetime.now(), + query_text="INSERT INTO DATABASE.SCHEMA.TABLE1 (col1, col2) VALUES (1, 'a'), (2, 'b')", + ) + + query_result = get_snowflake_system_queries(row, "DATABASE", "SCHEMA") # type: ignore + assert query_result + assert query_result.query_id == 1 + assert query_result.query_type == "INSERT" + assert query_result.database_name == "database" + assert query_result.schema_name == "schema" + assert query_result.table_name == "table1" + + row = Row( + query_id=1, + query_type="INSERT", + start_time=datetime.now(), + query_text="INSERT INTO SCHEMA.TABLE1 (col1, col2) VALUES (1, 'a'), (2, 'b')", + ) + + query_result = get_snowflake_system_queries(row, "DATABASE", "SCHEMA") # type: ignore + + assert not query_result diff --git a/ingestion/tests/unit/test_helpers.py b/ingestion/tests/unit/test_helpers.py index 5b3c4c5d6dc..ddc8e6fd66b 100644 --- a/ingestion/tests/unit/test_helpers.py +++ b/ingestion/tests/unit/test_helpers.py @@ -23,6 +23,7 @@ from metadata.generated.schema.type.tagLabel import ( ) from metadata.utils.helpers import ( clean_up_starting_ending_double_quotes_in_string, + deep_size_of_dict, get_entity_tier_from_tags, list_to_dict, ) @@ -85,3 +86,13 @@ class TestHelpers(TestCase): ) assert get_entity_tier_from_tags(table_entity_wo_tier.tags) is None + + def test_deep_size_of_dict(self): + """test deep size of dict""" + test_dict = { + "a": 1, + "b": {"c": 2, "d": {"e": "Hello World", "f": [4, 5, 6]}}, + } + + assert deep_size_of_dict(test_dict) >= 1000 + assert deep_size_of_dict(test_dict) <= 1500 diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index c17c4fc75e5..050e988ae0a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -2750,6 +2750,21 @@ public interface CollectionDAO { @Bind("json") String json, @Bind("timestamp") Long timestamp); + @ConnectionAwareSqlUpdate( + value = + "UPDATE entity_extension_time_series set json = :json where entityFQN=:entityFQN and extension=:extension and timestamp=:timestamp and json -> '$.operation' = :operation", + connectionType = MYSQL) + @ConnectionAwareSqlUpdate( + value = + "UPDATE entity_extension_time_series set json = (:json :: jsonb) where entityFQN=:entityFQN and extension=:extension and timestamp=:timestamp and json #>>'{operation}' = :operation", + connectionType = POSTGRES) + void updateExtensionByOperation( + @Bind("entityFQN") String entityFQN, + @Bind("extension") String extension, + @Bind("json") String json, + @Bind("timestamp") Long timestamp, + @Bind("operation") String operation); + @SqlQuery("SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND extension = :extension") String getExtension(@Bind("entityFQN") String entityId, @Bind("extension") String extension); @@ -2793,6 +2808,20 @@ public interface CollectionDAO { String getExtensionAtTimestamp( @Bind("entityFQN") String entityFQN, 
@Bind("extension") String extension, @Bind("timestamp") long timestamp); + @ConnectionAwareSqlQuery( + value = + "SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND extension = :extension AND timestamp = :timestamp AND json -> '$.operation' = :operation", + connectionType = MYSQL) + @ConnectionAwareSqlQuery( + value = + "SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND extension = :extension AND timestamp = :timestamp AND json #>>'{operation}' = :operation", + connectionType = POSTGRES) + String getExtensionAtTimestampWithOperation( + @Bind("entityFQN") String entityFQN, + @Bind("extension") String extension, + @Bind("timestamp") long timestamp, + @Bind("operation") String operation); + @SqlQuery( "SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND extension = :extension " + "ORDER BY timestamp DESC LIMIT 1") diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java index 7baec4f8b9d..f12e6720a76 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java @@ -383,17 +383,21 @@ public class TableRepository extends EntityRepository { JsonUtils.readValue( daoCollection .entityExtensionTimeSeriesDao() - .getExtensionAtTimestamp( - table.getFullyQualifiedName(), SYSTEM_PROFILE_EXTENSION, systemProfile.getTimestamp()), + .getExtensionAtTimestampWithOperation( + table.getFullyQualifiedName(), + SYSTEM_PROFILE_EXTENSION, + systemProfile.getTimestamp(), + systemProfile.getOperation().value()), SystemProfile.class); if (storedSystemProfile != null) { daoCollection .entityExtensionTimeSeriesDao() - .update( + .updateExtensionByOperation( table.getFullyQualifiedName(), SYSTEM_PROFILE_EXTENSION, JsonUtils.pojoToJson(systemProfile), - storedSystemProfile.getTimestamp()); + storedSystemProfile.getTimestamp(), + storedSystemProfile.getOperation().value()); } else { daoCollection .entityExtensionTimeSeriesDao()