Fixes #11743 - Remove SQLParse dependency for System Metrics (#12072)

* fix: removed sqlparse dependency for system metrics

* fix: update sample query

* fix: move system test os retrieval to `.get()`

* fix: move os.environ to `get`
Teddy 2023-06-22 06:51:24 +02:00 committed by GitHub
parent 6e92dc6938
commit 1e86b6533c
16 changed files with 1150 additions and 365 deletions

View File

@@ -228,7 +228,7 @@ class SystemMetric(Metric, ABC):
"""Abstract class for system metrics"""
@abstractmethod
def sql(self):
def sql(self, session: Session, **kwargs):
"""SQL query to get system Metric"""

View File

@@ -0,0 +1,35 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
DML Operation class and mapper
"""
from enum import Enum
from metadata.generated.schema.entity.data.table import DmlOperationType
class DatabaseDMLOperations(Enum):
"""enum of supported DML operation on database engine side"""
INSERT = "INSERT"
UPDATE = "UPDATE"
DELETE = "DELETE"
MERGE = "MERGE"
DML_OPERATION_MAP = {
DatabaseDMLOperations.INSERT.value: DmlOperationType.INSERT.value,
DatabaseDMLOperations.MERGE.value: DmlOperationType.UPDATE.value,
DatabaseDMLOperations.UPDATE.value: DmlOperationType.UPDATE.value,
DatabaseDMLOperations.DELETE.value: DmlOperationType.DELETE.value,
}
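A quick illustrative sketch (not part of the diff) of how this mapper is consumed downstream: the engine-side statement type reported by the warehouse query history is translated into the OpenMetadata operation type, with `MERGE` surfaced as an `UPDATE`:

```
# Illustrative only: map an engine-side statement type to the reported operation.
from metadata.generated.schema.entity.data.table import DmlOperationType
from metadata.profiler.metrics.system.dml_operation import (
    DML_OPERATION_MAP,
    DatabaseDMLOperations,
)

statement_type = DatabaseDMLOperations.MERGE.value  # "MERGE"
reported = DML_OPERATION_MAP.get(statement_type)    # MERGE is surfaced as UPDATE
assert reported == DmlOperationType.UPDATE.value
```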

View File

@@ -0,0 +1,54 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Bigquery System Metric Queries
"""
from datetime import datetime
from pydantic import BaseModel
from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations
class BigQueryQueryResult(BaseModel):
table_name: dict
timestamp: datetime
query_type: str
dml_statistics: dict
DML_STAT_TO_DML_STATEMENT_MAPPING = {
"inserted_row_count": DatabaseDMLOperations.INSERT.value,
"deleted_row_count": DatabaseDMLOperations.DELETE.value,
"updated_row_count": DatabaseDMLOperations.UPDATE.value,
}
JOBS = """
SELECT
statement_type,
start_time,
destination_table,
dml_statistics
FROM
`region-{usage_location}`.INFORMATION_SCHEMA.JOBS
WHERE
DATE(creation_time) >= CURRENT_DATE() - 1 AND
destination_table.dataset_id = '{dataset_id}' AND
destination_table.project_id = '{project_id}' AND
statement_type IN (
'{insert}',
'{update}',
'{delete}',
'{merge}'
)
ORDER BY creation_time DESC;
"""

View File

@@ -0,0 +1,105 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Redshift System Metric Queries and queries operations
"""
from typing import List
from sqlalchemy import text
from sqlalchemy.orm import Session
from metadata.utils.profiler_utils import QueryResult
STL_QUERY = """
with data as (
select
{alias}.*
from
pg_catalog.stl_insert si
{join_type} join pg_catalog.stl_delete sd on si.query = sd.query
where
{condition}
)
SELECT
SUM(data."rows") AS "rows",
sti."database",
sti."schema",
sti."table",
sq.text,
DATE_TRUNC('second', data.starttime) AS starttime
FROM
data
INNER JOIN pg_catalog.svv_table_info sti ON data.tbl = sti.table_id
INNER JOIN pg_catalog.stl_querytext sq ON data.query = sq.query
where
sti."database" = '{database}' AND
sti."schema" = '{schema}' AND
"rows" != 0 AND
DATE(data.starttime) >= CURRENT_DATE - 1
GROUP BY 2,3,4,5,6
ORDER BY 6 DESC
"""
def get_query_results(
session: Session,
query,
operation,
) -> List[QueryResult]:
"""get query results either from cache or from the database
Args:
cache (Optional[List[QueryResult]]): cache results
session (Session): session
query (_type_): query
operation (_type_): operation
Returns:
List[QueryResult]:
"""
cursor = session.execute(text(query))
results = [
QueryResult(
database_name=row.database,
schema_name=row.schema,
table_name=row.table,
query_text=row.text,
query_type=operation,
timestamp=row.starttime,
rows=row.rows,
)
for row in cursor
]
return results
def get_metric_result(ddls: List[QueryResult], table_name: str) -> List:
"""Given query results, retur the metric result
Args:
ddls (List[QueryResult]): list of query results
table_name (str): table name
Returns:
List:
"""
return [
{
"timestamp": int(ddl.timestamp.timestamp() * 1000),
"operation": ddl.query_type,
"rowsAffected": ddl.rows,
}
for ddl in ddls
if ddl.table_name == table_name
]
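The `alias` / `join_type` / `condition` placeholders are what classify the DML: rows found only in `stl_insert` are inserts, rows found only in `stl_delete` are deletes, and rows present in both are updates. A minimal sketch using the names defined above, mirroring the Redshift handler in `system.py` (database and schema values are placeholders):

```
# Placeholders only -- mirrors how the Redshift handler renders STL_QUERY.
inserts_sql = STL_QUERY.format(
    alias="si", join_type="LEFT", condition="sd.query is null",
    database="dev", schema="dbt_jaffle",
)
deletes_sql = STL_QUERY.format(
    alias="sd", join_type="RIGHT", condition="si.query is null",
    database="dev", schema="dbt_jaffle",
)
updates_sql = STL_QUERY.format(
    alias="si", join_type="INNER", condition="sd.query is not null",
    database="dev", schema="dbt_jaffle",
)
# e.g. inserts = get_query_results(session, inserts_sql, "INSERT")
#      metric_results.extend(get_metric_result(inserts, table_name))
```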

View File

@@ -0,0 +1,99 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Snowflake System Metric Queries and query operations
"""
import re
from typing import Optional
from sqlalchemy.engine.row import Row
from metadata.utils.logger import profiler_logger
from metadata.utils.profiler_utils import QueryResult, get_identifiers_from_string
logger = profiler_logger()
INFORMATION_SCHEMA_QUERY = """
SELECT * FROM "SNOWFLAKE"."ACCOUNT_USAGE"."QUERY_HISTORY"
WHERE
start_time>= DATEADD('DAY', -1, CURRENT_TIMESTAMP)
AND QUERY_TEXT ILIKE '%{tablename}%'
AND QUERY_TYPE IN (
'{insert}',
'{update}',
'{delete}',
'{merge}'
)
AND EXECUTION_STATUS = 'SUCCESS';
"""
RESULT_SCAN = """
SELECT *
FROM TABLE(RESULT_SCAN('{query_id}'));
"""
def get_snowflake_system_queries(
row: Row, database: str, schema: str
) -> Optional[QueryResult]:
"""get snowflake system queries for a specific database and schema. Parsing the query
is the only reliable way to get the DDL operation as fields in the table are not. If parsing
fails we'll fall back to regex lookup
1. Parse the query and check if we have an Identifier
2.
Args:
row (dict): row from the snowflake system queries table
database (str): database name
schema (str): schema name
Returns:
QueryResult: namedtuple with the query result
"""
try:
logger.debug(f"Trying to parse query:\n{row.QUERY_TEXT}\n")
pattern = r"(?:(INSERT\s*INTO\s*|INSERT\s*OVERWRITE\s*INTO\s*|UPDATE\s*|MERGE\s*INTO\s*|DELETE\s*FROM\s*))([\w._\"]+)(?=[\s*\n])" # pylint: disable=line-too-long
match = re.match(pattern, row.QUERY_TEXT, re.IGNORECASE)
try:
identifier = match.group(2)
except (IndexError, AttributeError):
logger.debug("Could not find identifier in query. Skipping row.")
return None
database_name, schema_name, table_name = get_identifiers_from_string(identifier)
if not all([database_name, schema_name, table_name]):
logger.debug(
"Missing database, schema, or table. Can't link operation to table entity in OpenMetadata."
)
return None
if (
database.lower() == database_name.lower()
and schema.lower() == schema_name.lower()
):
return QueryResult(
query_id=row.QUERY_ID,
database_name=database_name.lower(),
schema_name=schema_name.lower(),
table_name=table_name.lower(),
query_text=row.QUERY_TEXT,
query_type=row.QUERY_TYPE,
timestamp=row.START_TIME,
)
except Exception:
return None
return None
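A small self-contained sketch of the regex step that replaces the old sqlparse parsing: the first capture group matches the DML keyword, the second the (possibly quoted, dot-separated) target identifier, which is then split by `get_identifiers_from_string`:

```
# Illustrative only: the same pattern as above, applied to a sample statement.
import re

PATTERN = r"(?:(INSERT\s*INTO\s*|INSERT\s*OVERWRITE\s*INTO\s*|UPDATE\s*|MERGE\s*INTO\s*|DELETE\s*FROM\s*))([\w._\"]+)(?=[\s*\n])"

sample = "INSERT INTO TEST_DB.TEST_SCHEMA.NEW_TAB VALUES (1, 'FOO')"
match = re.match(PATTERN, sample, re.IGNORECASE)
assert match and match.group(2) == "TEST_DB.TEST_SCHEMA.NEW_TAB"
# get_identifiers_from_string(match.group(2)) -> ("TEST_DB", "TEST_SCHEMA", "NEW_TAB")
```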

View File

@@ -8,31 +8,47 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=unused-argument
"""
System Metric
"""
import traceback
from collections import defaultdict, namedtuple
from enum import Enum
from collections import defaultdict
from textwrap import dedent
from typing import Dict, List, Optional
import sqlparse
from sqlalchemy import text
from sqlalchemy.orm import DeclarativeMeta, Session
from metadata.generated.schema.entity.data.table import DmlOperationType
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
BigQueryConnection,
)
from metadata.profiler.metrics.core import SystemMetric
from metadata.profiler.metrics.system.dml_operation import (
DML_OPERATION_MAP,
DatabaseDMLOperations,
)
from metadata.profiler.metrics.system.queries.bigquery import (
DML_STAT_TO_DML_STATEMENT_MAPPING,
JOBS,
BigQueryQueryResult,
)
from metadata.profiler.metrics.system.queries.redshift import (
STL_QUERY,
get_metric_result,
get_query_results,
)
from metadata.profiler.metrics.system.queries.snowflake import (
INFORMATION_SCHEMA_QUERY,
RESULT_SCAN,
get_snowflake_system_queries,
)
from metadata.profiler.orm.registry import Dialects
from metadata.utils.dispatch import valuedispatch
from metadata.utils.helpers import deep_size_of_dict
from metadata.utils.logger import profiler_logger
from metadata.utils.profiler_utils import clean_up_query, get_snowflake_system_queries
from metadata.utils.profiler_utils import get_value_from_cache, set_cache
logger = profiler_logger()
@@ -44,22 +60,6 @@ def recursive_dic():
return defaultdict(recursive_dic)
class DatabaseDMLOperations(Enum):
"""enum of supported DML operation on database engine side"""
INSERT = "INSERT"
UPDATE = "UPDATE"
DELETE = "DELETE"
MERGE = "MERGE"
DML_OPERATION_MAP = {
DatabaseDMLOperations.INSERT.value: DmlOperationType.INSERT.value,
DatabaseDMLOperations.MERGE.value: DmlOperationType.UPDATE.value,
DatabaseDMLOperations.UPDATE.value: DmlOperationType.UPDATE.value,
DatabaseDMLOperations.DELETE.value: DmlOperationType.DELETE.value,
}
SYSTEM_QUERY_RESULT_CACHE = recursive_dic()
@@ -108,100 +108,83 @@ def _(
List[Dict]:
"""
logger.debug(f"Fetching system metrics for {dialect}")
dml_stat_to_dml_statement_mapping = {
"inserted_row_count": DatabaseDMLOperations.INSERT.value,
"deleted_row_count": DatabaseDMLOperations.DELETE.value,
"updated_row_count": DatabaseDMLOperations.UPDATE.value,
}
project_id = session.get_bind().url.host
dataset_id = table.__table_args__["schema"]
jobs = dedent(
f"""
SELECT
statement_type,
start_time,
destination_table,
dml_statistics
FROM
`region-{conn_config.usageLocation}`.INFORMATION_SCHEMA.JOBS
WHERE
DATE(creation_time) >= CURRENT_DATE() - 1 AND
destination_table.dataset_id = '{dataset_id}' AND
destination_table.project_id = '{project_id}' AND
statement_type IN (
'{DatabaseDMLOperations.INSERT.value}',
'{DatabaseDMLOperations.DELETE.value}',
'{DatabaseDMLOperations.UPDATE.value}',
'{DatabaseDMLOperations.MERGE.value}'
)
ORDER BY creation_time DESC;
"""
)
project_id = session.get_bind().url.host
dataset_id = table.__table_args__["schema"] # type: ignore
metric_results: List[Dict] = []
QueryResult = namedtuple(
"QueryResult",
"query_type,timestamp,destination_table,dml_statistics",
# QueryResult = namedtuple(
# "QueryResult",
# "query_type,timestamp,destination_table,dml_statistics",
# )
jobs = get_value_from_cache(
SYSTEM_QUERY_RESULT_CACHE, f"{Dialects.BigQuery}.{project_id}.{dataset_id}.jobs"
)
if (
"query_results"
in SYSTEM_QUERY_RESULT_CACHE[Dialects.BigQuery][project_id][dataset_id]
):
# we'll try to get the cached data first
query_results = SYSTEM_QUERY_RESULT_CACHE[Dialects.BigQuery][project_id][
dataset_id
]["query_results"]
else:
cursor_jobs = session.execute(text(jobs))
query_results = [
QueryResult(
row.statement_type,
row.start_time,
row.destination_table,
row.dml_statistics,
if not jobs:
cursor_jobs = session.execute(
text(
JOBS.format(
usage_location=conn_config.usageLocation,
dataset_id=dataset_id,
project_id=project_id,
insert=DatabaseDMLOperations.INSERT.value,
update=DatabaseDMLOperations.UPDATE.value,
delete=DatabaseDMLOperations.DELETE.value,
merge=DatabaseDMLOperations.MERGE.value,
)
)
for row in cursor_jobs.fetchall()
)
jobs = [
BigQueryQueryResult(
query_type=row.statement_type,
timestamp=row.start_time,
table_name=row.destination_table,
dml_statistics=row.dml_statistics,
)
for row in cursor_jobs
]
SYSTEM_QUERY_RESULT_CACHE[Dialects.BigQuery][project_id][dataset_id][
"query_results"
] = query_results
set_cache(
SYSTEM_QUERY_RESULT_CACHE,
f"{Dialects.BigQuery}.{project_id}.{dataset_id}.jobs",
jobs,
)
for row_jobs in query_results:
if row_jobs.destination_table.get("table_id") == table.__tablename__:
for job in jobs:
if job.table_name.get("table_id") == table.__tablename__: # type: ignore
rows_affected = None
try:
if row_jobs.query_type == DatabaseDMLOperations.INSERT.value:
rows_affected = row_jobs.dml_statistics.get("inserted_row_count")
if row_jobs.query_type == DatabaseDMLOperations.DELETE.value:
rows_affected = row_jobs.dml_statistics.get("deleted_row_count")
if row_jobs.query_type == DatabaseDMLOperations.UPDATE.value:
rows_affected = row_jobs.dml_statistics.get("updated_row_count")
if job.query_type == DatabaseDMLOperations.INSERT.value:
rows_affected = job.dml_statistics.get("inserted_row_count")
if job.query_type == DatabaseDMLOperations.DELETE.value:
rows_affected = job.dml_statistics.get("deleted_row_count")
if job.query_type == DatabaseDMLOperations.UPDATE.value:
rows_affected = job.dml_statistics.get("updated_row_count")
except AttributeError:
logger.debug(traceback.format_exc())
rows_affected = None
if row_jobs.query_type == DatabaseDMLOperations.MERGE.value:
for indx, key in enumerate(row_jobs.dml_statistics):
if row_jobs.dml_statistics[key] != 0:
if job.query_type == DatabaseDMLOperations.MERGE.value:
for indx, key in enumerate(job.dml_statistics):
if job.dml_statistics[key] != 0:
metric_results.append(
{
# Merge statement can include multiple DML operations
# We are padding timestamps by 0,1,2 milliseconds to avoid
# duplicate timestamps
"timestamp": int(row_jobs.timestamp.timestamp() * 1000)
"timestamp": int(job.timestamp.timestamp() * 1000)
+ indx,
"operation": dml_stat_to_dml_statement_mapping.get(key),
"rowsAffected": row_jobs.dml_statistics[key],
"operation": DML_STAT_TO_DML_STATEMENT_MAPPING.get(key),
"rowsAffected": job.dml_statistics[key],
}
)
continue
metric_results.append(
{
"timestamp": int(row_jobs.timestamp.timestamp() * 1000),
"operation": row_jobs.query_type,
"timestamp": int(job.timestamp.timestamp() * 1000),
"operation": job.query_type,
"rowsAffected": rows_affected,
}
)
@@ -229,237 +212,147 @@ def _(
"""
logger.debug(f"Fetching system metrics for {dialect}")
database = session.get_bind().url.database
schema = table.__table_args__["schema"]
stl_deleted = dedent(
f"""
SELECT
SUM(si."rows") AS "rows",
sti."database",
sti."schema",
sti."table",
sq.text,
DATE_TRUNC('second', si.starttime) AS starttime
FROM
pg_catalog.stl_delete si
INNER JOIN pg_catalog.svv_table_info sti ON si.tbl = sti.table_id
INNER JOIN pg_catalog.stl_querytext sq ON si.query = sq.query
WHERE
sti."database" = '{database}' AND
sti."schema" = '{schema}' AND
"rows" != 0 AND
DATE(starttime) >= CURRENT_DATE - 1
GROUP BY 2,3,4,5,6
ORDER BY 6 desc
"""
)
stl_insert = dedent(
f"""
SELECT
SUM(si."rows") AS "rows",
sti."database",
sti."schema",
sti."table",
sq.text,
DATE_TRUNC('second', si.starttime) AS starttime
FROM
pg_catalog.stl_insert si
INNER JOIN pg_catalog.svv_table_info sti ON si.tbl = sti.table_id
INNER JOIN pg_catalog.stl_querytext sq ON si.query = sq.query
WHERE
sti."database" = '{database}' AND
sti."schema" = '{schema}' AND
"rows" != 0 AND
DATE(starttime) >= CURRENT_DATE - 1
GROUP BY 2,3,4,5,6
ORDER BY 6 desc
"""
)
schema = table.__table_args__["schema"] # type: ignore
metric_results: List[Dict] = []
QueryResult = namedtuple(
"QueryResult",
"database_name,schema_name,table_name,query_text,timestamp,rowsAffected",
# get inserts ddl queries
inserts = get_value_from_cache(
SYSTEM_QUERY_RESULT_CACHE, f"{Dialects.Redshift}.{database}.{schema}.inserts"
)
if not inserts:
insert_query = STL_QUERY.format(
alias="si",
join_type="LEFT",
condition="sd.query is null",
database=database,
schema=schema,
)
inserts = get_query_results(
session,
insert_query,
DatabaseDMLOperations.INSERT.value,
)
set_cache(
SYSTEM_QUERY_RESULT_CACHE,
f"{Dialects.Redshift}.{database}.{schema}.inserts",
inserts,
)
metric_results.extend(get_metric_result(inserts, table.__tablename__)) # type: ignore
if (
"query_results_inserted"
in SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][schema]
):
# we'll try to get the cached data first
query_results_inserted = SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][
schema
]["query_results_inserted"]
else:
cursor_insert = session.execute(text(stl_insert))
query_results_inserted = [
QueryResult(
row.database,
row.schema,
row.table,
sqlparse.parse(clean_up_query(row.text))[0],
row.starttime,
row.rows,
)
for row in cursor_insert.fetchall()
]
SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][schema][
"query_results_inserted"
] = query_results_inserted
# get deletes ddl queries
deletes = get_value_from_cache(
SYSTEM_QUERY_RESULT_CACHE, f"{Dialects.Redshift}.{database}.{schema}.deletes"
)
if not deletes:
delete_query = STL_QUERY.format(
alias="sd",
join_type="RIGHT",
condition="si.query is null",
database=database,
schema=schema,
)
deletes = get_query_results(
session,
delete_query,
DatabaseDMLOperations.DELETE.value,
)
set_cache(
SYSTEM_QUERY_RESULT_CACHE,
f"{Dialects.Redshift}.{database}.{schema}.deletes",
deletes,
)
metric_results.extend(get_metric_result(deletes, table.__tablename__)) # type: ignore
if (
"query_results_deleted"
in SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][schema]
):
# we'll try to get the cached data first
query_results_deleted = SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][
schema
]["query_results_deleted"]
else:
cursor_deleted = session.execute(text(stl_deleted))
query_results_deleted = [
QueryResult(
row.database,
row.schema,
row.table,
sqlparse.parse(clean_up_query(row.text))[0],
row.starttime,
row.rows,
)
for row in cursor_deleted.fetchall()
]
SYSTEM_QUERY_RESULT_CACHE[Dialects.Redshift][database][schema][
"query_results_deleted"
] = query_results_deleted
for row_inserted in query_results_inserted:
if row_inserted.table_name == table.__tablename__:
query_text = row_inserted.query_text
operation = next(
(
token.value.upper()
for token in query_text.tokens
if token.ttype is sqlparse.tokens.DML
and token.value.upper()
in DmlOperationType._member_names_ # pylint: disable=protected-access
),
None,
)
if operation:
metric_results.append(
{
"timestamp": int(row_inserted.timestamp.timestamp() * 1000),
"operation": operation,
"rowsAffected": row_inserted.rowsAffected,
}
)
for row_deleted in query_results_deleted:
if row_deleted.table_name == table.__tablename__:
query_text = row_deleted.query_text
operation = next(
(
token.value.upper()
for token in query_text.tokens
if token.ttype is sqlparse.tokens.DML and token.value != "UPDATE"
),
None,
)
if operation:
metric_results.append(
{
"timestamp": int(row_deleted.timestamp.timestamp() * 1000),
"operation": operation,
"rowsAffected": row_deleted.rowsAffected,
}
)
# get updates ddl queries
updates = get_value_from_cache(
SYSTEM_QUERY_RESULT_CACHE, f"{Dialects.Redshift}.{database}.{schema}.updates"
)
if not updates:
update_query = STL_QUERY.format(
alias="si",
join_type="INNER",
condition="sd.query is not null",
database=database,
schema=schema,
)
updates = get_query_results(
session,
update_query,
DatabaseDMLOperations.UPDATE.value,
)
set_cache(
SYSTEM_QUERY_RESULT_CACHE,
f"{Dialects.Redshift}.{database}.{schema}.updates",
updates,
)
metric_results.extend(get_metric_result(updates, table.__tablename__)) # type: ignore
return metric_results
# @get_system_metrics_for_dialect.register(Dialects.Snowflake)
# def _(
# dialect: str,
# session: Session,
# table: DeclarativeMeta,
# *args,
# **kwargs,
# ) -> Optional[List[Dict]]:
# """Fetch system metrics for Snowflake. query_history will return maximum 10K rows in one request.
# We'll be fetching all the queries ran for the past 24 hours and filtered on specific query types
# (INSERTS, MERGE, DELETE, UPDATE).
@get_system_metrics_for_dialect.register(Dialects.Snowflake)
def _(
dialect: str,
session: Session,
table: DeclarativeMeta,
*args,
**kwargs,
) -> Optional[List[Dict]]:
"""Fetch system metrics for Snowflake. query_history will return maximum 10K rows in one request.
We'll be fetching all the queries ran for the past 24 hours and filtered on specific query types
(INSERTS, MERGE, DELETE, UPDATE).
# To get the number of rows affected we'll use the specific query ID.
:warning: Unlike redshift and bigquery, results are not cached as we'll be looking
at DDL for each table
# Args:
# dialect (str): dialect
# session (Session): session object
To get the number of rows affected we'll use the specific query ID.
# Returns:
# Dict: system metric
# """
# logger.debug(f"Fetching system metrics for {dialect}")
# database = session.get_bind().url.database
# schema = table.__table_args__["schema"]
Args:
dialect (str): dialect
session (Session): session object
# metric_results: List[Dict] = []
Returns:
Dict: system metric
"""
logger.debug(f"Fetching system metrics for {dialect}")
database = session.get_bind().url.database
schema = table.__table_args__["schema"] # type: ignore
# information_schema_query_history = f"""
# SELECT * FROM "SNOWFLAKE"."ACCOUNT_USAGE"."QUERY_HISTORY"
# WHERE
# start_time>= DATEADD('DAY', -1, CURRENT_TIMESTAMP)
# AND QUERY_TYPE IN (
# '{DatabaseDMLOperations.INSERT.value}',
# '{DatabaseDMLOperations.UPDATE.value}',
# '{DatabaseDMLOperations.DELETE.value}',
# '{DatabaseDMLOperations.MERGE.value}'
# )
# AND EXECUTION_STATUS = 'SUCCESS';
# """
# result_scan = """
# SELECT *
# FROM TABLE(RESULT_SCAN('{query_id}'));
# """
metric_results: List[Dict] = []
# if (
# "query_results"
# in SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema]
# ):
# # we'll try to get the cached data first
# query_results = SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema][
# "query_results"
# ]
# else:
# rows = session.execute(text(information_schema_query_history))
# query_results = []
# for row in rows:
# result = get_snowflake_system_queries(row, database, schema)
# if result:
# query_results.append(result)
# SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake][database][schema][
# "query_results"
# ] = query_results
rows = session.execute(
text(
INFORMATION_SCHEMA_QUERY.format(
tablename=table.__tablename__, # type: ignore
insert=DatabaseDMLOperations.INSERT.value,
update=DatabaseDMLOperations.UPDATE.value,
delete=DatabaseDMLOperations.DELETE.value,
merge=DatabaseDMLOperations.MERGE.value,
)
)
)
query_results = []
for row in rows:
result = get_snowflake_system_queries(row, database, schema)
if result:
query_results.append(result)
# for query_result in query_results:
# if table.__tablename__.lower() == query_result.table_name:
# cursor_for_result_scan = session.execute(
# text(dedent(result_scan.format(query_id=query_result.query_id)))
# )
# row_for_result_scan = cursor_for_result_scan.first()
for query_result in query_results:
cursor_for_result_scan = session.execute(
text(dedent(RESULT_SCAN.format(query_id=query_result.query_id)))
)
row_for_result_scan = cursor_for_result_scan.first()
# metric_results.append(
# {
# "timestamp": int(query_result.timestamp.timestamp() * 1000),
# "operation": DML_OPERATION_MAP.get(query_result.query_type),
# "rowsAffected": row_for_result_scan[0]
# if row_for_result_scan
# else None,
# }
# )
metric_results.append(
{
"timestamp": int(query_result.timestamp.timestamp() * 1000),
"operation": DML_OPERATION_MAP.get(query_result.query_type),
"rowsAffected": row_for_result_scan[0] if row_for_result_scan else None,
}
)
# return metric_results
return metric_results
class System(SystemMetric):
@@ -513,7 +406,7 @@ class System(SystemMetric):
system_metrics = get_system_metrics_for_dialect(
session.get_bind().dialect.name,
session=session,
table=self.table,
table=self.table, # pylint: disable=no-member
conn_config=conn_config,
)
self._manage_cache()

View File

@@ -12,15 +12,34 @@
"""Profiler utils class and functions"""
import re
from collections import namedtuple
from typing import Optional
from collections import defaultdict
from datetime import datetime
from functools import reduce
from typing import Optional, Tuple
import sqlparse
from sqlalchemy.engine.row import Row
from sqlparse.sql import Identifier
from pydantic import BaseModel
from metadata.utils.logger import profiler_logger
from metadata.utils.sqa_utils import is_array
logger = profiler_logger()
PARSING_TIMEOUT = 10
class QueryResult(BaseModel):
"""System metric query result shared by Redshift and Snowflake"""
database_name: str
schema_name: str
table_name: str
query_type: str
timestamp: datetime
query_id: Optional[str] = None
query_text: Optional[str] = None
rows: Optional[int] = None
class ColumnLike:
"""We don't have column information at this stage (only metric entities)
@@ -57,60 +76,58 @@ def clean_up_query(query: str) -> str:
return sqlparse.format(query, strip_comments=True).replace("\\n", "")
def get_snowflake_system_queries(
row: Row, database: str, schema: str
) -> Optional["QueryResult"]:
"""get snowflake system queries for a specific database and schema. Parsing the query
is the only reliable way to get the DDL operation as fields in the table are not.
def get_identifiers_from_string(
identifier: str,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
"""given a string identifier try to fetch the database, schema and table names.
part of the identifier name as `"DATABASE.DOT"` will be returned on the left side of the tuple
and the rest of the identifier name as `"SCHEMA.DOT.TABLE"` will be returned on the right side of the tuple
Args:
row (dict): row from the snowflake system queries table
database (str): database name
schema (str): schema name
identifier (str): table identifier
Returns:
QueryResult: namedtuple with the query result
Tuple[str, str, str]: database, schema and table names
"""
pattern = r"\"([^\"]+)\"|(\w+(?:\.\w+)*(?:\.\w+)*)"
matches = re.findall(pattern, identifier)
QueryResult = namedtuple(
"QueryResult",
"query_id,database_name,schema_name,table_name,query_text,query_type,timestamp",
)
values = []
for match in matches:
if match[0] != "":
values.append(match[0])
if match[1] != "":
split_match = match[1].split(".")
values.extend(split_match)
database_name, schema_name, table_name = ([None] * (3 - len(values))) + values
return database_name, schema_name, table_name
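A couple of illustrative calls, consistent with the parametrized tests further down: quoted parts keep their inner dots, and missing leading parts are padded with `None`:

```
# Illustrative only -- behaviour of get_identifiers_from_string.
assert get_identifiers_from_string('DATABASE."SCHEMA.DOT".TABLE') == ("DATABASE", "SCHEMA.DOT", "TABLE")
assert get_identifiers_from_string("SCHEMA.TABLE") == (None, "SCHEMA", "TABLE")
```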
def get_value_from_cache(cache: dict, key: str):
"""given a dict of cache and a key, return the value if exists
Args:
cache (dict): dict of cache
key (str): key to look for in the cache
"""
try:
parsed_query = sqlparse.parse(clean_up_query(row.query_text))[0]
identifier = next(
(
query_el
for query_el in parsed_query.tokens
if isinstance(query_el, Identifier)
),
None,
)
if not identifier:
return None
values = identifier.value.split(".")
database_name, schema_name, table_name = ([None] * (3 - len(values))) + values
if not all([database_name, schema_name, table_name]):
return None
# clean up table name
table_name = re.sub(r"\s.*", "", table_name).strip()
if (
database.lower() == database_name.lower()
and schema.lower() == schema_name.lower()
):
return QueryResult(
row.query_id,
database_name.lower(),
schema_name.lower(),
table_name.lower(),
parsed_query,
row.query_type,
row.start_time,
)
except Exception:
return reduce(dict.get, key.split("."), cache)
except TypeError:
return None
return None
def set_cache(cache: defaultdict, key: str, value):
"""given a dict of cache, a key and a value, set the value in the cache
Args:
cache (dict): dict of cache
key (str): key to set for in the cache
value: value to set in the cache
"""
split_key = key.split(".")
for indx, key_ in enumerate(split_key):
if indx == len(split_key) - 1:
cache[key_] = value
break
cache = cache[key_]

View File

@@ -0,0 +1,179 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Validate bigquery system metrics (will be disabled by default). To be run manually
How to use this test
--------------------
1. Comment the @pytest.mark.skip decorator on line 31
2. Make sure you have set up the right environment variables for the bigquery database
check the config file at "cli_e2e/database/bigquery/bigquery.yaml". The metadata
ingestion will ingest data from the `dbt_jaffle` schema.
3. Prior to running this test you will need to execute DDLs in the `dbt_jaffle` schema.
We will need to perform at least one `DELETE`, `INSERT`, `UPDATE` on any table from the schema.
query example:
```
INSERT INTO dbt_jaffle.Person VALUES
('John', 'Doe', 'II'),
('Jane', 'Doe', 'II'),
('Jeff', 'Doe', 'II')
UPDATE dbt_jaffle.Person SET add = 'IV' WHERE first_name = 'John';
MERGE INTO dbt_jaffle.Person NT USING (SELECT 'Jeff' AS first_name, 'Doe' AS last_name, NULL AS add) N ON NT.first_name = N.first_name
WHEN MATCHED THEN UPDATE SET NT.first_name = N.first_name;
DELETE FROM dbt_jaffle.Person WHERE first_name = 'John';
```
4. Once you have performed the above steps, run the test with the following command:
`python -m pytest tests/integration/orm_profiler/system/test_bigquery_system_metrics.py` from the ingestion directory.
You can also perform the same action with your IDE.
:warning: the profiler workflow will be run for the table set in `PROFILER_TABLE_FILTER_PATTERN`
"""
import os
import pathlib
from copy import deepcopy
from unittest import TestCase
import pytest
import yaml
from metadata.generated.schema.entity.data.table import SystemProfile
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.ingestion.api.workflow import Workflow
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.profiler.api.workflow import ProfilerWorkflow
from metadata.utils.time_utils import (
get_beginning_of_day_timestamp_mill,
get_end_of_day_timestamp_mill,
)
TESTS_ROOT_DIR = pathlib.Path(__file__).parent.parent.parent.parent
BIGQUERY_CONFIG_FILE = "cli_e2e/database/bigquery/bigquery.yaml"
FULL_CONFIG_PATH = pathlib.Path(TESTS_ROOT_DIR, BIGQUERY_CONFIG_FILE)
DATABASE_FILTER = {
"includes": os.environ.get("E2E_BQ_PROJECT_ID"),
"excludes": None,
}
SCHEMA_FILTER = {
"includes": "dbt_jaffle",
"excludes": None,
}
TABLE_FILTER = {
"includes": "Person",
"excludes": None,
}
@pytest.mark.skip(
reason="Disabled by default. Should be ran manually on system metric updates"
)
class TestBigquerySystem(TestCase):
"""Test class for bigquery system metrics"""
taxonomy = os.environ.get("E2E_BQ_PROJECT_ID_TAXONOMY")
private_key_id = os.environ.get("E2E_BQ_PRIVATE_KEY_ID")
private_key = os.environ.get("E2E_BQ_PRIVATE_KEY")
project_id = DATABASE_FILTER["includes"]
client_email = os.environ.get("E2E_BQ_CLIENT_EMAIL")
client_id = os.environ.get("E2E_BQ_CLIENT_ID")
full_config_path = FULL_CONFIG_PATH
schema = SCHEMA_FILTER["includes"]
table = TABLE_FILTER["includes"]
@classmethod
def setUpClass(cls) -> None:
"""set up class"""
with open(cls.full_config_path, "r", encoding="utf-8") as file:
cls.config = yaml.safe_load(file)
# set up the config to filter from the `dbt_jaffle` schema
cls.config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = {
"includes": [cls.schema],
}
cls.config["source"]["sourceConfig"]["config"]["tableFilterPattern"] = {
"includes": [cls.table],
}
cls.config["source"]["serviceConnection"]["config"]["credentials"]["gcpConfig"][
"projectId"
] = cls.project_id
cls.config["source"]["serviceConnection"]["config"]["credentials"]["gcpConfig"][
"privateKeyId"
] = cls.private_key_id
cls.config["source"]["serviceConnection"]["config"]["credentials"]["gcpConfig"][
"privateKey"
] = cls.private_key
cls.config["source"]["serviceConnection"]["config"]["credentials"]["gcpConfig"][
"clientEmail"
] = cls.client_email
cls.config["source"]["serviceConnection"]["config"]["credentials"]["gcpConfig"][
"clientId"
] = cls.client_id
cls.config["source"]["serviceConnection"]["config"]["taxonomyProjectID"] = [
cls.taxonomy
]
# set metadata config
cls.metadata_config_dict = cls.config["workflowConfig"][
"openMetadataServerConfig"
]
cls.metadata_config = OpenMetadataConnection.parse_obj(cls.metadata_config_dict)
cls.metadata = OpenMetadata(cls.metadata_config)
# run the ingestion workflow
ingestion_workflow = Workflow.create(cls.config)
ingestion_workflow.execute()
ingestion_workflow.raise_from_status()
ingestion_workflow.print_status()
ingestion_workflow.stop()
# get table fqn
cls.table_fqn = f"{cls.config['source']['serviceName']}.{cls.project_id}.{cls.schema}.{cls.table}"
def test_bigquery_system_metrics(self):
"""run profiler workflow and check the system metrics"""
config = deepcopy(self.config)
# update the config to run the profiler workflow
config["source"]["sourceConfig"]["config"] = {
"type": "Profiler",
"generateSampleData": True,
"timeoutSeconds": 5400,
"tableFilterPattern": {
"includes": [self.table],
},
}
config["processor"] = {
"type": "orm-profiler",
"config": {},
}
profiler_workflow = ProfilerWorkflow.create(config)
profiler_workflow.execute()
profiler_workflow.raise_from_status()
profiler_workflow.print_status()
profiler_workflow.stop()
# get latest profile metrics
profile = self.metadata.get_profile_data(
self.table_fqn,
get_beginning_of_day_timestamp_mill(days=1),
get_end_of_day_timestamp_mill(),
profile_type=SystemProfile,
)
ddl_operations = [prl.operation.value for prl in profile.entities]
assert set(ddl_operations) == set(["INSERT", "UPDATE", "DELETE"])

View File

@@ -0,0 +1,146 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Validate redshift system metrics (will be disabled by default). To be run manually
How to use this test
--------------------
1. Comment the @pytest.mark.skip decorator on line 31
2. Make sure you have set up the right environment variables for the redshift database
check the config file at "cli_e2e/database/redshift/redshift.yaml". The metadata
ingestion will ingest data from the `dbt_jaffle` schema.
3. Prior to running this test you will need to execute DDLs in the `dbt_jaffle` schema.
We will need to perform at least one `DELETE`, `INSERT`, `UPDATE` on any table from the schema.
4. Once you have performed the above steps, run the test with the following command:
`python -m pytest tests/integration/orm_profiler/system/test_redshift_system_metrics.py` from the ingestion directory.
You can also perform the same action with your IDE.
:warning: the profiler workflow will be run for the table set in `PROFILER_TABLE_FILTER_PATTERN`
"""
import os
import pathlib
from copy import deepcopy
from unittest import TestCase
import pytest
import yaml
from metadata.generated.schema.entity.data.table import SystemProfile
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.ingestion.api.workflow import Workflow
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.profiler.api.workflow import ProfilerWorkflow
from metadata.utils.time_utils import (
get_beginning_of_day_timestamp_mill,
get_end_of_day_timestamp_mill,
)
TESTS_ROOT_DIR = pathlib.Path(__file__).parent.parent.parent.parent
REDSHIFT_CONFIG_FILE = "cli_e2e/database/redshift/redshift.yaml"
FULL_CONFIG_PATH = pathlib.Path(TESTS_ROOT_DIR, REDSHIFT_CONFIG_FILE)
DATABASE_FILTER = {
"includes": "dev",
"excludes": None,
}
SCHEMA_FILTER = {
"includes": "dbt_jaffle",
"excludes": None,
}
TABLE_FILTER = {
"includes": "boolean_test",
"excludes": None,
}
@pytest.mark.skip(
reason="Disabled by default. Should be ran manually on system metric updates"
)
class TestRedshiftSystem(TestCase):
"""Test class for redshift system metrics"""
hostPort = os.environ.get("E2E_REDSHIFT_HOST_PORT")
username = os.environ.get("E2E_REDSHIFT_USERNAME")
password = os.environ.get("E2E_REDSHIFT_PASSWORD")
database = DATABASE_FILTER["includes"]
full_config_path = FULL_CONFIG_PATH
schema = SCHEMA_FILTER["includes"]
table = TABLE_FILTER["includes"]
@classmethod
def setUpClass(cls) -> None:
"""set up class"""
with open(cls.full_config_path, "r", encoding="utf-8") as file:
cls.config = yaml.safe_load(file)
# set up the config to filter from the `dbt_jaffle` schema
cls.config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = {
"includes": [cls.schema],
}
cls.config["source"]["serviceConnection"]["config"]["hostPort"] = cls.hostPort
cls.config["source"]["serviceConnection"]["config"]["username"] = cls.username
cls.config["source"]["serviceConnection"]["config"]["password"] = cls.password
cls.config["source"]["serviceConnection"]["config"]["database"] = cls.database
# set metadata config
cls.metadata_config_dict = cls.config["workflowConfig"][
"openMetadataServerConfig"
]
cls.metadata_config = OpenMetadataConnection.parse_obj(cls.metadata_config_dict)
cls.metadata = OpenMetadata(cls.metadata_config)
# run the ingestion workflow
ingestion_workflow = Workflow.create(cls.config)
ingestion_workflow.execute()
ingestion_workflow.raise_from_status()
ingestion_workflow.print_status()
ingestion_workflow.stop()
# get table fqn
cls.table_fqn = f"{cls.config['source']['serviceName']}.{cls.database}.{cls.schema}.{cls.table}"
def test_redshift_system_metrics(self):
"""run profiler workflow and check the system metrics"""
config = deepcopy(self.config)
# update the config to run the profiler workflow
config["source"]["sourceConfig"]["config"] = {
"type": "Profiler",
"generateSampleData": True,
"timeoutSeconds": 5400,
"tableFilterPattern": {
"includes": [self.table],
},
}
config["processor"] = {
"type": "orm-profiler",
"config": {},
}
profiler_workflow = ProfilerWorkflow.create(config)
profiler_workflow.execute()
profiler_workflow.raise_from_status()
profiler_workflow.print_status()
profiler_workflow.stop()
# get latest profile metrics
profile = self.metadata.get_profile_data(
self.table_fqn,
get_beginning_of_day_timestamp_mill(days=1),
get_end_of_day_timestamp_mill(),
profile_type=SystemProfile,
)
ddl_operations = [prl.operation.value for prl in profile.entities]
assert set(ddl_operations) == set(["INSERT", "UPDATE", "DELETE"])

View File

@@ -0,0 +1,170 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Validate snowflake system metrics (will be disabled by default). To be run manually
How to use this test
--------------------
1. Comment the @pytest.mark.skip decorator on line 31
2. Make sure you have set up the right environment variables for the snowflake database
check the config file at "cli_e2e/database/snowflake/snowflake.yaml". The metadata
ingestion will ingest data from the `TEST_DB` schema.
3. Prior to running this test you will need to execute DDLs in the `TEST_DB` schema.
We will need to perform at least one `DELETE`, `INSERT`, `UPDATE` on any table from the schema.
query example:
```
INSERT INTO TEST_DB.TEST_SCHEMA.NEW_TAB VALUES
(1, 'FOO'),
(2, 'BAR'),
(3, 'BAZZ')
INSERT OVERWRITE INTO TEST_DB.TEST_SCHEMA.NEW_TAB VALUES
(4, 'FOOBAR'),
(5, 'FOOBAZZ'),
(6, 'BARBAZZ')
UPDATE TEST_DB.TEST_SCHEMA.NEW_TAB SET NAME='BABAR' WHERE id = 6;
MERGE INTO TEST_DB.TEST_SCHEMA.NEW_TAB NT USING (SELECT 5 AS id, 'BAR' AS NAME) N ON NT.id = N.id
WHEN MATCHED THEN UPDATE SET NT.NAME = N.NAME;
DELETE FROM TEST_DB.TEST_SCHEMA.NEW_TAB WHERE ID = 4;
```
4. Once you have performed the above steps, run the test with the following command:
`python -m pytest tests/integration/orm_profiler/system/test_snowflake_system_metrics.py` from the ingestion directory.
You can also perform the same action with your IDE.
:warning: the profiler workflow will be run for the table set in `PROFILER_TABLE_FILTER_PATTERN`
"""
import os
import pathlib
from copy import deepcopy
from unittest import TestCase
import pytest
import yaml
from metadata.generated.schema.entity.data.table import SystemProfile
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.ingestion.api.workflow import Workflow
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.profiler.api.workflow import ProfilerWorkflow
from metadata.utils.time_utils import (
get_beginning_of_day_timestamp_mill,
get_end_of_day_timestamp_mill,
)
TESTS_ROOT_DIR = pathlib.Path(__file__).parent.parent.parent.parent
SNOWFLAKE_CONFIG_FILE = "cli_e2e/database/snowflake/snowflake.yaml"
FULL_CONFIG_PATH = pathlib.Path(TESTS_ROOT_DIR, SNOWFLAKE_CONFIG_FILE)
DATABASE_FILTER = {
"includes": "TEST_DB",
"excludes": None,
}
SCHEMA_FILTER = {
"includes": "TEST_SCHEMA",
"excludes": None,
}
TABLE_FILTER = {
"includes": "NEW_TAB",
"excludes": None,
}
@pytest.mark.skip(
reason="Disabled by default. Should be ran manually on system metric updates"
)
class TestSnowflakeSystem(TestCase):
"""Test class for snowflake system metrics"""
account = os.environ.get("E2E_SNOWFLAKE_ACCOUNT")
warehouse = os.environ.get("E2E_SNOWFLAKE_WAREHOUSE")
username = os.environ.get("E2E_SNOWFLAKE_USERNAME")
password = os.environ.get("E2E_SNOWFLAKE_PASSWORD")
database = DATABASE_FILTER["includes"]
full_config_path = FULL_CONFIG_PATH
schema = SCHEMA_FILTER["includes"]
table = TABLE_FILTER["includes"]
@classmethod
def setUpClass(cls) -> None:
"""set up class"""
with open(cls.full_config_path, "r", encoding="utf-8") as file:
cls.config = yaml.safe_load(file)
# set up the config to filter from the `dbt_jaffle` schema
cls.config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = {
"includes": [cls.schema],
}
cls.config["source"]["sourceConfig"]["config"]["tableFilterPattern"] = {
"includes": [cls.table],
}
cls.config["source"]["serviceConnection"]["config"]["account"] = cls.account
cls.config["source"]["serviceConnection"]["config"]["warehouse"] = cls.warehouse
cls.config["source"]["serviceConnection"]["config"]["username"] = cls.username
cls.config["source"]["serviceConnection"]["config"]["password"] = cls.password
cls.config["source"]["serviceConnection"]["config"]["database"] = cls.database
# set metadata config
cls.metadata_config_dict = cls.config["workflowConfig"][
"openMetadataServerConfig"
]
cls.metadata_config = OpenMetadataConnection.parse_obj(cls.metadata_config_dict)
cls.metadata = OpenMetadata(cls.metadata_config)
# run the ingestion workflow
ingestion_workflow = Workflow.create(cls.config)
ingestion_workflow.execute()
ingestion_workflow.raise_from_status()
ingestion_workflow.print_status()
ingestion_workflow.stop()
# get table fqn
cls.table_fqn = f"{cls.config['source']['serviceName']}.{cls.database}.{cls.schema}.{cls.table}"
def test_snowflake_system_metrics(self):
"""run profiler workflow and check the system metrics"""
config = deepcopy(self.config)
# update the config to run the profiler workflow
config["source"]["sourceConfig"]["config"] = {
"type": "Profiler",
"generateSampleData": True,
"timeoutSeconds": 5400,
"tableFilterPattern": {
"includes": [self.table],
},
}
config["processor"] = {
"type": "orm-profiler",
"config": {},
}
profiler_workflow = ProfilerWorkflow.create(config)
profiler_workflow.execute()
profiler_workflow.raise_from_status()
profiler_workflow.print_status()
profiler_workflow.stop()
# get latest profile metrics
profile = self.metadata.get_profile_data(
self.table_fqn,
get_beginning_of_day_timestamp_mill(days=1),
get_end_of_day_timestamp_mill(),
profile_type=SystemProfile,
)
ddl_operations = [prl.operation.value for prl in profile.entities]
assert set(ddl_operations) == set(["INSERT", "UPDATE", "DELETE"])

View File

@@ -21,7 +21,6 @@ from copy import deepcopy
from datetime import datetime, timedelta
from unittest import TestCase
import pytest
from sqlalchemy import Column, DateTime, Integer, String, create_engine
from sqlalchemy.orm import declarative_base

View File

@@ -65,7 +65,7 @@ class Row:
start_time,
query_text,
):
self.query_id = query_id
self.query_type = query_type
self.start_time = start_time
self.query_text = query_text
self.QUERY_ID = query_id
self.QUERY_TYPE = query_type
self.START_TIME = start_time
self.QUERY_TEXT = query_text

View File

@@ -17,12 +17,22 @@ import os
from datetime import datetime
from unittest import TestCase
import pytest
from sqlalchemy import Column, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.sql.sqltypes import Integer, String
from metadata.profiler.metrics.hybrid.histogram import Histogram
from metadata.utils.profiler_utils import ColumnLike, get_snowflake_system_queries
from metadata.profiler.metrics.system.queries.snowflake import (
get_snowflake_system_queries,
)
from metadata.profiler.metrics.system.system import recursive_dic
from metadata.utils.profiler_utils import (
ColumnLike,
get_identifiers_from_string,
get_value_from_cache,
set_cache,
)
from metadata.utils.sqa_utils import handle_array, is_array
from .conftest import Row
@@ -145,7 +155,7 @@ def test_column_like_object():
def test_get_snowflake_system_queries():
"""Test get snowflake system queries"""
row = Row(
query_id=1,
query_id="1",
query_type="INSERT",
start_time=datetime.now(),
query_text="INSERT INTO DATABASE.SCHEMA.TABLE1 (col1, col2) VALUES (1, 'a'), (2, 'b')",
@@ -153,7 +163,7 @@ def test_get_snowflake_system_queries():
query_result = get_snowflake_system_queries(row, "DATABASE", "SCHEMA") # type: ignore
assert query_result
assert query_result.query_id == 1
assert query_result.query_id == "1"
assert query_result.query_type == "INSERT"
assert query_result.database_name == "database"
assert query_result.schema_name == "schema"
@@ -169,3 +179,81 @@
query_result = get_snowflake_system_queries(row, "DATABASE", "SCHEMA") # type: ignore
assert not query_result
@pytest.mark.parametrize(
"query, expected",
[
(
"INSERT INTO DATABASE.SCHEMA.TABLE1 (col1, col2) VALUES (1, 'a'), (2, 'b')",
"INSERT",
),
(
"INSERT OVERWRITE INTO DATABASE.SCHEMA.TABLE1 (col1, col2) VALUES (1, 'a'), (2, 'b')",
"INSERT",
),
(
"MERGE INTO DATABASE.SCHEMA.TABLE1 (col1, col2) VALUES (1, 'a'), (2, 'b')",
"MERGE",
),
("DELETE FROM DATABASE.SCHEMA.TABLE1 WHERE val = 9999", "MERGE"),
("UPDATE DATABASE.SCHEMA.TABLE1 SET col1 = 1 WHERE val = 9999", "UPDATE"),
],
)
def test_get_snowflake_system_queries_all_dll(query, expected):
"""test we ca get all ddl queries
reference https://docs.snowflake.com/en/sql-reference/sql-dml
"""
row = Row(
query_id=1,
query_type=expected,
start_time=datetime.now(),
query_text=query,
)
query_result = get_snowflake_system_queries(row, "DATABASE", "SCHEMA") # type: ignore
assert query_result
assert query_result.query_type == expected
assert query_result.database_name == "database"
assert query_result.schema_name == "schema"
assert query_result.table_name == "table1"
@pytest.mark.parametrize(
"identifier, expected",
[
("DATABASE.SCHEMA.TABLE1", ("DATABASE", "SCHEMA", "TABLE1")),
('DATABASE.SCHEMA."TABLE.DOT"', ("DATABASE", "SCHEMA", "TABLE.DOT")),
('DATABASE."SCHEMA.DOT".TABLE', ("DATABASE", "SCHEMA.DOT", "TABLE")),
('"DATABASE.DOT".SCHEMA.TABLE', ("DATABASE.DOT", "SCHEMA", "TABLE")),
('DATABASE."SCHEMA.DOT"."TABLE.DOT"', ("DATABASE", "SCHEMA.DOT", "TABLE.DOT")),
('"DATABASE.DOT"."SCHEMA.DOT".TABLE', ("DATABASE.DOT", "SCHEMA.DOT", "TABLE")),
(
'"DATABASE.DOT"."SCHEMA.DOT"."TABLE.DOT"',
("DATABASE.DOT", "SCHEMA.DOT", "TABLE.DOT"),
),
],
)
def test_get_identifiers_from_string(identifier, expected):
"""test get identifiers from string"""
assert get_identifiers_from_string(identifier) == expected
def test_cache_func():
"""test get and set cache"""
cache_dict = recursive_dic()
cache_value = [1, 2, 3, 4, 5]
new_cache_value = [6, 7, 8, 9, 10]
cache = get_value_from_cache(cache_dict, "key1.key2.key3")
assert not cache
set_cache(cache_dict, "key1.key2.key3", cache_value)
cache = get_value_from_cache(cache_dict, "key1.key2.key3")
assert cache == cache_value
# calling set_cache on the same key will reset the cache
set_cache(cache_dict, "key1.key2.key3", new_cache_value)
cache = get_value_from_cache(cache_dict, "key1.key2.key3")
assert cache == new_cache_value