diff --git a/ingestion/src/metadata/profiler/metrics/core.py b/ingestion/src/metadata/profiler/metrics/core.py index a1cd73b08d2..9cc219777a5 100644 --- a/ingestion/src/metadata/profiler/metrics/core.py +++ b/ingestion/src/metadata/profiler/metrics/core.py @@ -228,7 +228,7 @@ class SystemMetric(Metric, ABC): """Abstract class for system metrics""" @abstractmethod - def sql(self): + def sql(self, session: Session, **kwargs): """SQL query to get system Metric""" diff --git a/ingestion/src/metadata/profiler/metrics/system/__init__.py b/ingestion/src/metadata/profiler/metrics/system/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/src/metadata/profiler/metrics/system/dml_operation.py b/ingestion/src/metadata/profiler/metrics/system/dml_operation.py new file mode 100644 index 00000000000..40807c9759c --- /dev/null +++ b/ingestion/src/metadata/profiler/metrics/system/dml_operation.py @@ -0,0 +1,35 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +DML Operation class and mapper +""" + +from enum import Enum + +from metadata.generated.schema.entity.data.table import DmlOperationType + + +class DatabaseDMLOperations(Enum): + """enum of supported DML operation on database engine side""" + + INSERT = "INSERT" + UPDATE = "UPDATE" + DELETE = "DELETE" + MERGE = "MERGE" + + +DML_OPERATION_MAP = { + DatabaseDMLOperations.INSERT.value: DmlOperationType.INSERT.value, + DatabaseDMLOperations.MERGE.value: DmlOperationType.UPDATE.value, + DatabaseDMLOperations.UPDATE.value: DmlOperationType.UPDATE.value, + DatabaseDMLOperations.DELETE.value: DmlOperationType.DELETE.value, +} diff --git a/ingestion/src/metadata/profiler/metrics/system/queries/__init__.py b/ingestion/src/metadata/profiler/metrics/system/queries/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/src/metadata/profiler/metrics/system/queries/bigquery.py b/ingestion/src/metadata/profiler/metrics/system/queries/bigquery.py new file mode 100644 index 00000000000..f54853604bc --- /dev/null +++ b/ingestion/src/metadata/profiler/metrics/system/queries/bigquery.py @@ -0,0 +1,54 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Bigquery System Metric Queries +""" +from datetime import datetime + +from pydantic import BaseModel + +from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations + + +class BigQueryQueryResult(BaseModel): + table_name: dict + timestamp: datetime + query_type: str + dml_statistics: dict + + +DML_STAT_TO_DML_STATEMENT_MAPPING = { + "inserted_row_count": DatabaseDMLOperations.INSERT.value, + "deleted_row_count": DatabaseDMLOperations.DELETE.value, + "updated_row_count": DatabaseDMLOperations.UPDATE.value, +} + +JOBS = """ + SELECT + statement_type, + start_time, + destination_table, + dml_statistics + FROM + `region-{usage_location}`.INFORMATION_SCHEMA.JOBS + WHERE + DATE(creation_time) >= CURRENT_DATE() - 1 AND + destination_table.dataset_id = '{dataset_id}' AND + destination_table.project_id = '{project_id}' AND + statement_type IN ( + '{insert}', + '{update}', + '{delete}', + '{merge}' + ) + ORDER BY creation_time DESC; +""" diff --git a/ingestion/src/metadata/profiler/metrics/system/queries/redshift.py b/ingestion/src/metadata/profiler/metrics/system/queries/redshift.py new file mode 100644 index 00000000000..c6a3d61fa4d --- /dev/null +++ b/ingestion/src/metadata/profiler/metrics/system/queries/redshift.py @@ -0,0 +1,105 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+"""
+Redshift System Metric Queries and query operations
+"""
+
+from typing import List
+
+from sqlalchemy import text
+from sqlalchemy.orm import Session
+
+from metadata.utils.profiler_utils import QueryResult
+
+STL_QUERY = """
+    with data as (
+        select
+            {alias}.*
+        from
+            pg_catalog.stl_insert si
+            {join_type} join pg_catalog.stl_delete sd on si.query = sd.query
+        where
+            {condition}
+    )
+    SELECT
+        SUM(data."rows") AS "rows",
+        sti."database",
+        sti."schema",
+        sti."table",
+        sq.text,
+        DATE_TRUNC('second', data.starttime) AS starttime
+    FROM
+        data
+        INNER JOIN pg_catalog.svv_table_info sti ON data.tbl = sti.table_id
+        INNER JOIN pg_catalog.stl_querytext sq ON data.query = sq.query
+    where
+        sti."database" = '{database}' AND
+        sti."schema" = '{schema}' AND
+        "rows" != 0 AND
+        DATE(data.starttime) >= CURRENT_DATE - 1
+    GROUP BY 2,3,4,5,6
+    ORDER BY 6 DESC
+"""
+
+
+def get_query_results(
+    session: Session,
+    query,
+    operation,
+) -> List[QueryResult]:
+    """get query results from the database
+
+    Args:
+        session (Session): session
+        query (str): query to execute
+        operation (str): DML operation the results are attributed to
+
+    Returns:
+        List[QueryResult]:
+    """
+    cursor = session.execute(text(query))
+    results = [
+        QueryResult(
+            database_name=row.database,
+            schema_name=row.schema,
+            table_name=row.table,
+            query_text=row.text,
+            query_type=operation,
+            timestamp=row.starttime,
+            rows=row.rows,
+        )
+        for row in cursor
+    ]
+
+    return results
+
+
+def get_metric_result(ddls: List[QueryResult], table_name: str) -> List:
+    """Given query results, return the metric result
+
+    Args:
+        ddls (List[QueryResult]): list of query results
+        table_name (str): table name
+
+    Returns:
+        List:
+    """
+    return [
+        {
+            "timestamp": int(ddl.timestamp.timestamp() * 1000),
+            "operation": ddl.query_type,
+            "rowsAffected": ddl.rows,
+        }
+        for ddl in ddls
+        if ddl.table_name == table_name
+    ]
diff --git a/ingestion/src/metadata/profiler/metrics/system/queries/snowflake.py b/ingestion/src/metadata/profiler/metrics/system/queries/snowflake.py
new file mode 100644
index 00000000000..1c42b7b7d06
--- /dev/null
+++ b/ingestion/src/metadata/profiler/metrics/system/queries/snowflake.py
@@ -0,0 +1,99 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Snowflake System Metric Queries and query operations
+"""
+
+import re
+from typing import Optional
+
+from sqlalchemy.engine.row import Row
+
+from metadata.utils.logger import profiler_logger
+from metadata.utils.profiler_utils import QueryResult, get_identifiers_from_string
+
+logger = profiler_logger()
+
+INFORMATION_SCHEMA_QUERY = """
+    SELECT * FROM "SNOWFLAKE"."ACCOUNT_USAGE"."QUERY_HISTORY"
+    WHERE
+    start_time >= DATEADD('DAY', -1, CURRENT_TIMESTAMP)
+    AND QUERY_TEXT ILIKE '%{tablename}%'
+    AND QUERY_TYPE IN (
+        '{insert}',
+        '{update}',
+        '{delete}',
+        '{merge}'
+    )
+    AND EXECUTION_STATUS = 'SUCCESS';
+"""
+
+RESULT_SCAN = """
+    SELECT *
+    FROM TABLE(RESULT_SCAN('{query_id}'));
+    """
+
+
+def get_snowflake_system_queries(
+    row: Row, database: str, schema: str
+) -> Optional[QueryResult]:
+    """get snowflake system queries for a specific database and schema. Parsing the query
+    text is the only reliable way to identify the table targeted by the DML operation, as
+    the fields in the query history view are not always reliable. We use a regex lookup to:
+
+    1. Match the DML statement and extract the target table identifier
+    2. Split the identifier into database, schema and table names
+
+    Args:
+        row (Row): row from the snowflake system queries table
+        database (str): database name
+        schema (str): schema name
+    Returns:
+        QueryResult: pydantic model with the query result
+    """
+
+    try:
+        logger.debug(f"Trying to parse query:\n{row.QUERY_TEXT}\n")
+
+        pattern = r"(?:(INSERT\s*INTO\s*|INSERT\s*OVERWRITE\s*INTO\s*|UPDATE\s*|MERGE\s*INTO\s*|DELETE\s*FROM\s*))([\w._\"]+)(?=[\s*\n])"  # pylint: disable=line-too-long
+        match = re.match(pattern, row.QUERY_TEXT, re.IGNORECASE)
+        try:
+            identifier = match.group(2)
+        except (IndexError, AttributeError):
+            logger.debug("Could not find identifier in query. Skipping row.")
+            return None
+
+        database_name, schema_name, table_name = get_identifiers_from_string(identifier)
+
+        if not all([database_name, schema_name, table_name]):
+            logger.debug(
+                "Missing database, schema, or table. Can't link operation to table entity in OpenMetadata."
+            )
+            return None
+
+        if (
+            database.lower() == database_name.lower()
+            and schema.lower() == schema_name.lower()
+        ):
+            return QueryResult(
+                query_id=row.QUERY_ID,
+                database_name=database_name.lower(),
+                schema_name=schema_name.lower(),
+                table_name=table_name.lower(),
+                query_text=row.QUERY_TEXT,
+                query_type=row.QUERY_TYPE,
+                timestamp=row.START_TIME,
+            )
+    except Exception:
+        return None
+
+    return None
diff --git a/ingestion/src/metadata/profiler/metrics/system/system.py b/ingestion/src/metadata/profiler/metrics/system/system.py
index e0425a7ac2f..5f3b7608c31
--- a/ingestion/src/metadata/profiler/metrics/system/system.py
+++ b/ingestion/src/metadata/profiler/metrics/system/system.py
@@ -8,31 +8,47 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
- +# pylint: disable=unused-argument """ System Metric """ import traceback -from collections import defaultdict, namedtuple -from enum import Enum +from collections import defaultdict from textwrap import dedent from typing import Dict, List, Optional -import sqlparse from sqlalchemy import text from sqlalchemy.orm import DeclarativeMeta, Session -from metadata.generated.schema.entity.data.table import DmlOperationType from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import ( BigQueryConnection, ) from metadata.profiler.metrics.core import SystemMetric +from metadata.profiler.metrics.system.dml_operation import ( + DML_OPERATION_MAP, + DatabaseDMLOperations, +) +from metadata.profiler.metrics.system.queries.bigquery import ( + DML_STAT_TO_DML_STATEMENT_MAPPING, + JOBS, + BigQueryQueryResult, +) +from metadata.profiler.metrics.system.queries.redshift import ( + STL_QUERY, + get_metric_result, + get_query_results, +) +from metadata.profiler.metrics.system.queries.snowflake import ( + INFORMATION_SCHEMA_QUERY, + RESULT_SCAN, + get_snowflake_system_queries, +) from metadata.profiler.orm.registry import Dialects from metadata.utils.dispatch import valuedispatch from metadata.utils.helpers import deep_size_of_dict from metadata.utils.logger import profiler_logger -from metadata.utils.profiler_utils import clean_up_query, get_snowflake_system_queries +from metadata.utils.profiler_utils import get_value_from_cache, set_cache logger = profiler_logger() @@ -44,22 +60,6 @@ def recursive_dic(): return defaultdict(recursive_dic) -class DatabaseDMLOperations(Enum): - """enum of supported DML operation on database engine side""" - - INSERT = "INSERT" - UPDATE = "UPDATE" - DELETE = "DELETE" - MERGE = "MERGE" - - -DML_OPERATION_MAP = { - DatabaseDMLOperations.INSERT.value: DmlOperationType.INSERT.value, - DatabaseDMLOperations.MERGE.value: DmlOperationType.UPDATE.value, - DatabaseDMLOperations.UPDATE.value: DmlOperationType.UPDATE.value, - DatabaseDMLOperations.DELETE.value: DmlOperationType.DELETE.value, -} - SYSTEM_QUERY_RESULT_CACHE = recursive_dic() @@ -108,100 +108,83 @@ def _( List[Dict]: """ logger.debug(f"Fetching system metrics for {dialect}") - dml_stat_to_dml_statement_mapping = { - "inserted_row_count": DatabaseDMLOperations.INSERT.value, - "deleted_row_count": DatabaseDMLOperations.DELETE.value, - "updated_row_count": DatabaseDMLOperations.UPDATE.value, - } - project_id = session.get_bind().url.host - dataset_id = table.__table_args__["schema"] - jobs = dedent( - f""" - SELECT - statement_type, - start_time, - destination_table, - dml_statistics - FROM - `region-{conn_config.usageLocation}`.INFORMATION_SCHEMA.JOBS - WHERE - DATE(creation_time) >= CURRENT_DATE() - 1 AND - destination_table.dataset_id = '{dataset_id}' AND - destination_table.project_id = '{project_id}' AND - statement_type IN ( - '{DatabaseDMLOperations.INSERT.value}', - '{DatabaseDMLOperations.DELETE.value}', - '{DatabaseDMLOperations.UPDATE.value}', - '{DatabaseDMLOperations.MERGE.value}' - ) - ORDER BY creation_time DESC; - """ - ) + project_id = session.get_bind().url.host + dataset_id = table.__table_args__["schema"] # type: ignore metric_results: List[Dict] = [] - QueryResult = namedtuple( - "QueryResult", - "query_type,timestamp,destination_table,dml_statistics", + # QueryResult = namedtuple( + # "QueryResult", + # "query_type,timestamp,destination_table,dml_statistics", + # ) + + jobs = get_value_from_cache( + SYSTEM_QUERY_RESULT_CACHE, 
f"{Dialects.BigQuery}.{project_id}.{dataset_id}.jobs" ) - if ( - "query_results" - in SYSTEM_QUERY_RESULT_CACHE[Dialects.BigQuery][project_id][dataset_id] - ): - # we'll try to get the cached data first - query_results = SYSTEM_QUERY_RESULT_CACHE[Dialects.BigQuery][project_id][ - dataset_id - ]["query_results"] - else: - cursor_jobs = session.execute(text(jobs)) - query_results = [ - QueryResult( - row.statement_type, - row.start_time, - row.destination_table, - row.dml_statistics, + if not jobs: + cursor_jobs = session.execute( + text( + JOBS.format( + usage_location=conn_config.usageLocation, + dataset_id=dataset_id, + project_id=project_id, + insert=DatabaseDMLOperations.INSERT.value, + update=DatabaseDMLOperations.UPDATE.value, + delete=DatabaseDMLOperations.DELETE.value, + merge=DatabaseDMLOperations.MERGE.value, + ) ) - for row in cursor_jobs.fetchall() + ) + jobs = [ + BigQueryQueryResult( + query_type=row.statement_type, + timestamp=row.start_time, + table_name=row.destination_table, + dml_statistics=row.dml_statistics, + ) + for row in cursor_jobs ] - SYSTEM_QUERY_RESULT_CACHE[Dialects.BigQuery][project_id][dataset_id][ - "query_results" - ] = query_results + set_cache( + SYSTEM_QUERY_RESULT_CACHE, + f"{Dialects.BigQuery}.{project_id}.{dataset_id}.jobs", + jobs, + ) - for row_jobs in query_results: - if row_jobs.destination_table.get("table_id") == table.__tablename__: + for job in jobs: + if job.table_name.get("table_id") == table.__tablename__: # type: ignore rows_affected = None try: - if row_jobs.query_type == DatabaseDMLOperations.INSERT.value: - rows_affected = row_jobs.dml_statistics.get("inserted_row_count") - if row_jobs.query_type == DatabaseDMLOperations.DELETE.value: - rows_affected = row_jobs.dml_statistics.get("deleted_row_count") - if row_jobs.query_type == DatabaseDMLOperations.UPDATE.value: - rows_affected = row_jobs.dml_statistics.get("updated_row_count") + if job.query_type == DatabaseDMLOperations.INSERT.value: + rows_affected = job.dml_statistics.get("inserted_row_count") + if job.query_type == DatabaseDMLOperations.DELETE.value: + rows_affected = job.dml_statistics.get("deleted_row_count") + if job.query_type == DatabaseDMLOperations.UPDATE.value: + rows_affected = job.dml_statistics.get("updated_row_count") except AttributeError: logger.debug(traceback.format_exc()) rows_affected = None - if row_jobs.query_type == DatabaseDMLOperations.MERGE.value: - for indx, key in enumerate(row_jobs.dml_statistics): - if row_jobs.dml_statistics[key] != 0: + if job.query_type == DatabaseDMLOperations.MERGE.value: + for indx, key in enumerate(job.dml_statistics): + if job.dml_statistics[key] != 0: metric_results.append( { # Merge statement can include multiple DML operations # We are padding timestamps by 0,1,2 millisesond to avoid # duplicate timestamps - "timestamp": int(row_jobs.timestamp.timestamp() * 1000) + "timestamp": int(job.timestamp.timestamp() * 1000) + indx, - "operation": dml_stat_to_dml_statement_mapping.get(key), - "rowsAffected": row_jobs.dml_statistics[key], + "operation": DML_STAT_TO_DML_STATEMENT_MAPPING.get(key), + "rowsAffected": job.dml_statistics[key], } ) continue metric_results.append( { - "timestamp": int(row_jobs.timestamp.timestamp() * 1000), - "operation": row_jobs.query_type, + "timestamp": int(job.timestamp.timestamp() * 1000), + "operation": job.query_type, "rowsAffected": rows_affected, } ) @@ -229,237 +212,147 @@ def _( """ logger.debug(f"Fetching system metrics for {dialect}") database = session.get_bind().url.database - schema = 
table.__table_args__["schema"] - - stl_deleted = dedent( - f""" - SELECT - SUM(si."rows") AS "rows", - sti."database", - sti."schema", - sti."table", - sq.text, - DATE_TRUNC('second', si.starttime) AS starttime - FROM - pg_catalog.stl_delete si - INNER JOIN pg_catalog.svv_table_info sti ON si.tbl = sti.table_id - INNER JOIN pg_catalog.stl_querytext sq ON si.query = sq.query - WHERE - sti."database" = '{database}' AND - sti."schema" = '{schema}' AND - "rows" != 0 AND - DATE(starttime) >= CURRENT_DATE - 1 - GROUP BY 2,3,4,5,6 - ORDER BY 6 desc - """ - ) - - stl_insert = dedent( - f""" - SELECT - SUM(si."rows") AS "rows", - sti."database", - sti."schema", - sti."table", - sq.text, - DATE_TRUNC('second', si.starttime) AS starttime - FROM - pg_catalog.stl_insert si - INNER JOIN pg_catalog.svv_table_info sti ON si.tbl = sti.table_id - INNER JOIN pg_catalog.stl_querytext sq ON si.query = sq.query - WHERE - sti."database" = '{database}' AND - sti."schema" = '{schema}' AND - "rows" != 0 AND - DATE(starttime) >= CURRENT_DATE - 1 - GROUP BY 2,3,4,5,6 - ORDER BY 6 desc - """ - ) + schema = table.__table_args__["schema"] # type: ignore metric_results: List[Dict] = [] - QueryResult = namedtuple( - "QueryResult", - "database_name,schema_name,table_name,query_text,timestamp,rowsAffected", + + # get inserts ddl queries + inserts = get_value_from_cache( + SYSTEM_QUERY_RESULT_CACHE, f"{Dialects.Redshift}.{database}.{schema}.inserts" ) + if not inserts: + insert_query = STL_QUERY.format( + alias="si", + join_type="LEFT", + condition="sd.query is null", + database=database, + schema=schema, + ) + inserts = get_query_results( + session, + insert_query, + DatabaseDMLOperations.INSERT.value, + ) + set_cache( + SYSTEM_QUERY_RESULT_CACHE, + f"{Dialects.Redshift}.{database}.{schema}.inserts", + inserts, + ) + metric_results.extend(get_metric_result(inserts, table.__tablename__)) # type: ignore - if ( - "query_results_inserted" - in SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][schema] - ): - # we'll try to get the cached data first - query_results_inserted = SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][ - schema - ]["query_results_inserted"] - else: - cursor_insert = session.execute(text(stl_insert)) - query_results_inserted = [ - QueryResult( - row.database, - row.schema, - row.table, - sqlparse.parse(clean_up_query(row.text))[0], - row.starttime, - row.rows, - ) - for row in cursor_insert.fetchall() - ] - SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][schema][ - "query_results_inserted" - ] = query_results_inserted + # get deletes ddl queries + deletes = get_value_from_cache( + SYSTEM_QUERY_RESULT_CACHE, f"{Dialects.Redshift}.{database}.{schema}.deletes" + ) + if not deletes: + delete_query = STL_QUERY.format( + alias="sd", + join_type="RIGHT", + condition="si.query is null", + database=database, + schema=schema, + ) + deletes = get_query_results( + session, + delete_query, + DatabaseDMLOperations.DELETE.value, + ) + set_cache( + SYSTEM_QUERY_RESULT_CACHE, + f"{Dialects.Redshift}.{database}.{schema}.deletes", + deletes, + ) + metric_results.extend(get_metric_result(deletes, table.__tablename__)) # type: ignore - if ( - "query_results_deleted" - in SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][schema] - ): - # we'll try to get the cached data first - query_results_deleted = SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][ - schema - ]["query_results_deleted"] - else: - cursor_deleted = session.execute(text(stl_deleted)) - query_results_deleted = [ - QueryResult( - 
row.database, - row.schema, - row.table, - sqlparse.parse(clean_up_query(row.text))[0], - row.starttime, - row.rows, - ) - for row in cursor_deleted.fetchall() - ] - SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][schema][ - "query_results_deleted" - ] = query_results_deleted - - for row_inserted in query_results_inserted: - if row_inserted.table_name == table.__tablename__: - query_text = row_inserted.query_text - operation = next( - ( - token.value.upper() - for token in query_text.tokens - if token.ttype is sqlparse.tokens.DML - and token.value.upper() - in DmlOperationType._member_names_ # pylint: disable=protected-access - ), - None, - ) - if operation: - metric_results.append( - { - "timestamp": int(row_inserted.timestamp.timestamp() * 1000), - "operation": operation, - "rowsAffected": row_inserted.rowsAffected, - } - ) - - for row_deleted in query_results_deleted: - if row_deleted.table_name == table.__tablename__: - query_text = row_deleted.query_text - operation = next( - ( - token.value.upper() - for token in query_text.tokens - if token.ttype is sqlparse.tokens.DML and token.value != "UPDATE" - ), - None, - ) - - if operation: - metric_results.append( - { - "timestamp": int(row_deleted.timestamp.timestamp() * 1000), - "operation": operation, - "rowsAffected": row_deleted.rowsAffected, - } - ) + # get updates ddl queries + updates = get_value_from_cache( + SYSTEM_QUERY_RESULT_CACHE, f"{Dialects.Redshift}.{database}.{schema}.updates" + ) + if not updates: + update_query = STL_QUERY.format( + alias="si", + join_type="INNER", + condition="sd.query is not null", + database=database, + schema=schema, + ) + updates = get_query_results( + session, + update_query, + DatabaseDMLOperations.UPDATE.value, + ) + set_cache( + SYSTEM_QUERY_RESULT_CACHE, + f"{Dialects.Redshift}.{database}.{schema}.updates", + updates, + ) + metric_results.extend(get_metric_result(updates, table.__tablename__)) # type: ignore return metric_results -# @get_system_metrics_for_dialect.register(Dialects.Snowflake) -# def _( -# dialect: str, -# session: Session, -# table: DeclarativeMeta, -# *args, -# **kwargs, -# ) -> Optional[List[Dict]]: -# """Fetch system metrics for Snowflake. query_history will return maximum 10K rows in one request. -# We'll be fetching all the queries ran for the past 24 hours and filtered on specific query types -# (INSERTS, MERGE, DELETE, UPDATE). +@get_system_metrics_for_dialect.register(Dialects.Snowflake) +def _( + dialect: str, + session: Session, + table: DeclarativeMeta, + *args, + **kwargs, +) -> Optional[List[Dict]]: + """Fetch system metrics for Snowflake. query_history will return maximum 10K rows in one request. + We'll be fetching all the queries ran for the past 24 hours and filtered on specific query types + (INSERTS, MERGE, DELETE, UPDATE). -# To get the number of rows affected we'll use the specific query ID. + :waring: Unlike redshift and bigquery results are not cached as we'll be looking + at DDL for each table -# Args: -# dialect (str): dialect -# session (Session): session object + To get the number of rows affected we'll use the specific query ID. 
-# Returns: -# Dict: system metric -# """ -# logger.debug(f"Fetching system metrics for {dialect}") -# database = session.get_bind().url.database -# schema = table.__table_args__["schema"] + Args: + dialect (str): dialect + session (Session): session object -# metric_results: List[Dict] = [] + Returns: + Dict: system metric + """ + logger.debug(f"Fetching system metrics for {dialect}") + database = session.get_bind().url.database + schema = table.__table_args__["schema"] # type: ignore -# information_schema_query_history = f""" -# SELECT * FROM "SNOWFLAKE"."ACCOUNT_USAGE"."QUERY_HISTORY" -# WHERE -# start_time>= DATEADD('DAY', -1, CURRENT_TIMESTAMP) -# AND QUERY_TYPE IN ( -# '{DatabaseDMLOperations.INSERT.value}', -# '{DatabaseDMLOperations.UPDATE.value}', -# '{DatabaseDMLOperations.DELETE.value}', -# '{DatabaseDMLOperations.MERGE.value}' -# ) -# AND EXECUTION_STATUS = 'SUCCESS'; -# """ -# result_scan = """ -# SELECT * -# FROM TABLE(RESULT_SCAN('{query_id}')); -# """ + metric_results: List[Dict] = [] -# if ( -# "query_results" -# in SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema] -# ): -# # we'll try to get the cached data first -# query_results = SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema][ -# "query_results" -# ] -# else: -# rows = session.execute(text(information_schema_query_history)) -# query_results = [] -# for row in rows: -# result = get_snowflake_system_queries(row, database, schema) -# if result: -# query_results.append(result) -# SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema][ -# "query_results" -# ] = query_results + rows = session.execute( + text( + INFORMATION_SCHEMA_QUERY.format( + tablename=table.__tablename__, # type: ignore + insert=DatabaseDMLOperations.INSERT.value, + update=DatabaseDMLOperations.UPDATE.value, + delete=DatabaseDMLOperations.DELETE.value, + merge=DatabaseDMLOperations.MERGE.value, + ) + ) + ) + query_results = [] + for row in rows: + result = get_snowflake_system_queries(row, database, schema) + if result: + query_results.append(result) -# for query_result in query_results: -# if table.__tablename__.lower() == query_result.table_name: -# cursor_for_result_scan = session.execute( -# text(dedent(result_scan.format(query_id=query_result.query_id))) -# ) -# row_for_result_scan = cursor_for_result_scan.first() + for query_result in query_results: + cursor_for_result_scan = session.execute( + text(dedent(RESULT_SCAN.format(query_id=query_result.query_id))) + ) + row_for_result_scan = cursor_for_result_scan.first() -# metric_results.append( -# { -# "timestamp": int(query_result.timestamp.timestamp() * 1000), -# "operation": DML_OPERATION_MAP.get(query_result.query_type), -# "rowsAffected": row_for_result_scan[0] -# if row_for_result_scan -# else None, -# } -# ) + metric_results.append( + { + "timestamp": int(query_result.timestamp.timestamp() * 1000), + "operation": DML_OPERATION_MAP.get(query_result.query_type), + "rowsAffected": row_for_result_scan[0] if row_for_result_scan else None, + } + ) -# return metric_results + return metric_results class System(SystemMetric): @@ -513,7 +406,7 @@ class System(SystemMetric): system_metrics = get_system_metrics_for_dialect( session.get_bind().dialect.name, session=session, - table=self.table, + table=self.table, # pylint: disable=no-member conn_config=conn_config, ) self._manage_cache() diff --git a/ingestion/src/metadata/utils/profiler_utils.py b/ingestion/src/metadata/utils/profiler_utils.py index e759fd3d458..efb5c2e5316 100644 --- 
a/ingestion/src/metadata/utils/profiler_utils.py +++ b/ingestion/src/metadata/utils/profiler_utils.py @@ -12,15 +12,34 @@ """Profiler utils class and functions""" import re -from collections import namedtuple -from typing import Optional +from collections import defaultdict +from datetime import datetime +from functools import reduce +from typing import Optional, Tuple import sqlparse -from sqlalchemy.engine.row import Row -from sqlparse.sql import Identifier +from pydantic import BaseModel +from metadata.utils.logger import profiler_logger from metadata.utils.sqa_utils import is_array +logger = profiler_logger() + +PARSING_TIMEOUT = 10 + + +class QueryResult(BaseModel): + """System metric query result shared by Redshift and Snowflake""" + + database_name: str + schema_name: str + table_name: str + query_type: str + timestamp: datetime + query_id: Optional[str] = None + query_text: Optional[str] = None + rows: Optional[int] = None + class ColumnLike: """We don't have column information at this stage (only metric entities) @@ -57,60 +76,58 @@ def clean_up_query(query: str) -> str: return sqlparse.format(query, strip_comments=True).replace("\\n", "") -def get_snowflake_system_queries( - row: Row, database: str, schema: str -) -> Optional["QueryResult"]: - """get snowflake system queries for a specific database and schema. Parsing the query - is the only reliable way to get the DDL operation as fields in the table are not. +def get_identifiers_from_string( + identifier: str, +) -> Tuple[Optional[str], Optional[str], Optional[str]]: + """given a string identifier try to fetch the database, schema and table names. + part of the identifier name as `"DATABASE.DOT"` will be returned on the left side of the tuple + and the rest of the identifier name as `"SCHEMA.DOT.TABLE"` will be returned on the right side of the tuple Args: - row (dict): row from the snowflake system queries table - database (str): database name - schema (str): schema name + identifier (str): table identifier + Returns: - QueryResult: namedtuple with the query result + Tuple[str, str, str]: database, schema and table names """ + pattern = r"\"([^\"]+)\"|(\w+(?:\.\w+)*(?:\.\w+)*)" + matches = re.findall(pattern, identifier) - QueryResult = namedtuple( - "QueryResult", - "query_id,database_name,schema_name,table_name,query_text,query_type,timestamp", - ) + values = [] + for match in matches: + if match[0] != "": + values.append(match[0]) + if match[1] != "": + split_match = match[1].split(".") + values.extend(split_match) + database_name, schema_name, table_name = ([None] * (3 - len(values))) + values + return database_name, schema_name, table_name + + +def get_value_from_cache(cache: dict, key: str): + """given a dict of cache and a key, return the value if exists + + Args: + cache (dict): dict of cache + key (str): key to look for in the cache + """ try: - parsed_query = sqlparse.parse(clean_up_query(row.query_text))[0] - identifier = next( - ( - query_el - for query_el in parsed_query.tokens - if isinstance(query_el, Identifier) - ), - None, - ) - if not identifier: - return None - values = identifier.value.split(".") - database_name, schema_name, table_name = ([None] * (3 - len(values))) + values - - if not all([database_name, schema_name, table_name]): - return None - - # clean up table name - table_name = re.sub(r"\s.*", "", table_name).strip() - - if ( - database.lower() == database_name.lower() - and schema.lower() == schema_name.lower() - ): - return QueryResult( - row.query_id, - database_name.lower(), - 
schema_name.lower(), - table_name.lower(), - parsed_query, - row.query_type, - row.start_time, - ) - except Exception: + return reduce(dict.get, key.split("."), cache) + except TypeError: return None - return None + +def set_cache(cache: defaultdict, key: str, value): + """given a dict of cache, a key and a value, set the value in the cache + + Args: + cache (dict): dict of cache + key (str): key to set for in the cache + value: value to set in the cache + """ + split_key = key.split(".") + for indx, key_ in enumerate(split_key): + if indx == len(split_key) - 1: + cache[key_] = value + break + cache = cache[key_] diff --git a/ingestion/tests/integration/orm_profiler/system/__init__.py b/ingestion/tests/integration/orm_profiler/system/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py b/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py new file mode 100644 index 00000000000..599f9a43d98 --- /dev/null +++ b/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py @@ -0,0 +1,179 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Validate bigquery system metrics (will be disabled by default). To be ran manually + +How to use this test +-------------------- + +1. Comment the @pytest.mark.skip decorator on line 31 +2. Make sure you have set up the right environment variables for the bigquery database + check the config file at "cli_e2e/database/bigquery/bigquery.yaml". The metadata + ingestion will ingest data from the `dbt_jaffle` schema. +3. Prior to running this test you will need to execute DDLs in the `dbt_jaffle` schema. + We will need to perform at least one `DELETE`, `INSERT`, `UPDATE` on any table from the schema. + query example: + ``` + INSERT INTO dbt_jaffle.Person VALUES + ('John', 'Doe', 'II'), + ('Jane', 'Doe', 'II'), + ('Jeff', 'Doe', 'II') + + UPDATE dbt_jaffle.Person SET add = 'IV' WHERE first_name = 'John'; + + MERGE INTO dbt_jaffle.Person NT USING (SELECT 'Jeff' AS first_name, 'Doe' AS last_name, NULL AS add) N ON NT.first_name = N.first_name + WHEN MATCHED THEN UPDATE SET NT.first_name = N.first_name; + + DELETE FROM dbt_jaffle.Person WHERE first_name = 'John'; + ``` +4. Once you have performed the above steps, run the test with the following command: + `python -m pytest tests/integration/orm_profiler/system/test_bigquery_system_metrics.py` from the ingestion directory. + You can also perform the same action with your IDE. 
+ + :warning: the profiler workflow will be ran for the table set in `PROFILER_TABLE_FILTER_PATTERN` +""" + +import os +import pathlib +from copy import deepcopy +from unittest import TestCase + +import pytest +import yaml + +from metadata.generated.schema.entity.data.table import SystemProfile +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.ingestion.api.workflow import Workflow +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.profiler.api.workflow import ProfilerWorkflow +from metadata.utils.time_utils import ( + get_beginning_of_day_timestamp_mill, + get_end_of_day_timestamp_mill, +) + +TESTS_ROOT_DIR = pathlib.Path(__file__).parent.parent.parent.parent +BIGQUERY_CONFIG_FILE = "cli_e2e/database/bigquery/bigquery.yaml" +FULL_CONFIG_PATH = pathlib.Path(TESTS_ROOT_DIR, BIGQUERY_CONFIG_FILE) +DATABASE_FILTER = { + "includes": os.environ.get("E2E_BQ_PROJECT_ID"), + "excludes": None, +} +SCHEMA_FILTER = { + "includes": "dbt_jaffle", + "excludes": None, +} +TABLE_FILTER = { + "includes": "Person", + "excludes": None, +} + + +@pytest.mark.skip( + reason="Disabled by default. Should be ran manually on system metric updates" +) +class TestBigquerySystem(TestCase): + """Test class for bigquery system metrics""" + + taxonomy = os.environ.get("E2E_BQ_PROJECT_ID_TAXONOMY") + private_key_id = os.environ.get("E2E_BQ_PRIVATE_KEY_ID") + private_key = os.environ.get("E2E_BQ_PRIVATE_KEY") + project_id = DATABASE_FILTER["includes"] + client_email = os.environ.get("E2E_BQ_CLIENT_EMAIL") + client_id = os.environ.get("E2E_BQ_CLIENT_ID") + + full_config_path = FULL_CONFIG_PATH + + schema = SCHEMA_FILTER["includes"] + table = TABLE_FILTER["includes"] + + @classmethod + def setUpClass(cls) -> None: + """set up class""" + with open(cls.full_config_path, "r", encoding="utf-8") as file: + cls.config = yaml.safe_load(file) + + # set up the config to filter from the `dbt_jaffle` schema + cls.config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = { + "includes": [cls.schema], + } + cls.config["source"]["sourceConfig"]["config"]["tableFilterPattern"] = { + "includes": [cls.table], + } + cls.config["source"]["serviceConnection"]["config"]["credentials"]["gcpConfig"][ + "projectId" + ] = cls.project_id + cls.config["source"]["serviceConnection"]["config"]["credentials"]["gcpConfig"][ + "privateKeyId" + ] = cls.private_key_id + cls.config["source"]["serviceConnection"]["config"]["credentials"]["gcpConfig"][ + "privateKey" + ] = cls.private_key + cls.config["source"]["serviceConnection"]["config"]["credentials"]["gcpConfig"][ + "clientEmail" + ] = cls.client_email + cls.config["source"]["serviceConnection"]["config"]["credentials"]["gcpConfig"][ + "clientId" + ] = cls.client_id + cls.config["source"]["serviceConnection"]["config"]["taxonomyProjectID"] = [ + cls.taxonomy + ] + + # set metadata config + cls.metadata_config_dict = cls.config["workflowConfig"][ + "openMetadataServerConfig" + ] + cls.metadata_config = OpenMetadataConnection.parse_obj(cls.metadata_config_dict) + cls.metadata = OpenMetadata(cls.metadata_config) + + # run the ingestion workflow + ingestion_workflow = Workflow.create(cls.config) + ingestion_workflow.execute() + ingestion_workflow.raise_from_status() + ingestion_workflow.print_status() + ingestion_workflow.stop() + + # get table fqn + cls.table_fqn = f"{cls.config['source']['serviceName']}.{cls.project_id}.{cls.schema}.{cls.table}" + + def 
test_bigquery_system_metrics(self): + """run profiler workflow and check the system metrics""" + config = deepcopy(self.config) + # update the config to run the profiler workflow + config["source"]["sourceConfig"]["config"] = { + "type": "Profiler", + "generateSampleData": True, + "timeoutSeconds": 5400, + "tableFilterPattern": { + "includes": [self.table], + }, + } + config["processor"] = { + "type": "orm-profiler", + "config": {}, + } + profiler_workflow = ProfilerWorkflow.create(config) + profiler_workflow.execute() + profiler_workflow.raise_from_status() + profiler_workflow.print_status() + profiler_workflow.stop() + + # get latest profile metrics + profile = self.metadata.get_profile_data( + self.table_fqn, + get_beginning_of_day_timestamp_mill(days=1), + get_end_of_day_timestamp_mill(), + profile_type=SystemProfile, + ) + ddl_operations = [prl.operation.value for prl in profile.entities] + assert set(ddl_operations) == set(["INSERT", "UPDATE", "DELETE"]) diff --git a/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py b/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py new file mode 100644 index 00000000000..f56742b6142 --- /dev/null +++ b/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py @@ -0,0 +1,146 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Validate redshift system metrics (will be disabled by default). To be ran manually + +How to use this test +-------------------- + +1. Comment the @pytest.mark.skip decorator on line 31 +2. Make sure you have set up the right environment variables for the redshift database + check the config file at "cli_e2e/database/redshift/redshift.yaml". The metadata + ingestion will ingest data from the `dbt_jaffle` schema. +3. Prior to running this test you will need to execute DDLs in the `dbt_jaffle` schema. + We will need to perform at least one `DELETE`, `INSERT`, `UPDATE` on any table from the schema. +4. Once you have performed the above steps, run the test with the following command: + `python -m pytest tests/integration/orm_profiler/system/test_redshift_system_metrics.py` from the ingestion directory. + You can also perform the same action with your IDE. 
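+
+    query example for step 3 (hypothetical column names, adjust them to the table you profile):
+    ```
+    INSERT INTO dbt_jaffle.boolean_test VALUES (1, true);
+
+    UPDATE dbt_jaffle.boolean_test SET flag = false WHERE id = 1;
+
+    DELETE FROM dbt_jaffle.boolean_test WHERE id = 1;
+    ```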
+ + :warning: the profiler workflow will be ran for the table set in `PROFILER_TABLE_FILTER_PATTERN` +""" + +import os +import pathlib +from copy import deepcopy +from unittest import TestCase + +import pytest +import yaml + +from metadata.generated.schema.entity.data.table import SystemProfile +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.ingestion.api.workflow import Workflow +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.profiler.api.workflow import ProfilerWorkflow +from metadata.utils.time_utils import ( + get_beginning_of_day_timestamp_mill, + get_end_of_day_timestamp_mill, +) + +TESTS_ROOT_DIR = pathlib.Path(__file__).parent.parent.parent.parent +REDSHIFT_CONFIG_FILE = "cli_e2e/database/redshift/redshift.yaml" +FULL_CONFIG_PATH = pathlib.Path(TESTS_ROOT_DIR, REDSHIFT_CONFIG_FILE) +DATABASE_FILTER = { + "includes": "dev", + "excludes": None, +} +SCHEMA_FILTER = { + "includes": "dbt_jaffle", + "excludes": None, +} +TABLE_FILTER = { + "includes": "boolean_test", + "excludes": None, +} + + +@pytest.mark.skip( + reason="Disabled by default. Should be ran manually on system metric updates" +) +class TestRedshiftSystem(TestCase): + """Test class for redshift system metrics""" + + hostPort = os.environ.get("E2E_REDSHIFT_HOST_PORT") + username = os.environ.get("E2E_REDSHIFT_USERNAME") + password = os.environ.get("E2E_REDSHIFT_PASSWORD") + database = DATABASE_FILTER["includes"] + + full_config_path = FULL_CONFIG_PATH + + schema = SCHEMA_FILTER["includes"] + table = TABLE_FILTER["includes"] + + @classmethod + def setUpClass(cls) -> None: + """set up class""" + with open(cls.full_config_path, "r", encoding="utf-8") as file: + cls.config = yaml.safe_load(file) + + # set up the config to filter from the `dbt_jaffle` schema + cls.config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = { + "includes": [cls.schema], + } + cls.config["source"]["serviceConnection"]["config"]["hostPort"] = cls.hostPort + cls.config["source"]["serviceConnection"]["config"]["username"] = cls.username + cls.config["source"]["serviceConnection"]["config"]["password"] = cls.password + cls.config["source"]["serviceConnection"]["config"]["database"] = cls.database + + # set metadata config + cls.metadata_config_dict = cls.config["workflowConfig"][ + "openMetadataServerConfig" + ] + cls.metadata_config = OpenMetadataConnection.parse_obj(cls.metadata_config_dict) + cls.metadata = OpenMetadata(cls.metadata_config) + + # run the ingestion workflow + ingestion_workflow = Workflow.create(cls.config) + ingestion_workflow.execute() + ingestion_workflow.raise_from_status() + ingestion_workflow.print_status() + ingestion_workflow.stop() + + # get table fqn + cls.table_fqn = f"{cls.config['source']['serviceName']}.{cls.database}.{cls.schema}.{cls.table}" + + def test_redshift_system_metrics(self): + """run profiler workflow and check the system metrics""" + config = deepcopy(self.config) + # update the config to run the profiler workflow + config["source"]["sourceConfig"]["config"] = { + "type": "Profiler", + "generateSampleData": True, + "timeoutSeconds": 5400, + "tableFilterPattern": { + "includes": [self.table], + }, + } + config["processor"] = { + "type": "orm-profiler", + "config": {}, + } + profiler_workflow = ProfilerWorkflow.create(config) + profiler_workflow.execute() + profiler_workflow.raise_from_status() + profiler_workflow.print_status() + profiler_workflow.stop() + + # get latest 
profile metrics + profile = self.metadata.get_profile_data( + self.table_fqn, + get_beginning_of_day_timestamp_mill(days=1), + get_end_of_day_timestamp_mill(), + profile_type=SystemProfile, + ) + ddl_operations = [prl.operation.value for prl in profile.entities] + assert set(ddl_operations) == set(["INSERT", "UPDATE", "DELETE"]) diff --git a/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py b/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py new file mode 100644 index 00000000000..f30a8199cff --- /dev/null +++ b/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py @@ -0,0 +1,170 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Validate snowflake system metrics (will be disabled by default). To be ran manually + +How to use this test +-------------------- + +1. Comment the @pytest.mark.skip decorator on line 31 +2. Make sure you have set up the right environment variables for the snowflake database + check the config file at "cli_e2e/database/snowflake/snowflake.yaml". The metadata + ingestion will ingest data from the `TEST_DB` schema. +3. Prior to running this test you will need to execute DDLs in the `TEST_DB` schema. + We will need to perform at least one `DELETE`, `INSERT`, `UPDATE` on any table from the schema. + query example: + ``` + INSERT INTO TEST_DB.TEST_SCHEMA.NEW_TAB VALUES + (1, 'FOO'), + (2, 'BAR'), + (3, 'BAZZ') + + INSERT OVERWRITE INTO TEST_DB.TEST_SCHEMA.NEW_TAB VALUES + (4, 'FOOBAR'), + (5, 'FOOBAZZ'), + (6, 'BARBAZZ') + + UPDATE TEST_DB.TEST_SCHEMA.NEW_TAB SET NAME='BABAR' WHERE id = 6; + + MERGE INTO TEST_DB.TEST_SCHEMA.NEW_TAB NT USING (SELECT 5 AS id, 'BAR' AS NAME) N ON NT.id = N.id + WHEN MATCHED THEN UPDATE SET NT.NAME = N.NAME; + + DELETE FROM TEST_DB.TEST_SCHEMA.NEW_TAB WHERE ID = 4; + ``` +4. Once you have performed the above steps, run the test with the following command: + `python -m pytest tests/integration/orm_profiler/system/test_snowflake_system_metrics.py` from the ingestion directory. + You can also perform the same action with your IDE. 
+ + :warning: the profiler workflow will be ran for the table set in `PROFILER_TABLE_FILTER_PATTERN` +""" + +import os +import pathlib +from copy import deepcopy +from unittest import TestCase + +import pytest +import yaml + +from metadata.generated.schema.entity.data.table import SystemProfile +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.ingestion.api.workflow import Workflow +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.profiler.api.workflow import ProfilerWorkflow +from metadata.utils.time_utils import ( + get_beginning_of_day_timestamp_mill, + get_end_of_day_timestamp_mill, +) + +TESTS_ROOT_DIR = pathlib.Path(__file__).parent.parent.parent.parent +SNOWFLAKE_CONFIG_FILE = "cli_e2e/database/snowflake/snowflake.yaml" +FULL_CONFIG_PATH = pathlib.Path(TESTS_ROOT_DIR, SNOWFLAKE_CONFIG_FILE) +DATABASE_FILTER = { + "includes": "TEST_DB", + "excludes": None, +} +SCHEMA_FILTER = { + "includes": "TEST_SCHEMA", + "excludes": None, +} +TABLE_FILTER = { + "includes": "NEW_TAB", + "excludes": None, +} + + +@pytest.mark.skip( + reason="Disabled by default. Should be ran manually on system metric updates" +) +class TestSnowflakeystem(TestCase): + """Test class for snowflake system metrics""" + + account = os.environ.get("E2E_SNOWFLAKE_ACCOUNT") + warehouse = os.environ.get("E2E_SNOWFLAKE_WAREHOUSE") + username = os.environ.get("E2E_SNOWFLAKE_USERNAME") + password = os.environ.get("E2E_SNOWFLAKE_PASSWORD") + database = DATABASE_FILTER["includes"] + + full_config_path = FULL_CONFIG_PATH + + schema = SCHEMA_FILTER["includes"] + table = TABLE_FILTER["includes"] + + @classmethod + def setUpClass(cls) -> None: + """set up class""" + with open(cls.full_config_path, "r", encoding="utf-8") as file: + cls.config = yaml.safe_load(file) + + # set up the config to filter from the `dbt_jaffle` schema + cls.config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = { + "includes": [cls.schema], + } + cls.config["source"]["sourceConfig"]["config"]["tableFilterPattern"] = { + "includes": [cls.table], + } + cls.config["source"]["serviceConnection"]["config"]["account"] = cls.account + cls.config["source"]["serviceConnection"]["config"]["warehouse"] = cls.warehouse + cls.config["source"]["serviceConnection"]["config"]["username"] = cls.username + cls.config["source"]["serviceConnection"]["config"]["password"] = cls.password + cls.config["source"]["serviceConnection"]["config"]["database"] = cls.database + + # set metadata config + cls.metadata_config_dict = cls.config["workflowConfig"][ + "openMetadataServerConfig" + ] + cls.metadata_config = OpenMetadataConnection.parse_obj(cls.metadata_config_dict) + cls.metadata = OpenMetadata(cls.metadata_config) + + # run the ingestion workflow + ingestion_workflow = Workflow.create(cls.config) + ingestion_workflow.execute() + ingestion_workflow.raise_from_status() + ingestion_workflow.print_status() + ingestion_workflow.stop() + + # get table fqn + cls.table_fqn = f"{cls.config['source']['serviceName']}.{cls.database}.{cls.schema}.{cls.table}" + + def test_snowflake_system_metrics(self): + """run profiler workflow and check the system metrics""" + config = deepcopy(self.config) + # update the config to run the profiler workflow + config["source"]["sourceConfig"]["config"] = { + "type": "Profiler", + "generateSampleData": True, + "timeoutSeconds": 5400, + "tableFilterPattern": { + "includes": [self.table], + }, + } + config["processor"] = { + 
"type": "orm-profiler", + "config": {}, + } + profiler_workflow = ProfilerWorkflow.create(config) + profiler_workflow.execute() + profiler_workflow.raise_from_status() + profiler_workflow.print_status() + profiler_workflow.stop() + + # get latest profile metrics + profile = self.metadata.get_profile_data( + self.table_fqn, + get_beginning_of_day_timestamp_mill(days=1), + get_end_of_day_timestamp_mill(), + profile_type=SystemProfile, + ) + ddl_operations = [prl.operation.value for prl in profile.entities] + assert set(ddl_operations) == set(["INSERT", "UPDATE", "DELETE"]) diff --git a/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py b/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py index ea039a55f5d..9eb77b3a56e 100644 --- a/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py +++ b/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py @@ -21,7 +21,6 @@ from copy import deepcopy from datetime import datetime, timedelta from unittest import TestCase -import pytest from sqlalchemy import Column, DateTime, Integer, String, create_engine from sqlalchemy.orm import declarative_base diff --git a/ingestion/tests/unit/profiler/conftest.py b/ingestion/tests/unit/profiler/conftest.py index 05afb377fd2..6ea0e4ccd70 100644 --- a/ingestion/tests/unit/profiler/conftest.py +++ b/ingestion/tests/unit/profiler/conftest.py @@ -65,7 +65,7 @@ class Row: start_time, query_text, ): - self.query_id = query_id - self.query_type = query_type - self.start_time = start_time - self.query_text = query_text + self.QUERY_ID = query_id + self.QUERY_TYPE = query_type + self.START_TIME = start_time + self.QUERY_TEXT = query_text diff --git a/ingestion/tests/unit/profiler/test_utils.py b/ingestion/tests/unit/profiler/test_utils.py index 3f575780f70..0ca19b55c2d 100644 --- a/ingestion/tests/unit/profiler/test_utils.py +++ b/ingestion/tests/unit/profiler/test_utils.py @@ -17,12 +17,22 @@ import os from datetime import datetime from unittest import TestCase +import pytest from sqlalchemy import Column, create_engine from sqlalchemy.orm import declarative_base, sessionmaker from sqlalchemy.sql.sqltypes import Integer, String from metadata.profiler.metrics.hybrid.histogram import Histogram -from metadata.utils.profiler_utils import ColumnLike, get_snowflake_system_queries +from metadata.profiler.metrics.system.queries.snowflake import ( + get_snowflake_system_queries, +) +from metadata.profiler.metrics.system.system import recursive_dic +from metadata.utils.profiler_utils import ( + ColumnLike, + get_identifiers_from_string, + get_value_from_cache, + set_cache, +) from metadata.utils.sqa_utils import handle_array, is_array from .conftest import Row @@ -145,7 +155,7 @@ def test_column_like_object(): def test_get_snowflake_system_queries(): """Test get snowflake system queries""" row = Row( - query_id=1, + query_id="1", query_type="INSERT", start_time=datetime.now(), query_text="INSERT INTO DATABASE.SCHEMA.TABLE1 (col1, col2) VALUES (1, 'a'), (2, 'b')", @@ -153,7 +163,7 @@ def test_get_snowflake_system_queries(): query_result = get_snowflake_system_queries(row, "DATABASE", "SCHEMA") # type: ignore assert query_result - assert query_result.query_id == 1 + assert query_result.query_id == "1" assert query_result.query_type == "INSERT" assert query_result.database_name == "database" assert query_result.schema_name == "schema" @@ -169,3 +179,81 @@ def test_get_snowflake_system_queries(): query_result = get_snowflake_system_queries(row, "DATABASE", "SCHEMA") # type: ignore 
 assert not query_result
+
+
+@pytest.mark.parametrize(
+    "query, expected",
+    [
+        (
+            "INSERT INTO DATABASE.SCHEMA.TABLE1 (col1, col2) VALUES (1, 'a'), (2, 'b')",
+            "INSERT",
+        ),
+        (
+            "INSERT OVERWRITE INTO DATABASE.SCHEMA.TABLE1 (col1, col2) VALUES (1, 'a'), (2, 'b')",
+            "INSERT",
+        ),
+        (
+            "MERGE INTO DATABASE.SCHEMA.TABLE1 (col1, col2) VALUES (1, 'a'), (2, 'b')",
+            "MERGE",
+        ),
+        ("DELETE FROM DATABASE.SCHEMA.TABLE1 WHERE val = 9999", "DELETE"),
+        ("UPDATE DATABASE.SCHEMA.TABLE1 SET col1 = 1 WHERE val = 9999", "UPDATE"),
+    ],
+)
+def test_get_snowflake_system_queries_all_dml(query, expected):
+    """test we can get all DML queries
+    reference https://docs.snowflake.com/en/sql-reference/sql-dml
+    """
+    row = Row(
+        query_id="1",
+        query_type=expected,
+        start_time=datetime.now(),
+        query_text=query,
+    )
+
+    query_result = get_snowflake_system_queries(row, "DATABASE", "SCHEMA")  # type: ignore
+
+    assert query_result
+    assert query_result.query_type == expected
+    assert query_result.database_name == "database"
+    assert query_result.schema_name == "schema"
+    assert query_result.table_name == "table1"
+
+
+@pytest.mark.parametrize(
+    "identifier, expected",
+    [
+        ("DATABASE.SCHEMA.TABLE1", ("DATABASE", "SCHEMA", "TABLE1")),
+        ('DATABASE.SCHEMA."TABLE.DOT"', ("DATABASE", "SCHEMA", "TABLE.DOT")),
+        ('DATABASE."SCHEMA.DOT".TABLE', ("DATABASE", "SCHEMA.DOT", "TABLE")),
+        ('"DATABASE.DOT".SCHEMA.TABLE', ("DATABASE.DOT", "SCHEMA", "TABLE")),
+        ('DATABASE."SCHEMA.DOT"."TABLE.DOT"', ("DATABASE", "SCHEMA.DOT", "TABLE.DOT")),
+        ('"DATABASE.DOT"."SCHEMA.DOT".TABLE', ("DATABASE.DOT", "SCHEMA.DOT", "TABLE")),
+        (
+            '"DATABASE.DOT"."SCHEMA.DOT"."TABLE.DOT"',
+            ("DATABASE.DOT", "SCHEMA.DOT", "TABLE.DOT"),
+        ),
+    ],
+)
+def test_get_identifiers_from_string(identifier, expected):
+    """test get identifiers from string"""
+    assert get_identifiers_from_string(identifier) == expected
+
+
+def test_cache_func():
+    """test get and set cache"""
+    cache_dict = recursive_dic()
+    cache_value = [1, 2, 3, 4, 5]
+    new_cache_value = [6, 7, 8, 9, 10]
+
+    cache = get_value_from_cache(cache_dict, "key1.key2.key3")
+    assert not cache
+
+    set_cache(cache_dict, "key1.key2.key3", cache_value)
+    cache = get_value_from_cache(cache_dict, "key1.key2.key3")
+    assert cache == cache_value
+
+    # calling set_cache on the same key will reset the cache
+    set_cache(cache_dict, "key1.key2.key3", new_cache_value)
+    cache = get_value_from_cache(cache_dict, "key1.key2.key3")
+    assert cache == new_cache_value
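
Note on the Redshift logic in this patch: a single STL_QUERY template is formatted three times with different join types because Redshift records an UPDATE as a delete plus an insert under the same query id, so rows found only in stl_insert are inserts, rows found only in stl_delete are deletes, and rows found in both are updates. Below is a minimal sketch of that classification, assuming the module paths introduced in this patch and a hypothetical Redshift connection string:

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations
from metadata.profiler.metrics.system.queries.redshift import STL_QUERY, get_query_results

# join type / filter condition pairs used to classify DML operations
VARIANTS = {
    DatabaseDMLOperations.INSERT.value: {"alias": "si", "join_type": "LEFT", "condition": "sd.query is null"},
    DatabaseDMLOperations.DELETE.value: {"alias": "sd", "join_type": "RIGHT", "condition": "si.query is null"},
    DatabaseDMLOperations.UPDATE.value: {"alias": "si", "join_type": "INNER", "condition": "sd.query is not null"},
}

engine = create_engine("redshift+psycopg2://user:password@host:5439/dev")  # hypothetical DSN
with Session(engine) as session:
    for operation, params in VARIANTS.items():
        query = STL_QUERY.format(database="dev", schema="dbt_jaffle", **params)
        for result in get_query_results(session, query, operation):
            # each QueryResult carries the table, the rows affected and the start time
            print(result.table_name, operation, result.rows, result.timestamp)
```

The three parameter sets mirror the calls made in the Redshift branch of `get_system_metrics_for_dialect`.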