Fixes #11384 - Implement memory optimization for system metrics (#11460)

* fix: optimize system metrics retrieval for memory

* fix: ran python linting

* fix: logic to retrieve unique system metrics operations

* fix: added logic to clean up query before parsing it

* fix: added E2E tests for Redshift, BigQuery, and Snowflake system metrics

* fix: ran python linting

* fix: fix postgres query + add default byte size to env var

* fix: ran python linting
Authored by Teddy on 2023-05-09 12:05:35 +02:00; committed by GitHub
parent 6f85019daf
commit 60de33d7cf
15 changed files with 520 additions and 149 deletions
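
At a glance, the optimization replaces the flat module-level result cache with a nested defaultdict keyed by dialect, database/project and schema/dataset, and clears the whole structure once its deep size crosses a byte ceiling. A minimal sketch of that pattern, assuming the helpers added by this PR; the fetch wrapper and its arguments are illustrative, not the actual profiler entry point:

from collections import defaultdict

from metadata.utils.helpers import deep_size_of_dict  # helper added in this PR (see helpers.py hunk below)

MAX_SIZE_IN_BYTES = 2 * 1024**3  # 2GB, same default as in the system.py hunk below

def recursive_dic():
    """Recursively nested defaultdict: cache[a][b][c] never raises KeyError."""
    return defaultdict(recursive_dic)

SYSTEM_QUERY_RESULT_CACHE = recursive_dic()

def get_system_query_results(dialect, database, schema, run_query):
    """Illustrative fetch-or-cache wrapper around an expensive system query."""
    bucket = SYSTEM_QUERY_RESULT_CACHE[dialect][database][schema]
    if "query_results" in bucket:
        results = bucket["query_results"]  # cache hit: reuse rows fetched earlier in the same run
    else:
        results = run_query()  # cache miss: run the query once per (dialect, database, schema)
        bucket["query_results"] = results
    if deep_size_of_dict(SYSTEM_QUERY_RESULT_CACHE) > MAX_SIZE_IN_BYTES:
        SYSTEM_QUERY_RESULT_CACHE.clear()  # bound memory by dropping the whole cache
    return results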

View File

@ -25,6 +25,7 @@ from metadata.generated.schema.api.data.createTableProfile import (
from metadata.generated.schema.entity.data.table import (
ColumnProfile,
DataModel,
SystemProfile,
Table,
TableData,
TableJoins,
@ -252,13 +253,19 @@ class OMetaTableMixin:
url_after = f"&after={after}" if after else ""
profile_type_url = profile_type.__name__[0].lower() + profile_type.__name__[1:]
# system profile uses milliseconds
if profile_type is not SystemProfile:
start_ts = start_ts // 1000
end_ts = end_ts // 1000
resp = self.client.get(
f"{self.get_suffix(Table)}/{fqn}/{profile_type_url}?limit={limit}{url_after}",
data={"startTs": start_ts // 1000, "endTs": end_ts // 1000},
data={"startTs": start_ts, "endTs": end_ts},
)
if profile_type is TableProfile:
data: List[T] = [TableProfile(**datum) for datum in resp["data"]] # type: ignore
if profile_type in (TableProfile, SystemProfile):
data: List[T] = [profile_type(**datum) for datum in resp["data"]] # type: ignore
elif profile_type is ColumnProfile:
split_fqn = fqn.split(".")
if len(split_fqn) < 5:
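
As the comment in this hunk notes, SystemProfile timestamps stay in milliseconds while other profile types are floored to epoch seconds, so callers now pass millisecond bounds straight through. A hedged usage sketch mirroring the E2E assertion later in this diff; the client instance and the FQN are assumed to already exist:

from datetime import datetime, timedelta

from metadata.generated.schema.entity.data.table import SystemProfile

def fetch_system_operations(openmetadata, table_fqn: str):
    """openmetadata: an already-configured OpenMetadata client; table_fqn: illustrative FQN."""
    # Millisecond epoch bounds; for SystemProfile they are no longer divided by 1000.
    start_ts = int((datetime.now() - timedelta(days=1)).timestamp() * 1000)
    end_ts = int((datetime.now() + timedelta(days=1)).timestamp() * 1000)
    profiles = openmetadata.get_profile_data(
        table_fqn,
        start_ts,
        end_ts,
        profile_type=SystemProfile,
    )
    return {profile.operation.value for profile in profiles.entities}  # e.g. {"INSERT", "UPDATE", "DELETE"}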

View File

@ -14,7 +14,7 @@ System Metric
"""
import traceback
from collections import namedtuple
from collections import defaultdict, namedtuple
from enum import Enum
from textwrap import dedent
from typing import Dict, List, Optional
@ -22,7 +22,6 @@ from typing import Dict, List, Optional
import sqlparse
from sqlalchemy import text
from sqlalchemy.orm import DeclarativeMeta, Session
from sqlparse.sql import Identifier
from metadata.generated.schema.entity.data.table import DmlOperationType
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
@ -31,10 +30,19 @@ from metadata.generated.schema.entity.services.connections.database.bigQueryConn
from metadata.profiler.metrics.core import SystemMetric
from metadata.profiler.orm.registry import Dialects
from metadata.utils.dispatch import valuedispatch
from metadata.utils.helpers import deep_size_of_dict
from metadata.utils.logger import profiler_logger
from metadata.utils.profiler_utils import clean_up_query, get_snowflake_system_queries
logger = profiler_logger()
MAX_SIZE_IN_BYTES = 2 * 1024**3 # 2GB
def recursive_dic():
"""recursive default dict"""
return defaultdict(recursive_dic)
class DatabaseDMLOperations(Enum):
"""enum of supported DML operation on database engine side"""
@ -52,7 +60,7 @@ DML_OPERATION_MAP = {
DatabaseDMLOperations.DELETE.value: DmlOperationType.DELETE.value,
}
SYSTEM_QUERY_RESULT_CACHE = {}
SYSTEM_QUERY_RESULT_CACHE = recursive_dic()
@valuedispatch
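
The point of the recursive defaultdict is that the cache can be indexed and membership-tested at arbitrary depth without initializing intermediate levels, which is what the "query_results" in SYSTEM_QUERY_RESULT_CACHE[...][...][...] checks in the dialect handlers below rely on. A tiny standalone illustration with made-up keys:

from collections import defaultdict

def recursive_dic():
    return defaultdict(recursive_dic)

cache = recursive_dic()
# No KeyError even though nothing was ever stored; note that the lookup itself
# creates the empty intermediate levels as a side effect.
print("query_results" in cache["BigQuery"]["my-project"]["my_dataset"])  # False
cache["BigQuery"]["my-project"]["my_dataset"]["query_results"] = ["row"]
print("query_results" in cache["BigQuery"]["my-project"]["my_dataset"])  # True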
@ -105,6 +113,8 @@ def _(
"deleted_row_count": DatabaseDMLOperations.DELETE.value,
"updated_row_count": DatabaseDMLOperations.UPDATE.value,
}
project_id = session.get_bind().url.host
dataset_id = table.__table_args__["schema"]
jobs = dedent(
f"""
@ -117,6 +127,8 @@ def _(
`region-{conn_config.usageLocation}`.INFORMATION_SCHEMA.JOBS
WHERE
DATE(creation_time) >= CURRENT_DATE() - 1 AND
destination_table.dataset_id = '{dataset_id}' AND
destination_table.project_id = '{project_id}' AND
statement_type IN (
'{DatabaseDMLOperations.INSERT.value}',
'{DatabaseDMLOperations.DELETE.value}',
@ -133,12 +145,17 @@ def _(
"query_type,timestamp,destination_table,dml_statistics",
)
try:
if (
"query_results"
in SYSTEM_QUERY_RESULT_CACHE[Dialects.BigQuery][project_id][dataset_id]
):
# we'll try to get the cached data first
rows_jobs = kwargs["cache"][Dialects.BigQuery]["rows_jobs"]
except KeyError:
query_results = SYSTEM_QUERY_RESULT_CACHE[Dialects.BigQuery][project_id][
dataset_id
]["query_results"]
else:
cursor_jobs = session.execute(text(jobs))
rows_jobs = [
query_results = [
QueryResult(
row.statement_type,
row.start_time,
@ -147,15 +164,12 @@ def _(
)
for row in cursor_jobs.fetchall()
]
SYSTEM_QUERY_RESULT_CACHE[Dialects.BigQuery] = {"rows_jobs": rows_jobs}
SYSTEM_QUERY_RESULT_CACHE[Dialects.BigQuery][project_id][dataset_id][
"query_results"
] = query_results
for row_jobs in rows_jobs:
if (
row_jobs.destination_table.get("project_id") == session.get_bind().url.host
and row_jobs.destination_table.get("dataset_id")
== table.__table_args__["schema"]
and row_jobs.destination_table.get("table_id") == table.__tablename__
):
for row_jobs in query_results:
if row_jobs.destination_table.get("table_id") == table.__tablename__:
rows_affected = None
try:
if row_jobs.query_type == DatabaseDMLOperations.INSERT.value:
@ -169,7 +183,7 @@ def _(
rows_affected = None
if row_jobs.query_type == DatabaseDMLOperations.MERGE.value:
for i, key in enumerate(row_jobs.dml_statistics):
for indx, key in enumerate(row_jobs.dml_statistics):
if row_jobs.dml_statistics[key] != 0:
metric_results.append(
{
@ -177,7 +191,7 @@ def _(
# We are padding timestamps by 0,1,2 milliseconds to avoid
# duplicate timestamps
"timestamp": int(row_jobs.timestamp.timestamp() * 1000)
+ i,
+ indx,
"operation": dml_stat_to_dml_statement_mapping.get(key),
"rowsAffected": row_jobs.dml_statistics[key],
}
@ -214,6 +228,8 @@ def _(
List[Dict]:
"""
logger.debug(f"Fetching system metrics for {dialect}")
database = session.get_bind().url.database
schema = table.__table_args__["schema"]
stl_deleted = dedent(
f"""
@ -229,9 +245,8 @@ def _(
INNER JOIN pg_catalog.svv_table_info sti ON si.tbl = sti.table_id
INNER JOIN pg_catalog.stl_querytext sq ON si.query = sq.query
WHERE
sti."database" = '{session.get_bind().url.database}' AND
sti."schema" = '{table.__table_args__["schema"]}' AND
sti."table" = '{table.__tablename__}' AND
sti."database" = '{database}' AND
sti."schema" = '{schema}' AND
"rows" != 0 AND
DATE(starttime) >= CURRENT_DATE - 1
GROUP BY 2,3,4,5,6
@ -253,9 +268,8 @@ def _(
INNER JOIN pg_catalog.svv_table_info sti ON si.tbl = sti.table_id
INNER JOIN pg_catalog.stl_querytext sq ON si.query = sq.query
WHERE
sti."database" = '{session.get_bind().url.database}' AND
sti."schema" = '{table.__table_args__["schema"]}' AND
sti."table" = '{table.__tablename__}' AND
sti."database" = '{database}' AND
sti."schema" = '{schema}' AND
"rows" != 0 AND
DATE(starttime) >= CURRENT_DATE - 1
GROUP BY 2,3,4,5,6
@ -269,72 +283,98 @@ def _(
"database_name,schema_name,table_name,query_text,timestamp,rowsAffected",
)
cursor_insert = session.execute(text(stl_insert))
rows_insert = [
QueryResult(
row.database,
row.schema,
row.table,
sqlparse.parse(row.text)[0],
row.starttime,
row.rows,
)
for row in cursor_insert.fetchall()
]
if (
"query_results_inserted"
in SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][schema]
):
# we'll try to get the cached data first
query_results_inserted = SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][
schema
]["query_results_inserted"]
else:
cursor_insert = session.execute(text(stl_insert))
query_results_inserted = [
QueryResult(
row.database,
row.schema,
row.table,
sqlparse.parse(clean_up_query(row.text))[0],
row.starttime,
row.rows,
)
for row in cursor_insert.fetchall()
]
SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][schema][
"query_results_inserted"
] = query_results_inserted
cursor_deleted = session.execute(text(stl_deleted))
rows_deleted = [
QueryResult(
row.database,
row.schema,
row.table,
sqlparse.parse(row.text)[0],
row.starttime,
row.rows,
)
for row in cursor_deleted.fetchall()
]
if (
"query_results_deleted"
in SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][schema]
):
# we'll try to get the cached data first
query_results_deleted = SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][
schema
]["query_results_deleted"]
else:
cursor_deleted = session.execute(text(stl_deleted))
query_results_deleted = [
QueryResult(
row.database,
row.schema,
row.table,
sqlparse.parse(clean_up_query(row.text))[0],
row.starttime,
row.rows,
)
for row in cursor_deleted.fetchall()
]
SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][schema][
"query_results_deleted"
] = query_results_deleted
for row_insert in rows_insert:
query_text = row_insert.query_text
operation = next(
(
token.value.upper()
for token in query_text.tokens
if token.ttype is sqlparse.tokens.DML
and token.value.upper()
in DmlOperationType._member_names_ # pylint: disable=protected-access
),
None,
)
if operation:
metric_results.append(
{
"timestamp": int(row_insert.timestamp.timestamp() * 1000),
"operation": operation,
"rowsAffected": row_insert.rowsAffected,
}
for row_inserted in query_results_inserted:
if row_inserted.table_name == table.__tablename__:
query_text = row_inserted.query_text
operation = next(
(
token.value.upper()
for token in query_text.tokens
if token.ttype is sqlparse.tokens.DML
and token.value.upper()
in DmlOperationType._member_names_ # pylint: disable=protected-access
),
None,
)
if operation:
metric_results.append(
{
"timestamp": int(row_inserted.timestamp.timestamp() * 1000),
"operation": operation,
"rowsAffected": row_inserted.rowsAffected,
}
)
for row_deleted in query_results_deleted:
if row_deleted.table_name == table.__tablename__:
query_text = row_deleted.query_text
operation = next(
(
token.value.upper()
for token in query_text.tokens
if token.ttype is sqlparse.tokens.DML and token.value != "UPDATE"
),
None,
)
for row_deleted in rows_deleted:
query_text = row_deleted.query_text
operation = next(
(
token.value.upper()
for token in query_text.tokens
if token.ttype is sqlparse.tokens.DML and token.value != "UPDATE"
),
None,
)
if operation:
metric_results.append(
{
"timestamp": int(row_deleted.timestamp.timestamp() * 1000),
"operation": operation,
"rowsAffected": row_deleted.rowsAffected,
}
)
if operation:
metric_results.append(
{
"timestamp": int(row_deleted.timestamp.timestamp() * 1000),
"operation": operation,
"rowsAffected": row_deleted.rowsAffected,
}
)
return metric_results
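
The Redshift branch above infers the operation by scanning the parsed statement for a DML keyword token; in isolation, that sqlparse pattern looks like this (the query text is illustrative):

import sqlparse
from sqlparse.tokens import DML

parsed = sqlparse.parse("INSERT INTO persons (id) VALUES (1)")[0]
operation = next(
    (token.value.upper() for token in parsed.tokens if token.ttype is DML),
    None,
)
print(operation)  # INSERT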
@ -361,6 +401,8 @@ def _(
Dict: system metric
"""
logger.debug(f"Fetching system metrics for {dialect}")
database = session.get_bind().url.database
schema = table.__table_args__["schema"]
metric_results: List[Dict] = []
@ -381,63 +423,27 @@ def _(
FROM TABLE(RESULT_SCAN('{query_id}'));
"""
QueryResult = namedtuple(
"QueryResult",
"query_id,database_name,schema_name,query_text,query_type,timestamp",
)
try:
if (
"query_results"
in SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema]
):
# we'll try to get the cached data first
rows = kwargs["cache"][Dialects.Snowflake]["rows"]
query_results = kwargs["cache"][Dialects.Snowflake]["query_results"]
except KeyError:
rows = session.execute(text(information_schema_query_history)).fetchall()
query_results = [
QueryResult(
row.query_id,
row.database_name.lower() if row.database_name else None,
row.schema_name.lower() if row.schema_name else None,
sqlparse.parse(row.query_text)[0],
row.query_type,
row.start_time,
)
for row in rows
query_results = SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema][
"query_results"
]
SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake] = {
"rows": rows,
"query_results": query_results,
}
else:
rows = session.execute(text(information_schema_query_history))
query_results = []
for row in rows:
result = get_snowflake_system_queries(row, database, schema)
if result:
query_results.append(result)
SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema][
"query_results"
] = query_results
for query_result in query_results:
query_text = query_result.query_text
identifier = next(
(
query_el
for query_el in query_text.tokens
if isinstance(query_el, Identifier)
),
None,
)
if not identifier:
continue
values = identifier.value.split(".")
database_name, schema_name, table_name = ([None] * (3 - len(values))) + values
database_name = (
database_name.lower().strip('"')
if database_name
else query_result.database_name
)
schema_name = (
schema_name.lower().strip('"') if schema_name else query_result.schema_name
)
if (
session.get_bind().url.database.lower() == database_name
and table.__table_args__["schema"].lower() == schema_name
and table.__tablename__ == table_name
):
if table.__tablename__.lower() == query_result.table_name:
cursor_for_result_scan = session.execute(
text(dedent(result_scan.format(query_id=query_result.query_id)))
)
@ -483,6 +489,18 @@ class System(SystemMetric):
def name(cls):
return "system"
def _manage_cache(self, max_size_in_bytes: int = MAX_SIZE_IN_BYTES) -> None:
"""manage cache and clears it if it exceeds the max size
Args:
max_size_in_bytes (int, optional): max size of cache in bytes. Defaults to 2147483648.
Returns:
None
"""
if deep_size_of_dict(SYSTEM_QUERY_RESULT_CACHE) > max_size_in_bytes:
logger.debug("Clearing system cache")
SYSTEM_QUERY_RESULT_CACHE.clear()
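
One of the commit bullets mentions wiring the default byte size to an environment variable; this hunk only shows the hardcoded 2GB constant, so the sketch below of what that wiring could look like uses a hypothetical variable name:

import os

# Hypothetical environment override for the cache ceiling; the diff itself
# only shows the hardcoded default of 2 * 1024**3 bytes (2GB).
MAX_SIZE_IN_BYTES = int(os.getenv("SYSTEM_METRICS_CACHE_MAX_BYTES", 2 * 1024**3))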
def sql(self, session: Session, **kwargs):
"""Implements the SQL logic to fetch system data"""
if not hasattr(self, "table"):
@ -497,7 +515,6 @@ class System(SystemMetric):
session=session,
table=self.table,
conn_config=conn_config,
cache=SYSTEM_QUERY_RESULT_CACHE,
)
self._manage_cache()
return system_metrics

View File

@ -15,7 +15,9 @@ Helpers module for ingestion related methods
from __future__ import annotations
import itertools
import re
import sys
from datetime import datetime, timedelta
from functools import wraps
from math import floor, log
@ -352,3 +354,36 @@ def clean_uri(uri: str) -> str:
make it http://localhost:9000
"""
return uri[:-1] if uri.endswith("/") else uri
def deep_size_of_dict(obj: dict) -> int:
"""Get deepsize of dict data structure
Args:
obj (dict): dict data structure
Returns:
int: size of dict data structure
"""
# pylint: disable=unnecessary-lambda-assignment
dict_handler = lambda elmt: itertools.chain.from_iterable(elmt.items())
handlers = {
dict: dict_handler,
list: iter,
}
seen = set()
def sizeof(obj) -> int:
if id(obj) in seen:
return 0
seen.add(id(obj))
size = sys.getsizeof(obj, 0)
for type_, handler in handlers.items():
if isinstance(obj, type_):
size += sum(map(sizeof, handler(obj)))
break
return size
return sizeof(obj)
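
For a sense of what this helper measures compared to a plain sys.getsizeof, a small hedged example; the byte figures in the comments are rough orders of magnitude, not exact values:

import sys

from metadata.utils.helpers import deep_size_of_dict  # the helper defined just above

nested = {"a": {"b": [str(i) * 500 for i in range(10)]}}
print(sys.getsizeof(nested))      # shallow: only the outer dict, a few hundred bytes at most
print(deep_size_of_dict(nested))  # deep: also counts the ~5 KB of strings inside the nested list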

View File

@ -11,8 +11,14 @@
"""Profiler utils class and functions"""
import re
from collections import namedtuple
from typing import Optional
import sqlparse
from sqlalchemy.engine.row import Row
from sqlparse.sql import Identifier
from metadata.utils.sqa_utils import is_array
@ -44,3 +50,67 @@ class ColumnLike:
return cls(is_array(kwargs), kwargs.pop("array_col"))
except KeyError:
return cls(False, None)
def clean_up_query(query: str) -> str:
"""remove comments and newlines from query"""
return sqlparse.format(query, strip_comments=True).replace("\\n", "")
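
clean_up_query strips SQL comments via sqlparse and removes literal backslash-n sequences, presumably because engines such as Redshift return query text with newlines encoded that way in stl_querytext. A quick illustration with a made-up input string:

from metadata.utils.profiler_utils import clean_up_query  # the helper defined just above

raw = "-- loaded by the e2e test\nINSERT INTO persons (id)\\nVALUES (1)"
print(clean_up_query(raw))  # comment and literal '\n' markers removed; roughly "INSERT INTO persons (id)VALUES (1)"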
def get_snowflake_system_queries(
row: Row, database: str, schema: str
) -> Optional["QueryResult"]:
"""get snowflake system queries for a specific database and schema. Parsing the query
is the only reliable way to get the DDL operation as fields in the table are not.
Args:
row (dict): row from the snowflake system queries table
database (str): database name
schema (str): schema name
Returns:
QueryResult: namedtuple with the query result
"""
QueryResult = namedtuple(
"QueryResult",
"query_id,database_name,schema_name,table_name,query_text,query_type,timestamp",
)
try:
parsed_query = sqlparse.parse(clean_up_query(row.query_text))[0]
identifier = next(
(
query_el
for query_el in parsed_query.tokens
if isinstance(query_el, Identifier)
),
None,
)
if not identifier:
return None
values = identifier.value.split(".")
database_name, schema_name, table_name = ([None] * (3 - len(values))) + values
if not all([database_name, schema_name, table_name]):
return None
# clean up table name
table_name = re.sub(r"\s.*", "", table_name).strip()
if (
database.lower() == database_name.lower()
and schema.lower() == schema_name.lower()
):
return QueryResult(
row.query_id,
database_name.lower(),
schema_name.lower(),
table_name.lower(),
parsed_query,
row.query_type,
row.start_time,
)
except Exception:
return None
return None

View File

@ -209,6 +209,25 @@ class CliDBBase(TestCase):
sink_status,
)
@pytest.mark.order(12)
def test_system_metrics(self) -> None:
if not any([self.delete_queries(), self.update_queries()]):
pytest.skip(
"System metrics test requires delete and update table rows queries"
)
self.build_config_file()
self.run_command()
self.delete_table_and_view()
self.create_table_and_view()
self.build_config_file(
E2EType.PROFILER, {"includes": self.get_includes_schemas()}
)
self.delete_table_rows()
self.update_table_row()
result = self.run_command("profile")
sink_status, source_status = self.retrieve_statuses(result)
self.assert_for_system_metrics(source_status, sink_status)
def retrieve_table(self, table_name_fqn: str) -> Table:
return self.openmetadata.get_by_name(entity=Table, fqn=table_name_fqn)
@ -318,6 +337,22 @@ class CliDBBase(TestCase):
def get_profiler_time_partition_results() -> Optional[dict]:
return None
@staticmethod
def delete_queries() -> Optional[List[str]]:
return None
@staticmethod
def update_queries() -> Optional[List[str]]:
return None
@staticmethod
def delete_table_rows() -> None:
return None
@staticmethod
def update_table_row() -> None:
return None
@staticmethod
def get_test_type() -> str:
return "database"

View File

@ -14,11 +14,13 @@ Test database connectors which extend from `CommonDbSourceService` with CLI
"""
import json
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from sqlalchemy.engine import Engine
from metadata.generated.schema.entity.data.table import SystemProfile
from metadata.ingestion.api.sink import SinkStatus
from metadata.ingestion.api.source import SourceStatus
from metadata.ingestion.api.workflow import Workflow
@ -124,6 +126,27 @@ class CliCommonDB:
== table_profile.get("rowCount")
)
def assert_for_system_metrics(
self, source_status: SourceStatus, sink_status: SinkStatus
):
self.assertTrue(len(source_status.failures) == 0)
self.assertTrue(len(sink_status.failures) == 0)
start_ts = int((datetime.now() - timedelta(days=1)).timestamp() * 1000)
end_ts = int((datetime.now() + timedelta(days=1)).timestamp() * 1000)
system_profile = self.openmetadata.get_profile_data(
self.fqn_deleted_table(),
start_ts,
end_ts,
profile_type=SystemProfile,
)
assert {profile.operation.value for profile in system_profile.entities} == {
"DELETE",
"INSERT",
"UPDATE",
}
def assert_for_delete_table_is_marked_as_deleted(
self, source_status: SourceStatus, sink_status: SinkStatus
):

View File

@ -28,3 +28,15 @@ class SQACommonMethods:
connection.execute(self.drop_view_query)
connection.execute(self.drop_table_query)
connection.close()
def run_update_queries(self) -> None:
with self.engine.connect() as connection:
for update_query in self.update_queries():
connection.execute(update_query)
connection.close()
def run_delete_queries(self) -> None:
with self.engine.connect() as connection:
for drop_query in self.delete_queries():
connection.execute(drop_query)
connection.close()

View File

@ -51,6 +51,12 @@ class BigqueryCliTest(CliCommonDB.TestSuite, SQACommonMethods):
def delete_table_and_view(self) -> None:
SQACommonMethods.delete_table_and_view(self)
def delete_table_rows(self) -> None:
SQACommonMethods.run_delete_queries(self)
def update_table_row(self) -> None:
SQACommonMethods.run_update_queries(self)
@staticmethod
def get_connector_name() -> str:
return "bigquery"
@ -104,3 +110,19 @@ class BigqueryCliTest(CliCommonDB.TestSuite, SQACommonMethods):
@staticmethod
def expected_filtered_mix() -> int:
return 1
@staticmethod
def delete_queries() -> List[str]:
return [
"""
DELETE FROM `open-metadata-beta.exclude_me`.orders WHERE id IN (1)
""",
]
@staticmethod
def update_queries() -> List[str]:
return [
"""
UPDATE `open-metadata-beta.exclude_me`.orders SET order_name = 'NINTENDO' WHERE id = 2
""",
]

View File

@ -68,6 +68,12 @@ class RedshiftCliTest(CliCommonDB.TestSuite, SQACommonMethods):
def delete_table_and_view(self) -> None:
SQACommonMethods.delete_table_and_view(self)
def delete_table_rows(self) -> None:
SQACommonMethods.run_delete_queries(self)
def update_table_row(self) -> None:
SQACommonMethods.run_update_queries(self)
@staticmethod
def get_connector_name() -> str:
return "redshift"
@ -188,3 +194,19 @@ class RedshiftCliTest(CliCommonDB.TestSuite, SQACommonMethods):
}
],
}
@staticmethod
def delete_queries() -> List[str]:
return [
"""
DELETE FROM e2e_cli_tests.dbt_jaffle.persons WHERE person_id IN (1,2)
""",
]
@staticmethod
def update_queries() -> List[str]:
return [
"""
UPDATE e2e_cli_tests.dbt_jaffle.persons SET full_name = 'Bruce Wayne' WHERE person_id = 3
""",
]

View File

@ -21,9 +21,10 @@ from metadata.ingestion.api.source import SourceStatus
from .base.e2e_types import E2EType
from .common.test_cli_db import CliCommonDB
from .common_e2e_sqa_mixins import SQACommonMethods
class SnowflakeCliTest(CliCommonDB.TestSuite):
class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods):
"""
Snowflake CLI Tests
"""
@ -102,6 +103,12 @@ class SnowflakeCliTest(CliCommonDB.TestSuite):
connection.execute(self.drop_table_query)
connection.close()
def delete_table_rows(self) -> None:
SQACommonMethods.run_delete_queries(self)
def update_table_row(self) -> None:
SQACommonMethods.run_update_queries(self)
@pytest.mark.order(2)
def test_create_table_with_profiler(self) -> None:
# delete table in case it exists
@ -123,6 +130,21 @@ class SnowflakeCliTest(CliCommonDB.TestSuite):
sink_status, source_status = self.retrieve_statuses(result)
self.assert_for_table_with_profiler(source_status, sink_status)
@pytest.mark.order(12)
def test_system_metrics(self) -> None:
self.delete_table_and_view()
self.create_table_and_view()
self.build_config_file()
self.run_command()
self.build_config_file(
E2EType.PROFILER, {"includes": self.get_includes_schemas()}
)
self.delete_table_rows()
self.update_table_row()
result = self.run_command("profile")
sink_status, source_status = self.retrieve_statuses(result)
self.assert_for_system_metrics(source_status, sink_status)
@staticmethod
def expected_tables() -> int:
return 7
@ -168,3 +190,19 @@ class SnowflakeCliTest(CliCommonDB.TestSuite):
@staticmethod
def expected_filtered_mix() -> int:
return 6
@staticmethod
def delete_queries() -> List[str]:
return [
"""
DELETE FROM E2E_DB.E2E_TEST.PERSONS WHERE full_name = 'Peter Parker'
""",
]
@staticmethod
def update_queries() -> List[str]:
return [
"""
UPDATE E2E_DB.E2E_TEST.PERSONS SET full_name = 'Bruce Wayne' WHERE full_name = 'Clark Kent'
""",
]

View File

@ -55,3 +55,17 @@ def base_table():
),
],
)
class Row:
def __init__(
self,
query_id,
query_type,
start_time,
query_text,
):
self.query_id = query_id
self.query_type = query_type
self.start_time = start_time
self.query_text = query_text

View File

@ -14,6 +14,7 @@ Tests utils function for the profiler
"""
import os
from datetime import datetime
from unittest import TestCase
from sqlalchemy import Column, create_engine
@ -21,9 +22,11 @@ from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.sql.sqltypes import Integer, String
from metadata.profiler.metrics.hybrid.histogram import Histogram
from metadata.utils.profiler_utils import ColumnLike
from metadata.utils.profiler_utils import ColumnLike, get_snowflake_system_queries
from metadata.utils.sqa_utils import handle_array, is_array
from .conftest import Row
Base = declarative_base()
@ -137,3 +140,32 @@ def test_column_like_object():
assert not kwargs
assert column_like._is_array is False
assert column_like._array_col is None
def test_get_snowflake_system_queries():
"""Test get snowflake system queries"""
row = Row(
query_id=1,
query_type="INSERT",
start_time=datetime.now(),
query_text="INSERT INTO DATABASE.SCHEMA.TABLE1 (col1, col2) VALUES (1, 'a'), (2, 'b')",
)
query_result = get_snowflake_system_queries(row, "DATABASE", "SCHEMA") # type: ignore
assert query_result
assert query_result.query_id == 1
assert query_result.query_type == "INSERT"
assert query_result.database_name == "database"
assert query_result.schema_name == "schema"
assert query_result.table_name == "table1"
row = Row(
query_id=1,
query_type="INSERT",
start_time=datetime.now(),
query_text="INSERT INTO SCHEMA.TABLE1 (col1, col2) VALUES (1, 'a'), (2, 'b')",
)
query_result = get_snowflake_system_queries(row, "DATABASE", "SCHEMA") # type: ignore
assert not query_result

View File

@ -23,6 +23,7 @@ from metadata.generated.schema.type.tagLabel import (
)
from metadata.utils.helpers import (
clean_up_starting_ending_double_quotes_in_string,
deep_size_of_dict,
get_entity_tier_from_tags,
list_to_dict,
)
@ -85,3 +86,13 @@ class TestHelpers(TestCase):
)
assert get_entity_tier_from_tags(table_entity_wo_tier.tags) is None
def test_deep_size_of_dict(self):
"""test deep size of dict"""
test_dict = {
"a": 1,
"b": {"c": 2, "d": {"e": "Hello World", "f": [4, 5, 6]}},
}
assert deep_size_of_dict(test_dict) >= 1000
assert deep_size_of_dict(test_dict) <= 1500

View File

@ -2750,6 +2750,21 @@ public interface CollectionDAO {
@Bind("json") String json,
@Bind("timestamp") Long timestamp);
@ConnectionAwareSqlUpdate(
value =
"UPDATE entity_extension_time_series set json = :json where entityFQN=:entityFQN and extension=:extension and timestamp=:timestamp and json -> '$.operation' = :operation",
connectionType = MYSQL)
@ConnectionAwareSqlUpdate(
value =
"UPDATE entity_extension_time_series set json = (:json :: jsonb) where entityFQN=:entityFQN and extension=:extension and timestamp=:timestamp and json #>>'{operation}' = :operation",
connectionType = POSTGRES)
void updateExtensionByOperation(
@Bind("entityFQN") String entityFQN,
@Bind("extension") String extension,
@Bind("json") String json,
@Bind("timestamp") Long timestamp,
@Bind("operation") String operation);
@SqlQuery("SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND extension = :extension")
String getExtension(@Bind("entityFQN") String entityId, @Bind("extension") String extension);
@ -2793,6 +2808,20 @@ public interface CollectionDAO {
String getExtensionAtTimestamp(
@Bind("entityFQN") String entityFQN, @Bind("extension") String extension, @Bind("timestamp") long timestamp);
@ConnectionAwareSqlQuery(
value =
"SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND extension = :extension AND timestamp = :timestamp AND json -> '$.operation' = :operation",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(
value =
"SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND extension = :extension AND timestamp = :timestamp AND json #>>'{operation}' = :operation",
connectionType = POSTGRES)
String getExtensionAtTimestampWithOperation(
@Bind("entityFQN") String entityFQN,
@Bind("extension") String extension,
@Bind("timestamp") long timestamp,
@Bind("operation") String operation);
@SqlQuery(
"SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND extension = :extension "
+ "ORDER BY timestamp DESC LIMIT 1")

View File

@ -383,17 +383,21 @@ public class TableRepository extends EntityRepository<Table> {
JsonUtils.readValue(
daoCollection
.entityExtensionTimeSeriesDao()
.getExtensionAtTimestamp(
table.getFullyQualifiedName(), SYSTEM_PROFILE_EXTENSION, systemProfile.getTimestamp()),
.getExtensionAtTimestampWithOperation(
table.getFullyQualifiedName(),
SYSTEM_PROFILE_EXTENSION,
systemProfile.getTimestamp(),
systemProfile.getOperation().value()),
SystemProfile.class);
if (storedSystemProfile != null) {
daoCollection
.entityExtensionTimeSeriesDao()
.update(
.updateExtensionByOperation(
table.getFullyQualifiedName(),
SYSTEM_PROFILE_EXTENSION,
JsonUtils.pojoToJson(systemProfile),
storedSystemProfile.getTimestamp());
storedSystemProfile.getTimestamp(),
storedSystemProfile.getOperation().value());
} else {
daoCollection
.entityExtensionTimeSeriesDao()