diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/__init__.py b/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/profiler.py b/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/profiler.py deleted file mode 100644 index 750ae61fb20..00000000000 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/profiler.py +++ /dev/null @@ -1,31 +0,0 @@ -"""BigqQuery Profiler""" - -from typing import List, Type - -from metadata.generated.schema.entity.data.table import SystemProfile -from metadata.ingestion.source.database.bigquery.profiler.system import ( - BigQuerySystemMetricsComputer, -) -from metadata.profiler.interface.sqlalchemy.bigquery.profiler_interface import ( - BigQueryProfilerInterface, -) -from metadata.profiler.metrics.system.system import System -from metadata.profiler.processor.runner import QueryRunner - - -class BigQueryProfiler(BigQueryProfilerInterface): - def _compute_system_metrics( - self, - metrics: Type[System], - runner: QueryRunner, - *args, - **kwargs, - ) -> List[SystemProfile]: - return self.system_metrics_computer.get_system_metrics( - table=runner.dataset, - usage_location=self.service_connection_config.usageLocation, - runner=runner, - ) - - def initialize_system_metrics_computer(self) -> BigQuerySystemMetricsComputer: - return BigQuerySystemMetricsComputer(session=self.session) diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/service_spec.py b/ingestion/src/metadata/ingestion/source/database/bigquery/service_spec.py index 8140e30143e..95365bef124 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/service_spec.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/service_spec.py @@ -1,9 +1,9 @@ from metadata.ingestion.source.database.bigquery.lineage import BigqueryLineageSource from metadata.ingestion.source.database.bigquery.metadata import BigquerySource -from metadata.ingestion.source.database.bigquery.profiler.profiler import ( - BigQueryProfiler, -) from metadata.ingestion.source.database.bigquery.usage import BigqueryUsageSource +from metadata.profiler.interface.sqlalchemy.bigquery.profiler_interface import ( + BigQueryProfilerInterface, +) from metadata.sampler.sqlalchemy.bigquery.sampler import BigQuerySampler from metadata.utils.service_spec.default import DefaultDatabaseSpec @@ -11,6 +11,6 @@ ServiceSpec = DefaultDatabaseSpec( metadata_source_class=BigquerySource, lineage_source_class=BigqueryLineageSource, usage_source_class=BigqueryUsageSource, - profiler_class=BigQueryProfiler, + profiler_class=BigQueryProfilerInterface, sampler_class=BigQuerySampler, ) diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/profiler/__init__.py b/ingestion/src/metadata/ingestion/source/database/redshift/profiler/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/profiler/profiler.py b/ingestion/src/metadata/ingestion/source/database/redshift/profiler/profiler.py deleted file mode 100644 index 6eee6e184fd..00000000000 --- a/ingestion/src/metadata/ingestion/source/database/redshift/profiler/profiler.py +++ /dev/null @@ -1,14 +0,0 @@ -"""Redshift profiler""" - -from metadata.ingestion.source.database.redshift.profiler.system import ( - RedshiftSystemMetricsComputer, -) -from 
metadata.profiler.interface.sqlalchemy.profiler_interface import ( - SQAProfilerInterface, -) -from metadata.profiler.metrics.system.system import SystemMetricsComputer - - -class RedshiftProfiler(SQAProfilerInterface): - def initialize_system_metrics_computer(self) -> SystemMetricsComputer: - return RedshiftSystemMetricsComputer(session=self.session) diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/queries.py b/ingestion/src/metadata/ingestion/source/database/redshift/queries.py index 1fe71c42dbc..f6ae06730ef 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/queries.py @@ -15,9 +15,6 @@ SQL Queries used during ingestion import textwrap from typing import List -from sqlalchemy import text -from sqlalchemy.orm.session import Session - from metadata.utils.profiler_utils import QueryResult from metadata.utils.time_utils import datetime_to_timestamp @@ -418,38 +415,6 @@ STL_QUERY = """ """ -def get_query_results( - session: Session, - query, - operation, -) -> List[QueryResult]: - """get query results either from cache or from the database - - Args: - session (Session): session - query (_type_): query - operation (_type_): operation - - Returns: - List[QueryResult]: - """ - cursor = session.execute(text(query)) - results = [ - QueryResult( - database_name=row.database, - schema_name=row.schema, - table_name=row.table, - query_text=None, - query_type=operation, - start_time=row.starttime, - rows=row.rows, - ) - for row in cursor - ] - - return results - - def get_metric_result(ddls: List[QueryResult], table_name: str) -> List: """Given query results, retur the metric result diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/service_spec.py b/ingestion/src/metadata/ingestion/source/database/redshift/service_spec.py index 6f010e9287e..1c0152bb1fd 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/service_spec.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/service_spec.py @@ -1,14 +1,14 @@ from metadata.ingestion.source.database.redshift.lineage import RedshiftLineageSource from metadata.ingestion.source.database.redshift.metadata import RedshiftSource -from metadata.ingestion.source.database.redshift.profiler.profiler import ( - RedshiftProfiler, -) from metadata.ingestion.source.database.redshift.usage import RedshiftUsageSource +from metadata.profiler.interface.sqlalchemy.redshift.profiler_interface import ( + RedshiftProfilerInterface, +) from metadata.utils.service_spec.default import DefaultDatabaseSpec ServiceSpec = DefaultDatabaseSpec( metadata_source_class=RedshiftSource, lineage_source_class=RedshiftLineageSource, usage_source_class=RedshiftUsageSource, - profiler_class=RedshiftProfiler, + profiler_class=RedshiftProfilerInterface, ) diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/profiler/__init__.py b/ingestion/src/metadata/ingestion/source/database/snowflake/profiler/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/profiler/profiler.py b/ingestion/src/metadata/ingestion/source/database/snowflake/profiler/profiler.py deleted file mode 100644 index 0ca3eaf8791..00000000000 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/profiler/profiler.py +++ /dev/null @@ -1,26 +0,0 @@ -# Copyright 2025 Collate -# Licensed under the Collate Community License, Version 1.0 (the "License"); -# you may 
not use this file except in compliance with the License. -# You may obtain a copy of the License at -# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Profiler for Snowflake -""" -from metadata.ingestion.source.database.snowflake.profiler.system import ( - SnowflakeSystemMetricsComputer, -) -from metadata.profiler.interface.sqlalchemy.snowflake.profiler_interface import ( - SnowflakeProfilerInterface, -) -from metadata.profiler.metrics.system.system import SystemMetricsComputer - - -class SnowflakeProfiler(SnowflakeProfilerInterface): - def initialize_system_metrics_computer(self) -> SystemMetricsComputer: - return SnowflakeSystemMetricsComputer(session=self.session) diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/service_spec.py b/ingestion/src/metadata/ingestion/source/database/snowflake/service_spec.py index 207cc6b6104..c08ba11212a 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/service_spec.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/service_spec.py @@ -6,10 +6,10 @@ from metadata.ingestion.source.database.snowflake.data_diff.data_diff import ( ) from metadata.ingestion.source.database.snowflake.lineage import SnowflakeLineageSource from metadata.ingestion.source.database.snowflake.metadata import SnowflakeSource -from metadata.ingestion.source.database.snowflake.profiler.profiler import ( - SnowflakeProfiler, -) from metadata.ingestion.source.database.snowflake.usage import SnowflakeUsageSource +from metadata.profiler.interface.sqlalchemy.snowflake.profiler_interface import ( + SnowflakeProfilerInterface, +) from metadata.sampler.sqlalchemy.snowflake.sampler import SnowflakeSampler from metadata.utils.service_spec.default import DefaultDatabaseSpec @@ -17,7 +17,7 @@ ServiceSpec = DefaultDatabaseSpec( metadata_source_class=SnowflakeSource, lineage_source_class=SnowflakeLineageSource, usage_source_class=SnowflakeUsageSource, - profiler_class=SnowflakeProfiler, + profiler_class=SnowflakeProfilerInterface, test_suite_class=SnowflakeTestSuiteInterface, sampler_class=SnowflakeSampler, data_diff=SnowflakeTableParameter, diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/bigquery/profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/bigquery/profiler_interface.py index 9bb3dcbffda..4a87c238a46 100644 --- a/ingestion/src/metadata/profiler/interface/sqlalchemy/bigquery/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/bigquery/profiler_interface.py @@ -13,16 +13,45 @@ Interfaces with database for all database engine supporting sqlalchemy abstraction layer """ +from typing import List, Type, cast + from sqlalchemy import Column, inspect +from metadata.generated.schema.entity.data.table import SystemProfile from metadata.profiler.interface.sqlalchemy.profiler_interface import ( SQAProfilerInterface, ) +from metadata.profiler.metrics.system.bigquery.system import ( + BigQuerySystemMetricsComputer, +) +from metadata.profiler.metrics.system.system import System +from metadata.profiler.processor.runner import QueryRunner +from metadata.utils.logger import profiler_interface_registry_logger + +logger = 
profiler_interface_registry_logger() class BigQueryProfilerInterface(SQAProfilerInterface): """BigQuery profiler interface""" + def _compute_system_metrics( + self, + metrics: Type[System], + runner: QueryRunner, + *args, + **kwargs, + ) -> List[SystemProfile]: + logger.debug(f"Computing {metrics.name()} metric for {runner.table_name}") + self.system_metrics_class = cast( + Type[BigQuerySystemMetricsComputer], self.system_metrics_class + ) + instance = self.system_metrics_class( + session=self.session, + runner=runner, + usage_location=self.service_connection_config.usageLocation, + ) + return instance.get_system_metrics() + def _get_struct_columns(self, columns: dict, parent: str): """""" # pylint: disable=import-outside-toplevel diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/databricks/profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/databricks/profiler_interface.py index 5974a3d92b3..403a1e3265a 100644 --- a/ingestion/src/metadata/profiler/interface/sqlalchemy/databricks/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/databricks/profiler_interface.py @@ -13,25 +13,55 @@ Interfaces with database for all database engine supporting sqlalchemy abstraction layer """ -from typing import List +from typing import List, Type, cast from pyhive.sqlalchemy_hive import HiveCompiler from sqlalchemy import Column from metadata.generated.schema.entity.data.table import Column as OMColumn -from metadata.generated.schema.entity.data.table import ColumnName, DataType +from metadata.generated.schema.entity.data.table import ( + ColumnName, + DataType, + SystemProfile, +) from metadata.generated.schema.entity.services.databaseService import ( DatabaseServiceType, ) from metadata.profiler.interface.sqlalchemy.profiler_interface import ( SQAProfilerInterface, ) +from metadata.profiler.metrics.system.databricks.system import ( + DatabricksSystemMetricsComputer, +) +from metadata.profiler.metrics.system.system import System from metadata.profiler.orm.converter.base import build_orm_col +from metadata.profiler.processor.runner import QueryRunner +from metadata.utils.logger import profiler_interface_registry_logger + +logger = profiler_interface_registry_logger() class DatabricksProfilerInterface(SQAProfilerInterface): """Databricks profiler interface""" + def _compute_system_metrics( + self, + metrics: Type[System], + runner: QueryRunner, + *args, + **kwargs, + ) -> List[SystemProfile]: + logger.debug(f"Computing {metrics.name()} metric for {runner.table_name}") + self.system_metrics_class = cast( + Type[DatabricksSystemMetricsComputer], self.system_metrics_class + ) + instance = self.system_metrics_class( + session=self.session, + runner=runner, + catalog=self.service_connection_config.catalog, + ) + return instance.get_system_metrics() + def visit_column(self, *args, **kwargs): result = super( # pylint: disable=bad-super-call HiveCompiler, self diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py index 774a6eda3a2..fdbc4825738 100644 --- a/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py @@ -26,7 +26,7 @@ from typing import Any, Dict, List, Optional, Type, Union from sqlalchemy import Column, inspect, text from sqlalchemy.exc import DBAPIError, ProgrammingError, ResourceClosedError -from sqlalchemy.orm import 
scoped_session +from sqlalchemy.orm import Session, scoped_session from metadata.generated.schema.entity.data.table import ( CustomMetricProfile, @@ -51,7 +51,7 @@ from metadata.profiler.metrics.registry import Metrics from metadata.profiler.metrics.static.mean import Mean from metadata.profiler.metrics.static.stddev import StdDev from metadata.profiler.metrics.static.sum import Sum -from metadata.profiler.metrics.system.system import System, SystemMetricsComputer +from metadata.profiler.metrics.system.system import System, SystemMetricsRegistry from metadata.profiler.orm.functions.table_metric_computer import TableMetricComputer from metadata.profiler.orm.registry import Dialects from metadata.profiler.processor.metric_filter import MetricFilter @@ -111,13 +111,9 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): self._table = self.sampler.raw_dataset self.create_session() - self.system_metrics_computer = self.initialize_system_metrics_computer() - - def initialize_system_metrics_computer(self) -> SystemMetricsComputer: - """Initialize system metrics computer. Override this if you want to use a metric source with - state or other dependencies. - """ - return SystemMetricsComputer() + self.system_metrics_class = SystemMetricsRegistry.get( + self.session.get_bind().dialect + ) def create_session(self): self.session_factory = self._session_factory() @@ -362,7 +358,8 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): *args, **kwargs, ) -> List[SystemProfile]: - """Get system metric for tables + """Get system metrics for tables. Override this in a dialect-specific + interface to plug in a system metrics source for other engines. Args: metric_type: type of metric metrics: list of metrics runner: query runner Returns: dictionnary of results """ - logger.debug(f"Computing system metrics for {runner.table_name}") - return self.system_metrics_computer.get_system_metrics(runner=runner) + logger.debug( + f"No system metrics implementation for dialect {self.session.get_bind().dialect.name}; skipping {metrics.name()} metric" + ) + return [] def _create_thread_safe_runner(self, session, column=None): """Create thread safe runner""" diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/redshift/profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/redshift/profiler_interface.py new file mode 100644 index 00000000000..0d603e94b38 --- /dev/null +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/redshift/profiler_interface.py @@ -0,0 +1,50 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ +""" +Interfaces with database for all database engine +supporting sqlalchemy abstraction layer +""" +from typing import List, Type, cast + +from metadata.generated.schema.entity.data.table import SystemProfile +from metadata.profiler.interface.sqlalchemy.profiler_interface import ( + SQAProfilerInterface, +) +from metadata.profiler.metrics.system.redshift.system import ( + RedshiftSystemMetricsComputer, +) +from metadata.profiler.metrics.system.system import System +from metadata.profiler.processor.runner import QueryRunner +from metadata.utils.logger import profiler_interface_registry_logger + +logger = profiler_interface_registry_logger() + + +class RedshiftProfilerInterface(SQAProfilerInterface): + """Redshift profiler interface""" + + def _compute_system_metrics( + self, + metrics: Type[System], + runner: QueryRunner, + *args, + **kwargs, + ) -> List[SystemProfile]: + logger.debug(f"Computing {metrics.name()} metric for {runner.table_name}") + self.system_metrics_class = cast( + Type[RedshiftSystemMetricsComputer], self.system_metrics_class + ) + instance = self.system_metrics_class( + session=self.session, + runner=runner, + ) + return instance.get_system_metrics() diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/snowflake/profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/snowflake/profiler_interface.py index 63a7e72ac74..8f7289f7e0e 100644 --- a/ingestion/src/metadata/profiler/interface/sqlalchemy/snowflake/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/snowflake/profiler_interface.py @@ -14,10 +14,18 @@ Interfaces with database for all database engine supporting sqlalchemy abstraction layer """ +from typing import List, Type, cast + +from metadata.generated.schema.entity.data.table import SystemProfile from metadata.profiler.interface.sqlalchemy.profiler_interface import ( OVERFLOW_ERROR_CODES, SQAProfilerInterface, ) +from metadata.profiler.metrics.system.snowflake.system import ( + SnowflakeSystemMetricsComputer, +) +from metadata.profiler.metrics.system.system import System +from metadata.profiler.processor.runner import QueryRunner from metadata.utils.logger import profiler_interface_registry_logger logger = profiler_interface_registry_logger() @@ -36,6 +44,18 @@ class SnowflakeProfilerInterface(SQAProfilerInterface): super().create_session() self.set_session_tag(self.session) + def _compute_system_metrics( + self, metrics: Type[System], runner: QueryRunner, *args, **kwargs + ) -> List[SystemProfile]: + self.system_metrics_class = cast( + Type[SnowflakeSystemMetricsComputer], self.system_metrics_class + ) + instance = self.system_metrics_class( + session=self.session, + runner=runner, + ) + return instance.get_system_metrics() + def _programming_error_static_metric(self, runner, column, exc, session, metrics): if exc.orig and exc.orig.errno in OVERFLOW_ERROR_CODES.get( session.bind.dialect.name diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/system.py b/ingestion/src/metadata/profiler/metrics/system/bigquery/system.py similarity index 63% rename from ingestion/src/metadata/ingestion/source/database/bigquery/profiler/system.py rename to ingestion/src/metadata/profiler/metrics/system/bigquery/system.py index 8cfa446c0f6..f6b5230058e 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/system.py +++ b/ingestion/src/metadata/profiler/metrics/system/bigquery/system.py @@ -3,16 +3,17 @@ from typing import List from pydantic import TypeAdapter 
+from sqlalchemy.orm import Session from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile from metadata.ingestion.source.database.bigquery.queries import BigQueryQueryResult from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations from metadata.profiler.metrics.system.system import ( CacheProvider, - EmptySystemMetricsSource, - SQASessionProvider, SystemMetricsComputer, + register_system_metrics, ) +from metadata.profiler.orm.registry import PythonDialects from metadata.profiler.processor.runner import QueryRunner from metadata.utils.logger import profiler_logger from metadata.utils.time_utils import datetime_to_timestamp @@ -20,36 +21,32 @@ from metadata.utils.time_utils import datetime_to_timestamp logger = profiler_logger() -class BigQuerySystemMetricsSource( - SQASessionProvider, EmptySystemMetricsSource, CacheProvider -): +@register_system_metrics(PythonDialects.BigQuery) +class BigQuerySystemMetricsComputer(SystemMetricsComputer, CacheProvider): """BigQuery system metrics source class""" - def get_kwargs(self, **kwargs): - runner: QueryRunner = kwargs.get("runner") - return { - "table": runner.table_name, - "project_id": runner.session.get_bind().url.host, - "dataset_id": runner.schema_name, - "usage_location": kwargs.get("usage_location"), - } + def __init__( + self, + session: Session, + runner: QueryRunner, + usage_location: str, + ): + self.session = session + self.table = runner.table_name + self.project_id = runner.session.get_bind().url.host + self.dataset_id = runner.schema_name + self.usage_location = usage_location - def get_deletes(self, **kwargs) -> List[SystemProfile]: - table, project_id, usage_location, dataset_id = ( - kwargs.get("table"), - kwargs.get("project_id"), - kwargs.get("usage_location"), - kwargs.get("dataset_id"), - ) + def get_deletes(self) -> List[SystemProfile]: return self.get_system_profile( - project_id, - dataset_id, - table, + self.project_id, + self.dataset_id, + self.table, list( self.get_queries_by_operation( - usage_location, - project_id, - dataset_id, + self.usage_location, + self.project_id, + self.dataset_id, [ DatabaseDMLOperations.DELETE, ], @@ -59,21 +56,15 @@ class BigQuerySystemMetricsSource( DmlOperationType.DELETE, ) - def get_updates(self, **kwargs) -> List[SystemProfile]: - table, project_id, usage_location, dataset_id = ( - kwargs.get("table"), - kwargs.get("project_id"), - kwargs.get("usage_location"), - kwargs.get("dataset_id"), - ) + def get_updates(self) -> List[SystemProfile]: return self.get_system_profile( - project_id, - dataset_id, - table, + self.project_id, + self.dataset_id, + self.table, self.get_queries_by_operation( - usage_location, - project_id, - dataset_id, + self.usage_location, + self.project_id, + self.dataset_id, [ DatabaseDMLOperations.UPDATE, DatabaseDMLOperations.MERGE, @@ -83,21 +74,15 @@ class BigQuerySystemMetricsSource( DmlOperationType.UPDATE, ) - def get_inserts(self, **kwargs) -> List[SystemProfile]: - table, project_id, usage_location, dataset_id = ( - kwargs.get("table"), - kwargs.get("project_id"), - kwargs.get("usage_location"), - kwargs.get("dataset_id"), - ) + def get_inserts(self) -> List[SystemProfile]: return self.get_system_profile( - project_id, - dataset_id, - table, + self.project_id, + self.dataset_id, + self.table, self.get_queries_by_operation( - usage_location, - project_id, - dataset_id, + self.usage_location, + self.project_id, + self.dataset_id, [ DatabaseDMLOperations.INSERT, DatabaseDMLOperations.MERGE, @@ -127,7 +112,7 
@@ class BigQuerySystemMetricsSource( return self.get_or_update_cache( f"{project_id}.{dataset_id}", BigQueryQueryResult.get_for_table, - session=super().get_session(), + session=self.session, usage_location=usage_location, project_id=project_id, dataset_id=dataset_id, @@ -154,13 +139,10 @@ class BigQuerySystemMetricsSource( "rowsAffected": getattr(q, rows_affected_field), } for q in query_results - if getattr(q, rows_affected_field, -1) > 0 + # a None rows count falls back to -1 so the row is filtered out + if (getattr(q, rows_affected_field) or -1) > 0 and q.project_id == project_id and q.dataset_id == dataset_id and q.table_name == table ] ) - - -class BigQuerySystemMetricsComputer(SystemMetricsComputer, BigQuerySystemMetricsSource): - pass diff --git a/ingestion/src/metadata/profiler/metrics/system/databricks/system.py b/ingestion/src/metadata/profiler/metrics/system/databricks/system.py new file mode 100644 index 00000000000..50149ca0317 --- /dev/null +++ b/ingestion/src/metadata/profiler/metrics/system/databricks/system.py @@ -0,0 +1,132 @@ +import textwrap +from typing import List + +from pydantic import TypeAdapter +from sqlalchemy.orm import Session + +from metadata.generated.schema.entity.data.table import SystemProfile +from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations +from metadata.profiler.metrics.system.system import ( + CacheProvider, + SystemMetricsComputer, + register_system_metrics, +) +from metadata.profiler.orm.registry import PythonDialects +from metadata.profiler.processor.runner import QueryRunner +from metadata.utils.profiler_utils import QueryResult +from metadata.utils.time_utils import datetime_to_timestamp + +SYSTEM_QUERY = textwrap.dedent( + """ + SELECT + timestamp AS starttime, + COALESCE(CAST({column1} AS BIGINT), 0) + COALESCE(CAST({column2} AS BIGINT), 0) AS rows, + '{database}' AS database, + '{schema}' AS schema, + '{table}' AS table + FROM (DESCRIBE HISTORY {database}.{schema}.{table}) + WHERE operation IN ({operations}) AND timestamp > DATEADD(day, -1, CURRENT_TIMESTAMP()) + """ +) + + +@register_system_metrics(PythonDialects.Databricks) +class DatabricksSystemMetricsComputer(SystemMetricsComputer, CacheProvider): + """Databricks system metrics computer""" + + def __init__(self, session: Session, runner: QueryRunner, catalog: str): + self.session = session + self.table = runner.table_name + self.database = catalog + self.schema = runner.schema_name + + def _get_metrics_from_queries( + self, ddls: List[QueryResult], operation: str + ) -> List[SystemProfile]: + return TypeAdapter(List[SystemProfile]).validate_python( + [ + { + "timestamp": datetime_to_timestamp( + ddl.start_time, milliseconds=True + ), + "operation": operation, + "rowsAffected": ddl.rows, + } + for ddl in ddls + ] + ) + + def get_inserts(self) -> List[SystemProfile]: + operations = ", ".join( + [ + f"'{DatabaseDMLOperations.WRITE.value}'", + f"'{DatabaseDMLOperations.MERGE.value}'", + ] + ) + queries = self.get_or_update_cache( + f"{self.database}.{self.schema}.{self.table}.{DatabaseDMLOperations.INSERT.value}", + self._get_query_results, + self.session, + SYSTEM_QUERY.format( + column1="operationMetrics.numOutputRows", + column2="operationMetrics.numTargetRowsInserted", + database=self.database, + schema=self.schema, + table=self.table, + operations=operations, + ), + DatabaseDMLOperations.INSERT.value, + ) + return self._get_metrics_from_queries( + queries, DatabaseDMLOperations.INSERT.value + ) + + def get_deletes(self) -> List[SystemProfile]: + operations = ", ".join( + [ + f"'{DatabaseDMLOperations.DELETE.value}'", 
+ f"'{DatabaseDMLOperations.MERGE.value}'", + ] + ) + queries = self.get_or_update_cache( + f"{self.database}.{self.schema}.{self.table}.{DatabaseDMLOperations.DELETE.value}", + self._get_query_results, + self.session, + SYSTEM_QUERY.format( + column1="operationMetrics.numDeletedRows", + column2="operationMetrics.numTargetRowsDeleted", + database=self.database, + schema=self.schema, + table=self.table, + operations=operations, + ), + DatabaseDMLOperations.DELETE.value, + ) + return self._get_metrics_from_queries( + queries, DatabaseDMLOperations.DELETE.value + ) + + def get_updates(self) -> List[SystemProfile]: + operations = ", ".join( + [ + f"'{DatabaseDMLOperations.UPDATE.value}'", + f"'{DatabaseDMLOperations.MERGE.value}'", + ] + ) + queries = self.get_or_update_cache( + f"{self.database}.{self.schema}.{self.table}.{DatabaseDMLOperations.UPDATE.value}", + self._get_query_results, + self.session, + SYSTEM_QUERY.format( + column1="operationMetrics.numUpdatedRows", + column2="operationMetrics.numTargetRowsUpdated", + database=self.database, + schema=self.schema, + table=self.table, + operations=operations, + ), + DatabaseDMLOperations.UPDATE.value, + ) + return self._get_metrics_from_queries( + queries, DatabaseDMLOperations.UPDATE.value + ) diff --git a/ingestion/src/metadata/profiler/metrics/system/dml_operation.py b/ingestion/src/metadata/profiler/metrics/system/dml_operation.py index 1d50dcfecda..7a2de78b43f 100644 --- a/ingestion/src/metadata/profiler/metrics/system/dml_operation.py +++ b/ingestion/src/metadata/profiler/metrics/system/dml_operation.py @@ -25,6 +25,7 @@ class DatabaseDMLOperations(Enum): UPDATE = "UPDATE" DELETE = "DELETE" MERGE = "MERGE" + WRITE = "WRITE" DML_OPERATION_MAP = { @@ -32,4 +33,5 @@ DML_OPERATION_MAP = { DatabaseDMLOperations.MERGE.value: DmlOperationType.UPDATE.value, DatabaseDMLOperations.UPDATE.value: DmlOperationType.UPDATE.value, DatabaseDMLOperations.DELETE.value: DmlOperationType.DELETE.value, + DatabaseDMLOperations.WRITE.value: DmlOperationType.WRITE.value, } diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/profiler/system.py b/ingestion/src/metadata/profiler/metrics/system/redshift/system.py similarity index 59% rename from ingestion/src/metadata/ingestion/source/database/redshift/profiler/system.py rename to ingestion/src/metadata/profiler/metrics/system/redshift/system.py index 0afc804300c..376d28ae49a 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/profiler/system.py +++ b/ingestion/src/metadata/profiler/metrics/system/redshift/system.py @@ -1,23 +1,21 @@ """ -Imeplemetation for the redshift system metrics source +Implemetation for the redshift system metrics source """ from typing import List from pydantic import TypeAdapter +from sqlalchemy.orm import Session from metadata.generated.schema.entity.data.table import SystemProfile -from metadata.ingestion.source.database.redshift.queries import ( - STL_QUERY, - get_query_results, -) +from metadata.ingestion.source.database.redshift.queries import STL_QUERY from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations from metadata.profiler.metrics.system.system import ( CacheProvider, - EmptySystemMetricsSource, - SQASessionProvider, SystemMetricsComputer, + register_system_metrics, ) +from metadata.profiler.orm.registry import PythonDialects from metadata.profiler.processor.runner import QueryRunner from metadata.utils.logger import profiler_logger from metadata.utils.profiler_utils import QueryResult @@ -26,58 +24,46 @@ from 
metadata.utils.time_utils import datetime_to_timestamp logger = profiler_logger() -class RedshiftSystemMetricsSource( - SQASessionProvider, EmptySystemMetricsSource, CacheProvider -): +@register_system_metrics(PythonDialects.Redshift) +class RedshiftSystemMetricsComputer(SystemMetricsComputer, CacheProvider): """Redshift system metrics source class""" - def get_inserts(self, **kwargs) -> List[SystemProfile]: - database, schema, table = ( - kwargs.get("database"), - kwargs.get("schema"), - kwargs.get("table"), - ) + def __init__( + self, + session: Session, + runner: QueryRunner, + ): + self.session = session + self.table = runner.table_name + self.database = runner.session.get_bind().url.database + self.schema = runner.schema_name + + def get_inserts(self) -> List[SystemProfile]: queries = self.get_or_update_cache( - f"{database}.{schema}", + f"{self.database}.{self.schema}.{DatabaseDMLOperations.INSERT.value}", self._get_insert_queries, - database=database, - schema=schema, + database=self.database, + schema=self.schema, ) - return get_metric_result(queries, table) + return get_metric_result(queries, self.table) - def get_kwargs(self, **kwargs): - runner: QueryRunner = kwargs.get("runner") - return { - "table": runner.table_name, - "database": runner.session.get_bind().url.database, - "schema": runner.schema_name, - } - - def get_deletes(self, **kwargs) -> List[SystemProfile]: - database, schema, table = ( - kwargs.get("database"), - kwargs.get("schema"), - kwargs.get("table"), - ) + def get_deletes(self) -> List[SystemProfile]: queries = self.get_or_update_cache( - f"{database}.{schema}", + f"{self.database}.{self.schema}.{DatabaseDMLOperations.DELETE.value}", self._get_delete_queries, - database=database, - schema=schema, + database=self.database, + schema=self.schema, ) - return get_metric_result(queries, table) + return get_metric_result(queries, self.table) - def get_updates(self, **kwargs) -> List[SystemProfile]: - database = kwargs.get("database") - schema = kwargs.get("schema") - table = kwargs.get("table") + def get_updates(self) -> List[SystemProfile]: queries = self.get_or_update_cache( - f"{database}.{schema}", + f"{self.database}.{self.schema}.{DatabaseDMLOperations.UPDATE.value}", self._get_update_queries, - database=database, - schema=schema, + database=self.database, + schema=self.schema, ) - return get_metric_result(queries, table) + return get_metric_result(queries, self.table) def _get_insert_queries(self, database: str, schema: str) -> List[QueryResult]: insert_query = STL_QUERY.format( @@ -87,8 +73,8 @@ class RedshiftSystemMetricsSource( database=database, schema=schema, ) - return get_query_results( - super().get_session(), + return self._get_query_results( + self.session, insert_query, DatabaseDMLOperations.INSERT.value, ) @@ -101,8 +87,8 @@ class RedshiftSystemMetricsSource( database=database, schema=schema, ) - return get_query_results( - super().get_session(), + return self._get_query_results( + self.session, delete_query, DatabaseDMLOperations.DELETE.value, ) @@ -115,8 +101,8 @@ class RedshiftSystemMetricsSource( database=database, schema=schema, ) - return get_query_results( - super().get_session(), + return self._get_query_results( + self.session, update_query, DatabaseDMLOperations.UPDATE.value, ) @@ -143,7 +129,3 @@ def get_metric_result(ddls: List[QueryResult], table_name: str) -> List[SystemPr if ddl.table_name == table_name ] ) - - -class RedshiftSystemMetricsComputer(SystemMetricsComputer, RedshiftSystemMetricsSource): - pass diff --git 
a/ingestion/src/metadata/ingestion/source/database/snowflake/profiler/system.py b/ingestion/src/metadata/profiler/metrics/system/snowflake/system.py similarity index 90% rename from ingestion/src/metadata/ingestion/source/database/snowflake/profiler/system.py rename to ingestion/src/metadata/profiler/metrics/system/snowflake/system.py index c4b4ab841fa..4abc4cf372b 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/profiler/system.py +++ b/ingestion/src/metadata/profiler/metrics/system/snowflake/system.py @@ -7,6 +7,7 @@ from typing import List, Optional, Tuple import sqlalchemy.orm from pydantic import TypeAdapter +from sqlalchemy.orm import Session from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile from metadata.ingestion.source.database.snowflake.models import ( @@ -16,10 +17,10 @@ from metadata.ingestion.source.database.snowflake.models import ( from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations from metadata.profiler.metrics.system.system import ( CacheProvider, - EmptySystemMetricsSource, - SQASessionProvider, SystemMetricsComputer, + register_system_metrics, ) +from metadata.profiler.orm.registry import PythonDialects from metadata.profiler.processor.runner import QueryRunner from metadata.utils.collections import CaseInsensitiveString from metadata.utils.logger import profiler_logger @@ -283,38 +284,30 @@ def get_snowflake_system_queries( return None -class SnowflakeSystemMetricsSource( - SQASessionProvider, EmptySystemMetricsSource, CacheProvider[SnowflakeQueryLogEntry] +@register_system_metrics(PythonDialects.Snowflake) +class SnowflakeSystemMetricsComputer( + SystemMetricsComputer, CacheProvider[SnowflakeQueryLogEntry] ): """Snowflake system metrics source""" - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, session: Session, runner: QueryRunner): + self.session = session + self.runner = runner + self.table = runner.table_name + self.database = runner.session.get_bind().url.database + self.schema = runner.schema_name self.resolver = SnowflakeTableResovler( - session=super().get_session(), + session=session, ) - def get_kwargs(self, **kwargs): - runner: QueryRunner = kwargs.get("runner") - return { - "table": runner.table_name, - "database": runner.session.get_bind().url.database, - "schema": runner.schema_name, - } - - def get_inserts(self, **kwargs) -> List[SystemProfile]: - database, schema, table = ( - kwargs.get("database"), - kwargs.get("schema"), - kwargs.get("table"), - ) + def get_inserts(self) -> List[SystemProfile]: return self.get_system_profile( - database, - schema, - table, + self.database, + self.schema, + self.table, list( self.get_queries_by_operation( - table, + self.table, [ DatabaseDMLOperations.INSERT, DatabaseDMLOperations.MERGE, @@ -325,19 +318,14 @@ class SnowflakeSystemMetricsSource( DmlOperationType.INSERT, ) - def get_updates(self, **kwargs) -> List[SystemProfile]: - database, schema, table = ( - kwargs.get("database"), - kwargs.get("schema"), - kwargs.get("table"), - ) + def get_updates(self) -> List[SystemProfile]: return self.get_system_profile( - database, - schema, - table, + self.database, + self.schema, + self.table, list( self.get_queries_by_operation( - table, + self.table, [ DatabaseDMLOperations.UPDATE, DatabaseDMLOperations.MERGE, @@ -348,19 +336,14 @@ class SnowflakeSystemMetricsSource( DmlOperationType.UPDATE, ) - def get_deletes(self, **kwargs) -> List[SystemProfile]: - database, schema, table = ( - 
kwargs.get("database"), - kwargs.get("schema"), - kwargs.get("table"), - ) + def get_deletes(self) -> List[SystemProfile]: return self.get_system_profile( - database, - schema, - table, + self.database, + self.schema, + self.table, list( self.get_queries_by_operation( - table, + self.table, [ DatabaseDMLOperations.DELETE, ], @@ -418,7 +401,7 @@ class SnowflakeSystemMetricsSource( queries = self.get_or_update_cache( table, SnowflakeQueryLogEntry.get_for_table, - session=super().get_session(), + session=self.session, tablename=table, ) results = [ @@ -429,9 +412,3 @@ class SnowflakeSystemMetricsSource( for row in queries ] return [result for result in results if result is not None] - - -class SnowflakeSystemMetricsComputer( - SystemMetricsComputer, SnowflakeSystemMetricsSource -): - pass diff --git a/ingestion/src/metadata/profiler/metrics/system/system.py b/ingestion/src/metadata/profiler/metrics/system/system.py index 12af4ba5419..4db7ccff634 100644 --- a/ingestion/src/metadata/profiler/metrics/system/system.py +++ b/ingestion/src/metadata/profiler/metrics/system/system.py @@ -15,16 +15,20 @@ System Metric from abc import ABC from collections import defaultdict -from typing import Callable, Generic, List, TypeVar +from typing import Callable, Dict, Generic, List, Optional, Protocol, Type, TypeVar +from sqlalchemy import text from sqlalchemy.orm import Session from metadata.generated.schema.configuration.profilerConfiguration import MetricType from metadata.generated.schema.entity.data.table import SystemProfile from metadata.profiler.metrics.core import SystemMetric +from metadata.profiler.orm.registry import PythonDialects from metadata.utils.helpers import deep_size_of_dict +from metadata.utils.importer import DynamicImportException, import_from_module from metadata.utils.logger import profiler_logger from metadata.utils.lru_cache import LRU_CACHE_SIZE, LRUCache +from metadata.utils.profiler_utils import QueryResult logger = profiler_logger() @@ -45,7 +49,23 @@ class CacheProvider(ABC, Generic[T]): """Cache provider class to provide cache for system metrics""" def __init__(self): - self.cache = LRUCache[T](LRU_CACHE_SIZE) + self.cache = LRUCache[List[T]](LRU_CACHE_SIZE) + + def __init_subclass__(cls, **kwargs): + """Ensure that subclasses properly initialize the cache""" + super().__init_subclass__(**kwargs) + + # Store the original __init__ method + original_init = cls.__init__ + + def new_init(self, *args, **kwargs): + # Always call CacheProvider.__init__ first + CacheProvider.__init__(self) + # Then call the original __init__ if it exists and is not CacheProvider's + if original_init is not CacheProvider.__init__: + original_init(self, *args, **kwargs) + + cls.__init__ = new_init def get_or_update_cache( self, @@ -53,64 +73,113 @@ class CacheProvider(ABC, Generic[T]): get_queries_fn: Callable[..., List[T]], *args, **kwargs, - ): + ) -> List[T]: if cache_path in self.cache: - return self.cache.get(cache_path) + cached_result = self.cache.get(cache_path) + return cached_result if cached_result is not None else [] result = get_queries_fn(*args, **kwargs) self.cache.put(cache_path, result) return result -class EmptySystemMetricsSource: - """Empty system metrics source that can be used as a default. 
Just returns an empty list of system metrics - for any resource.""" +class SystemMetricsComputer(Protocol): + """System metrics computer class to fetch system metrics for a given table.""" - def get_inserts(self, **kwargs) -> List[SystemProfile]: - """Get insert queries""" - return [] + def _get_query_results( + self, + session: Session, + query, + operation, + ) -> List[QueryResult]: + """Run the given system query and map each row to a QueryResult - def get_deletes(self, **kwargs) -> List[SystemProfile]: - """Get delete queries""" - return [] + Args: + session (Session): session to execute the query with + query (str): system query to run + operation (str): DML operation the results are attributed to - def get_updates(self, **kwargs) -> List[SystemProfile]: - """Get update queries""" - return [] + Returns: + List[QueryResult]: + """ + cursor = session.execute(text(query)) + results = [ + QueryResult( + database_name=row.database, + schema_name=row.schema, + table_name=row.table, + query_text=None, + query_type=operation, + start_time=row.starttime, + rows=row.rows, + ) + for row in cursor + if (row.rows is not None) and (row.rows > 0) + ] - def get_kwargs(self, **kwargs): - """Get kwargs to be used in get_inserts, get_deletes, get_updates""" - return {} + return results - -class SystemMetricsComputer(EmptySystemMetricsSource): - def __init__(self, *args, **kwargs): - # collaborative constructor that initalizes upstream classes - super().__init__(*args, **kwargs) - - def get_system_metrics(self, **kwargs) -> List[SystemProfile]: + def get_system_metrics(self) -> List[SystemProfile]: """Return system metrics for a given table. Actual passed object can be a variety of types based on the underlying infrastructure. For example, in the case of SQLalchemy, it can be a Table object and in the case of Mongo, it can be a collection object.""" - kwargs = super().get_kwargs(**kwargs) - return ( - super().get_inserts(**kwargs) - + super().get_deletes(**kwargs) - + super().get_updates(**kwargs) - ) + return self.get_inserts() + self.get_deletes() + self.get_updates() + + def get_inserts(self) -> List[SystemProfile]: + """Get insert queries""" + return [] + + def get_deletes(self) -> List[SystemProfile]: + """Get delete queries""" + return [] + + def get_updates(self) -> List[SystemProfile]: + """Get update queries""" + return [] -class SQASessionProvider: - """SQASessionProvider class to provide session to the system metrics""" +class SystemMetricsRegistry: + _registry: Dict[str, Type["SystemMetricsComputer"]] = {} - def __init__(self, *args, **kwargs): - self.session = kwargs.pop("session") - super().__init__(*args, **kwargs) + @classmethod + def register(cls, dialect: PythonDialects, implementation: Type): + cls._registry[dialect.name.lower()] = implementation - def get_session(self): - return self.session + @classmethod + def get(cls, dialect: PythonDialects) -> Optional[Type["SystemMetricsComputer"]]: + if dialect.name.lower() not in cls._registry: + cls._discover_implementation(dialect) + return cls._registry.get(dialect.name.lower()) - def get_database(self) -> str: - return self.session.get_bind().url.database + @classmethod + def _discover_implementation(cls, dialect: PythonDialects): + """Auto-discover the implementation in the profiler metrics""" + try: + implementation = import_from_module( + f"metadata.profiler.metrics.system.{dialect.name.lower()}.system" + ) + except DynamicImportException: + logger.warning(f"No implementation found for {dialect.name.lower()}") + return + cls._registry[dialect.name.lower()] = implementation + + +def register_system_metrics( 
+ dialect: PythonDialects, +) -> Callable[[Type["SystemMetricsComputer"]], Type["SystemMetricsComputer"]]: + """Decorator to register a system metric implementation + + Args: + dialect (PythonDialects): database type + + Returns: + Callable: decorator function + """ + + def decorator(cls: Type["SystemMetricsComputer"]): + SystemMetricsRegistry.register(dialect, cls) + return cls + + return decorator class System(SystemMetric): diff --git a/ingestion/src/metadata/sampler/sqlalchemy/bigquery/sampler.py b/ingestion/src/metadata/sampler/sqlalchemy/bigquery/sampler.py index 16453da1b1d..b74c375169f 100644 --- a/ingestion/src/metadata/sampler/sqlalchemy/bigquery/sampler.py +++ b/ingestion/src/metadata/sampler/sqlalchemy/bigquery/sampler.py @@ -73,13 +73,6 @@ class BigQuerySampler(SQASampler): **kwargs, ) self.raw_dataset_type: Optional[TableType] = entity.tableType - if self._table.__table__.schema and self.entity.database.name: - self._table.__table__.schema = ( - f"{self.entity.database.name}.{self._table.__table__.schema}" - ) - self._table.__table_args__[ - "schema" - ] = f"{self.entity.database.name}.{self._table.__table_args__['schema']}" def set_tablesample(self, selectable: SqaTable): """Set the TABLESAMPLE clause for BigQuery diff --git a/ingestion/tests/unit/metadata/ingestion/source/database/snowflake/profiler/test_system_metrics.py b/ingestion/tests/unit/metadata/ingestion/source/database/snowflake/profiler/test_system_metrics.py index 48809d434cf..6e57bda1a64 100644 --- a/ingestion/tests/unit/metadata/ingestion/source/database/snowflake/profiler/test_system_metrics.py +++ b/ingestion/tests/unit/metadata/ingestion/source/database/snowflake/profiler/test_system_metrics.py @@ -2,7 +2,7 @@ from unittest.mock import MagicMock, Mock import pytest -from metadata.ingestion.source.database.snowflake.profiler.system import ( +from metadata.profiler.metrics.system.snowflake.system import ( PUBLIC_SCHEMA, SnowflakeTableResovler, ) diff --git a/ingestion/tests/unit/profiler/sqlalchemy/test_metrics.py b/ingestion/tests/unit/profiler/sqlalchemy/test_metrics.py index cd46cee6b1d..2f504891076 100644 --- a/ingestion/tests/unit/profiler/sqlalchemy/test_metrics.py +++ b/ingestion/tests/unit/profiler/sqlalchemy/test_metrics.py @@ -34,7 +34,6 @@ from metadata.profiler.interface.sqlalchemy.profiler_interface import ( ) from metadata.profiler.metrics.core import add_props from metadata.profiler.metrics.registry import Metrics -from metadata.profiler.metrics.system.system import SystemMetricsComputer from metadata.profiler.orm.functions.sum import SumFn from metadata.profiler.processor.core import Profiler from metadata.sampler.sqlalchemy.sampler import SQASampler @@ -939,9 +938,6 @@ class MetricsTest(TestCase): assert res == 61 - def test_system_metric(self): - assert SystemMetricsComputer().get_system_metrics() == [] - def test_table_custom_metric(self): table_entity = Table( id=uuid4(), diff --git a/ingestion/tests/unit/profiler/test_utils.py b/ingestion/tests/unit/profiler/test_utils.py index de8ea50b003..c48af353f2f 100644 --- a/ingestion/tests/unit/profiler/test_utils.py +++ b/ingestion/tests/unit/profiler/test_utils.py @@ -22,11 +22,11 @@ from sqlalchemy.orm import declarative_base from sqlalchemy.sql.sqltypes import Integer, String from metadata.ingestion.source.database.snowflake.models import SnowflakeQueryLogEntry -from metadata.ingestion.source.database.snowflake.profiler.system import ( +from metadata.profiler.metrics.hybrid.histogram import Histogram +from 
metadata.profiler.metrics.system.snowflake.system import ( SnowflakeTableResovler, get_snowflake_system_queries, ) -from metadata.profiler.metrics.hybrid.histogram import Histogram from metadata.profiler.metrics.system.system import recursive_dic from metadata.utils.profiler_utils import ( get_identifiers_from_string,
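
Taken together, the patch replaces the per-connector `*Profiler` subclasses with a dialect-keyed lookup: each `SystemMetricsComputer` registers itself via the `@register_system_metrics` decorator, and `SQAProfilerInterface` resolves the class from `SystemMetricsRegistry` using the session's SQLAlchemy dialect. A minimal self-contained sketch of that flow is below; the `ExampleDialects` enum, `DuckDBSystemMetricsComputer`, and the dict-based profiles are illustrative stand-ins, not part of this change:

```python
from enum import Enum
from typing import Callable, Dict, List, Optional, Type


class ExampleDialects(Enum):
    # Illustrative stand-in for metadata.profiler.orm.registry.PythonDialects
    DuckDB = "duckdb"


class SystemMetricsComputer:
    """Base with no-op defaults, mirroring the Protocol introduced above."""

    def get_system_metrics(self) -> List[dict]:
        # Concatenate the three DML profiles, as the real base class does
        return self.get_inserts() + self.get_deletes() + self.get_updates()

    def get_inserts(self) -> List[dict]:
        return []

    def get_deletes(self) -> List[dict]:
        return []

    def get_updates(self) -> List[dict]:
        return []


class SystemMetricsRegistry:
    _registry: Dict[str, Type[SystemMetricsComputer]] = {}

    @classmethod
    def register(cls, dialect: ExampleDialects, implementation: Type) -> None:
        cls._registry[dialect.name.lower()] = implementation

    @classmethod
    def get(cls, dialect: ExampleDialects) -> Optional[Type[SystemMetricsComputer]]:
        # The real registry also tries a dynamic module import on a cache miss
        return cls._registry.get(dialect.name.lower())


def register_system_metrics(
    dialect: ExampleDialects,
) -> Callable[[Type[SystemMetricsComputer]], Type[SystemMetricsComputer]]:
    def decorator(cls: Type[SystemMetricsComputer]) -> Type[SystemMetricsComputer]:
        SystemMetricsRegistry.register(dialect, cls)
        return cls

    return decorator


@register_system_metrics(ExampleDialects.DuckDB)
class DuckDBSystemMetricsComputer(SystemMetricsComputer):
    # Hypothetical computer: a real one would query the engine's history tables
    def get_inserts(self) -> List[dict]:
        return [{"operation": "INSERT", "rowsAffected": 42}]


computer_class = SystemMetricsRegistry.get(ExampleDialects.DuckDB)
assert computer_class is DuckDBSystemMetricsComputer
print(computer_class().get_system_metrics())
# [{'operation': 'INSERT', 'rowsAffected': 42}]
```

The interface then instantiates the resolved class per table run, which is why the new `__init__` methods take the session and runner explicitly instead of relying on the removed `SQASessionProvider` and `get_kwargs` mixins.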
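The `__init_subclass__` hook added to `CacheProvider` exists because those computers now define their own `__init__` without calling `super().__init__()`; the hook wraps each subclass constructor so the cache is always created first. A condensed sketch of just that mechanism, with a plain `dict` standing in for the `LRUCache` and a hypothetical `RedshiftLikeComputer` subclass:

```python
from typing import Callable, Dict, List


class CacheProvider:
    def __init__(self) -> None:
        # Stand-in for LRUCache[List[T]](LRU_CACHE_SIZE)
        self.cache: Dict[str, List] = {}

    def __init_subclass__(cls, **kwargs) -> None:
        super().__init_subclass__(**kwargs)
        original_init = cls.__init__

        def new_init(self, *args, **kw) -> None:
            CacheProvider.__init__(self)  # cache is set up before anything else
            if original_init is not CacheProvider.__init__:
                original_init(self, *args, **kw)  # then run the subclass __init__

        cls.__init__ = new_init

    def get_or_update_cache(
        self, cache_path: str, get_queries_fn: Callable[..., List], *args, **kwargs
    ) -> List:
        if cache_path in self.cache:
            cached = self.cache.get(cache_path)
            return cached if cached is not None else []
        result = get_queries_fn(*args, **kwargs)
        self.cache[cache_path] = result
        return result


class RedshiftLikeComputer(CacheProvider):
    def __init__(self, table: str):  # note: no super().__init__() call needed
        self.table = table


c = RedshiftLikeComputer("orders")
print(c.get_or_update_cache("db.schema.INSERT", lambda: ["q1", "q2"]))  # computed
print(c.get_or_update_cache("db.schema.INSERT", lambda: []))            # cached
```

This pairs with the cache keys now including the DML operation (e.g. `f"{self.database}.{self.schema}.{DatabaseDMLOperations.INSERT.value}"`), so insert, update, and delete lookups no longer collide on a single shared entry.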