feat: add support for DBX system metrics (#22044)

* feat: add support for DBX system metrics

* feat: add support for DBX system metrics

* fix: added WRITE back

* fix: failing test cases

* fix: failing test
Teddy 2025-07-02 08:54:16 +02:00 committed by GitHub
parent cd4ae489e3
commit 29450d1104
25 changed files with 508 additions and 353 deletions
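In outline: the per-connector profiler wrappers (BigQueryProfiler, RedshiftProfiler, SnowflakeProfiler) are deleted, and each system metrics computer now registers itself against a SQLAlchemy dialect, with SQAProfilerInterface resolving the implementation at runtime; Databricks (DBX) gains its first computer, built on Delta's DESCRIBE HISTORY. A minimal sketch of the new pattern, using names from the diff below (the standalone wiring is illustrative, not a verbatim excerpt):

from metadata.profiler.metrics.system.system import (
    SystemMetricsComputer,
    SystemMetricsRegistry,
    register_system_metrics,
)
from metadata.profiler.orm.registry import PythonDialects


@register_system_metrics(PythonDialects.Databricks)  # keyed by dialect name
class DatabricksSystemMetricsComputer(SystemMetricsComputer):
    """Override get_inserts/get_updates/get_deletes as needed."""


# Later, inside SQAProfilerInterface.__init__:
# system_metrics_class = SystemMetricsRegistry.get(session.get_bind().dialect)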

View File

@ -1,31 +0,0 @@
"""BigqQuery Profiler"""
from typing import List, Type
from metadata.generated.schema.entity.data.table import SystemProfile
from metadata.ingestion.source.database.bigquery.profiler.system import (
BigQuerySystemMetricsComputer,
)
from metadata.profiler.interface.sqlalchemy.bigquery.profiler_interface import (
BigQueryProfilerInterface,
)
from metadata.profiler.metrics.system.system import System
from metadata.profiler.processor.runner import QueryRunner
class BigQueryProfiler(BigQueryProfilerInterface):
def _compute_system_metrics(
self,
metrics: Type[System],
runner: QueryRunner,
*args,
**kwargs,
) -> List[SystemProfile]:
return self.system_metrics_computer.get_system_metrics(
table=runner.dataset,
usage_location=self.service_connection_config.usageLocation,
runner=runner,
)
def initialize_system_metrics_computer(self) -> BigQuerySystemMetricsComputer:
return BigQuerySystemMetricsComputer(session=self.session)

View File

@ -1,9 +1,9 @@
from metadata.ingestion.source.database.bigquery.lineage import BigqueryLineageSource
from metadata.ingestion.source.database.bigquery.metadata import BigquerySource
from metadata.ingestion.source.database.bigquery.profiler.profiler import (
BigQueryProfiler,
)
from metadata.ingestion.source.database.bigquery.usage import BigqueryUsageSource
from metadata.profiler.interface.sqlalchemy.bigquery.profiler_interface import (
BigQueryProfilerInterface,
)
from metadata.sampler.sqlalchemy.bigquery.sampler import BigQuerySampler
from metadata.utils.service_spec.default import DefaultDatabaseSpec
@ -11,6 +11,6 @@ ServiceSpec = DefaultDatabaseSpec(
metadata_source_class=BigquerySource,
lineage_source_class=BigqueryLineageSource,
usage_source_class=BigqueryUsageSource,
profiler_class=BigQueryProfiler,
profiler_class=BigQueryProfilerInterface,
sampler_class=BigQuerySampler,
)

View File

@ -1,14 +0,0 @@
"""Redshift profiler"""
from metadata.ingestion.source.database.redshift.profiler.system import (
RedshiftSystemMetricsComputer,
)
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface,
)
from metadata.profiler.metrics.system.system import SystemMetricsComputer
class RedshiftProfiler(SQAProfilerInterface):
def initialize_system_metrics_computer(self) -> SystemMetricsComputer:
return RedshiftSystemMetricsComputer(session=self.session)

View File

@ -15,9 +15,6 @@ SQL Queries used during ingestion
import textwrap
from typing import List
from sqlalchemy import text
from sqlalchemy.orm.session import Session
from metadata.utils.profiler_utils import QueryResult
from metadata.utils.time_utils import datetime_to_timestamp
@ -418,38 +415,6 @@ STL_QUERY = """
"""
def get_query_results(
session: Session,
query,
operation,
) -> List[QueryResult]:
"""get query results either from cache or from the database
Args:
session (Session): session
query (_type_): query
operation (_type_): operation
Returns:
List[QueryResult]:
"""
cursor = session.execute(text(query))
results = [
QueryResult(
database_name=row.database,
schema_name=row.schema,
table_name=row.table,
query_text=None,
query_type=operation,
start_time=row.starttime,
rows=row.rows,
)
for row in cursor
]
return results
def get_metric_result(ddls: List[QueryResult], table_name: str) -> List:
"""Given query results, retur the metric result

View File

@ -1,14 +1,14 @@
from metadata.ingestion.source.database.redshift.lineage import RedshiftLineageSource
from metadata.ingestion.source.database.redshift.metadata import RedshiftSource
from metadata.ingestion.source.database.redshift.profiler.profiler import (
RedshiftProfiler,
)
from metadata.ingestion.source.database.redshift.usage import RedshiftUsageSource
from metadata.profiler.interface.sqlalchemy.redshift.profiler_interface import (
RedshiftProfilerInterface,
)
from metadata.utils.service_spec.default import DefaultDatabaseSpec
ServiceSpec = DefaultDatabaseSpec(
metadata_source_class=RedshiftSource,
lineage_source_class=RedshiftLineageSource,
usage_source_class=RedshiftUsageSource,
profiler_class=RedshiftProfiler,
profiler_class=RedshiftProfilerInterface,
)

View File

@ -1,26 +0,0 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Profiler for Snowflake
"""
from metadata.ingestion.source.database.snowflake.profiler.system import (
SnowflakeSystemMetricsComputer,
)
from metadata.profiler.interface.sqlalchemy.snowflake.profiler_interface import (
SnowflakeProfilerInterface,
)
from metadata.profiler.metrics.system.system import SystemMetricsComputer
class SnowflakeProfiler(SnowflakeProfilerInterface):
def initialize_system_metrics_computer(self) -> SystemMetricsComputer:
return SnowflakeSystemMetricsComputer(session=self.session)

View File

@ -6,10 +6,10 @@ from metadata.ingestion.source.database.snowflake.data_diff.data_diff import (
)
from metadata.ingestion.source.database.snowflake.lineage import SnowflakeLineageSource
from metadata.ingestion.source.database.snowflake.metadata import SnowflakeSource
from metadata.ingestion.source.database.snowflake.profiler.profiler import (
SnowflakeProfiler,
)
from metadata.ingestion.source.database.snowflake.usage import SnowflakeUsageSource
from metadata.profiler.interface.sqlalchemy.snowflake.profiler_interface import (
SnowflakeProfilerInterface,
)
from metadata.sampler.sqlalchemy.snowflake.sampler import SnowflakeSampler
from metadata.utils.service_spec.default import DefaultDatabaseSpec
@ -17,7 +17,7 @@ ServiceSpec = DefaultDatabaseSpec(
metadata_source_class=SnowflakeSource,
lineage_source_class=SnowflakeLineageSource,
usage_source_class=SnowflakeUsageSource,
profiler_class=SnowflakeProfiler,
profiler_class=SnowflakeProfilerInterface,
test_suite_class=SnowflakeTestSuiteInterface,
sampler_class=SnowflakeSampler,
data_diff=SnowflakeTableParameter,

View File

@ -13,16 +13,45 @@
Interfaces with the database for all database engines
supporting the sqlalchemy abstraction layer
"""
from typing import List, Type, cast
from sqlalchemy import Column, inspect
from metadata.generated.schema.entity.data.table import SystemProfile
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface,
)
from metadata.profiler.metrics.system.bigquery.system import (
BigQuerySystemMetricsComputer,
)
from metadata.profiler.metrics.system.system import System
from metadata.profiler.processor.runner import QueryRunner
from metadata.utils.logger import profiler_interface_registry_logger
logger = profiler_interface_registry_logger()
class BigQueryProfilerInterface(SQAProfilerInterface):
"""BigQuery profiler interface"""
def _compute_system_metrics(
self,
metrics: Type[System],
runner: QueryRunner,
*args,
**kwargs,
) -> List[SystemProfile]:
logger.debug(f"Computing {metrics.name()} metric for {runner.table_name}")
self.system_metrics_class = cast(
Type[BigQuerySystemMetricsComputer], self.system_metrics_class
)
instance = self.system_metrics_class(
session=self.session,
runner=runner,
usage_location=self.service_connection_config.usageLocation,
)
return instance.get_system_metrics()
def _get_struct_columns(self, columns: dict, parent: str):
""""""
# pylint: disable=import-outside-toplevel

View File

@ -13,25 +13,55 @@
Interfaces with the database for all database engines
supporting the sqlalchemy abstraction layer
"""
from typing import List
from typing import List, Type, cast
from pyhive.sqlalchemy_hive import HiveCompiler
from sqlalchemy import Column
from metadata.generated.schema.entity.data.table import Column as OMColumn
from metadata.generated.schema.entity.data.table import ColumnName, DataType
from metadata.generated.schema.entity.data.table import (
ColumnName,
DataType,
SystemProfile,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface,
)
from metadata.profiler.metrics.system.databricks.system import (
DatabricksSystemMetricsComputer,
)
from metadata.profiler.metrics.system.system import System
from metadata.profiler.orm.converter.base import build_orm_col
from metadata.profiler.processor.runner import QueryRunner
from metadata.utils.logger import profiler_interface_registry_logger
logger = profiler_interface_registry_logger()
class DatabricksProfilerInterface(SQAProfilerInterface):
"""Databricks profiler interface"""
def _compute_system_metrics(
self,
metrics: Type[System],
runner: QueryRunner,
*args,
**kwargs,
) -> List[SystemProfile]:
logger.debug(f"Computing {metrics.name()} metric for {runner.table_name}")
self.system_metrics_class = cast(
Type[DatabricksSystemMetricsComputer], self.system_metrics_class
)
instance = self.system_metrics_class(
session=self.session,
runner=runner,
catalog=self.service_connection_config.catalog,
)
return instance.get_system_metrics()
def visit_column(self, *args, **kwargs):
result = super( # pylint: disable=bad-super-call
HiveCompiler, self

View File

@ -26,7 +26,7 @@ from typing import Any, Dict, List, Optional, Type, Union
from sqlalchemy import Column, inspect, text
from sqlalchemy.exc import DBAPIError, ProgrammingError, ResourceClosedError
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import Session, scoped_session
from metadata.generated.schema.entity.data.table import (
CustomMetricProfile,
@ -51,7 +51,7 @@ from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.metrics.static.mean import Mean
from metadata.profiler.metrics.static.stddev import StdDev
from metadata.profiler.metrics.static.sum import Sum
from metadata.profiler.metrics.system.system import System, SystemMetricsComputer
from metadata.profiler.metrics.system.system import System, SystemMetricsRegistry
from metadata.profiler.orm.functions.table_metric_computer import TableMetricComputer
from metadata.profiler.orm.registry import Dialects
from metadata.profiler.processor.metric_filter import MetricFilter
@ -111,13 +111,9 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
self._table = self.sampler.raw_dataset
self.create_session()
self.system_metrics_computer = self.initialize_system_metrics_computer()
def initialize_system_metrics_computer(self) -> SystemMetricsComputer:
"""Initialize system metrics computer. Override this if you want to use a metric source with
state or other dependencies.
"""
return SystemMetricsComputer()
self.system_metrics_class = SystemMetricsRegistry.get(
self.session.get_bind().dialect
)
def create_session(self):
self.session_factory = self._session_factory()
@ -362,7 +358,8 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
*args,
**kwargs,
) -> List[SystemProfile]:
"""Get system metric for tables
"""Get system metric for tables. Override this in the interface if you want to use a metric source with
for other sources.
Args:
metric_type: type of metric
@ -372,8 +369,10 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
Returns:
dictionary of results
"""
logger.debug(f"Computing system metrics for {runner.table_name}")
return self.system_metrics_computer.get_system_metrics(runner=runner)
logger.debug(
f"No implementation found for {self.session.get_bind().dialect.name} for {metrics.name()} metric"
)
return []
def _create_thread_safe_runner(self, session, column=None):
"""Create thread safe runner"""

View File

@ -0,0 +1,50 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Interfaces with the database for all database engines
supporting the sqlalchemy abstraction layer
"""
from typing import List, Type, cast
from metadata.generated.schema.entity.data.table import SystemProfile
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface,
)
from metadata.profiler.metrics.system.redshift.system import (
RedshiftSystemMetricsComputer,
)
from metadata.profiler.metrics.system.system import System
from metadata.profiler.processor.runner import QueryRunner
from metadata.utils.logger import profiler_interface_registry_logger
logger = profiler_interface_registry_logger()
class RedshiftProfilerInterface(SQAProfilerInterface):
"""Redshift profiler interface"""
def _compute_system_metrics(
self,
metrics: Type[System],
runner: QueryRunner,
*args,
**kwargs,
) -> List[SystemProfile]:
logger.debug(f"Computing {metrics.name()} metric for {runner.table_name}")
self.system_metrics_class = cast(
Type[RedshiftSystemMetricsComputer], self.system_metrics_class
)
instance = self.system_metrics_class(
session=self.session,
runner=runner,
)
return instance.get_system_metrics()

View File

@ -14,10 +14,18 @@ Interfaces with database for all database engine
supporting sqlalchemy abstraction layer
"""
from typing import List, Type, cast
from metadata.generated.schema.entity.data.table import SystemProfile
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
OVERFLOW_ERROR_CODES,
SQAProfilerInterface,
)
from metadata.profiler.metrics.system.snowflake.system import (
SnowflakeSystemMetricsComputer,
)
from metadata.profiler.metrics.system.system import System
from metadata.profiler.processor.runner import QueryRunner
from metadata.utils.logger import profiler_interface_registry_logger
logger = profiler_interface_registry_logger()
@ -36,6 +44,18 @@ class SnowflakeProfilerInterface(SQAProfilerInterface):
super().create_session()
self.set_session_tag(self.session)
def _compute_system_metrics(
self, metrics: Type[System], runner: QueryRunner, *args, **kwargs
) -> List[SystemProfile]:
self.system_metrics_class = cast(
Type[SnowflakeSystemMetricsComputer], self.system_metrics_class
)
instance = self.system_metrics_class(
session=self.session,
runner=runner,
)
return instance.get_system_metrics()
def _programming_error_static_metric(self, runner, column, exc, session, metrics):
if exc.orig and exc.orig.errno in OVERFLOW_ERROR_CODES.get(
session.bind.dialect.name

View File

@ -3,16 +3,17 @@
from typing import List
from pydantic import TypeAdapter
from sqlalchemy.orm import Session
from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile
from metadata.ingestion.source.database.bigquery.queries import BigQueryQueryResult
from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations
from metadata.profiler.metrics.system.system import (
CacheProvider,
EmptySystemMetricsSource,
SQASessionProvider,
SystemMetricsComputer,
register_system_metrics,
)
from metadata.profiler.orm.registry import PythonDialects
from metadata.profiler.processor.runner import QueryRunner
from metadata.utils.logger import profiler_logger
from metadata.utils.time_utils import datetime_to_timestamp
@ -20,36 +21,32 @@ from metadata.utils.time_utils import datetime_to_timestamp
logger = profiler_logger()
class BigQuerySystemMetricsSource(
SQASessionProvider, EmptySystemMetricsSource, CacheProvider
):
@register_system_metrics(PythonDialects.BigQuery)
class BigQuerySystemMetricsComputer(SystemMetricsComputer, CacheProvider):
"""BigQuery system metrics source class"""
def get_kwargs(self, **kwargs):
runner: QueryRunner = kwargs.get("runner")
return {
"table": runner.table_name,
"project_id": runner.session.get_bind().url.host,
"dataset_id": runner.schema_name,
"usage_location": kwargs.get("usage_location"),
}
def __init__(
self,
session: Session,
runner: QueryRunner,
usage_location: str,
):
self.session = session
self.table = runner.table_name
self.project_id = runner.session.get_bind().url.host
self.dataset_id = runner.schema_name
self.usage_location = usage_location
def get_deletes(self, **kwargs) -> List[SystemProfile]:
table, project_id, usage_location, dataset_id = (
kwargs.get("table"),
kwargs.get("project_id"),
kwargs.get("usage_location"),
kwargs.get("dataset_id"),
)
def get_deletes(self) -> List[SystemProfile]:
return self.get_system_profile(
project_id,
dataset_id,
table,
self.project_id,
self.dataset_id,
self.table,
list(
self.get_queries_by_operation(
usage_location,
project_id,
dataset_id,
self.usage_location,
self.project_id,
self.dataset_id,
[
DatabaseDMLOperations.DELETE,
],
@ -59,21 +56,15 @@ class BigQuerySystemMetricsSource(
DmlOperationType.DELETE,
)
def get_updates(self, **kwargs) -> List[SystemProfile]:
table, project_id, usage_location, dataset_id = (
kwargs.get("table"),
kwargs.get("project_id"),
kwargs.get("usage_location"),
kwargs.get("dataset_id"),
)
def get_updates(self) -> List[SystemProfile]:
return self.get_system_profile(
project_id,
dataset_id,
table,
self.project_id,
self.dataset_id,
self.table,
self.get_queries_by_operation(
usage_location,
project_id,
dataset_id,
self.usage_location,
self.project_id,
self.dataset_id,
[
DatabaseDMLOperations.UPDATE,
DatabaseDMLOperations.MERGE,
@ -83,21 +74,15 @@ class BigQuerySystemMetricsSource(
DmlOperationType.UPDATE,
)
def get_inserts(self, **kwargs) -> List[SystemProfile]:
table, project_id, usage_location, dataset_id = (
kwargs.get("table"),
kwargs.get("project_id"),
kwargs.get("usage_location"),
kwargs.get("dataset_id"),
)
def get_inserts(self) -> List[SystemProfile]:
return self.get_system_profile(
project_id,
dataset_id,
table,
self.project_id,
self.dataset_id,
self.table,
self.get_queries_by_operation(
usage_location,
project_id,
dataset_id,
self.usage_location,
self.project_id,
self.dataset_id,
[
DatabaseDMLOperations.INSERT,
DatabaseDMLOperations.MERGE,
@ -127,7 +112,7 @@ class BigQuerySystemMetricsSource(
return self.get_or_update_cache(
f"{project_id}.{dataset_id}",
BigQueryQueryResult.get_for_table,
session=super().get_session(),
session=self.session,
usage_location=usage_location,
project_id=project_id,
dataset_id=dataset_id,
@ -154,13 +139,10 @@ class BigQuerySystemMetricsSource(
"rowsAffected": getattr(q, rows_affected_field),
}
for q in query_results
if getattr(q, rows_affected_field, -1) > 0
if (getattr(q, rows_affected_field) or -1) > 0
and q.project_id == project_id
and q.dataset_id == dataset_id
and q.table_name == table
]
)
class BigQuerySystemMetricsComputer(SystemMetricsComputer, BigQuerySystemMetricsSource):
pass

View File

@ -0,0 +1,132 @@
import textwrap
from typing import List
from pydantic import TypeAdapter
from sqlalchemy.orm import Session
from metadata.generated.schema.entity.data.table import SystemProfile
from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations
from metadata.profiler.metrics.system.system import (
CacheProvider,
SystemMetricsComputer,
register_system_metrics,
)
from metadata.profiler.orm.registry import PythonDialects
from metadata.profiler.processor.runner import QueryRunner
from metadata.utils.profiler_utils import QueryResult
from metadata.utils.time_utils import datetime_to_timestamp
SYSTEM_QUERY = textwrap.dedent(
"""
SELECT
timestamp AS starttime,
COALESCE(CAST({column1} AS BIGINT), 0) + COALESCE(CAST({column2} AS BIGINT), 0) AS rows,
'{database}' AS database,
'{schema}' AS schema,
'{table}' AS table
FROM (DESCRIBE HISTORY {database}.{schema}.{table})
WHERE operation IN ({operations}) AND timestamp > DATEADD(day, -1, CURRENT_TIMESTAMP())
"""
)
@register_system_metrics(PythonDialects.Databricks)
class DatabricksSystemMetricsComputer(SystemMetricsComputer, CacheProvider):
"""Databricks system metrics computer"""
def __init__(self, session: Session, runner: QueryRunner, catalog: str):
self.session = session
self.table = runner.table_name
self.database = catalog
self.schema = runner.schema_name
def _get_metrics_from_queries(
self, ddls: List[QueryResult], operation: str
) -> List[SystemProfile]:
return TypeAdapter(List[SystemProfile]).validate_python(
[
{
"timestamp": datetime_to_timestamp(
ddl.start_time, milliseconds=True
),
"operation": operation,
"rowsAffected": ddl.rows,
}
for ddl in ddls
]
)
def get_inserts(self) -> List[SystemProfile]:
operations = ", ".join(
[
f"'{DatabaseDMLOperations.WRITE.value}'",
f"'{DatabaseDMLOperations.MERGE.value}'",
]
)
queries = self.get_or_update_cache(
f"{self.database}.{self.schema}.{self.table}.{DatabaseDMLOperations.INSERT.value}",
self._get_query_results,
self.session,
SYSTEM_QUERY.format(
column1="operationMetrics.numOutputRows",
column2="operationMetrics.numTargetRowsInserted",
database=self.database,
schema=self.schema,
table=self.table,
operations=operations,
),
DatabaseDMLOperations.INSERT.value,
)
return self._get_metrics_from_queries(
queries, DatabaseDMLOperations.INSERT.value
)
def get_deletes(self) -> List[SystemProfile]:
operations = ", ".join(
[
f"'{DatabaseDMLOperations.DELETE.value}'",
f"'{DatabaseDMLOperations.MERGE.value}'",
]
)
queries = self.get_or_update_cache(
f"{self.database}.{self.schema}.{self.table}.{DatabaseDMLOperations.DELETE.value}",
self._get_query_results,
self.session,
SYSTEM_QUERY.format(
column1="operationMetrics.numDeletedRows",
column2="operationMetrics.numTargetRowsDeleted",
database=self.database,
schema=self.schema,
table=self.table,
operations=operations,
),
DatabaseDMLOperations.DELETE.value,
)
return self._get_metrics_from_queries(
queries, DatabaseDMLOperations.DELETE.value
)
def get_updates(self) -> List[SystemProfile]:
operations = ", ".join(
[
f"'{DatabaseDMLOperations.UPDATE.value}'",
f"'{DatabaseDMLOperations.MERGE.value}'",
]
)
queries = self.get_or_update_cache(
f"{self.database}.{self.schema}.{self.table}.{DatabaseDMLOperations.UPDATE.value}",
self._get_query_results,
self.session,
SYSTEM_QUERY.format(
column1="operationMetrics.numUpdatedRows",
column2="operationMetrics.numTargetRowsUpdated",
database=self.database,
schema=self.schema,
table=self.table,
operations=operations,
),
DatabaseDMLOperations.UPDATE.value,
)
return self._get_metrics_from_queries(
queries, DatabaseDMLOperations.UPDATE.value
)
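The computer reads row counts directly from Delta's DESCRIBE HISTORY output; note that get_inserts matches WRITE as well as MERGE, since Delta records plain inserts as WRITE operations (hence the enum addition in the next file). As a rough illustration, for a hypothetical table main.sales.orders the UPDATE query would be rendered like this (table names are invented):

rendered = SYSTEM_QUERY.format(
    column1="operationMetrics.numUpdatedRows",
    column2="operationMetrics.numTargetRowsUpdated",
    database="main",
    schema="sales",
    table="orders",
    operations="'UPDATE', 'MERGE'",
)
# rendered ends with:
#   FROM (DESCRIBE HISTORY main.sales.orders)
#   WHERE operation IN ('UPDATE', 'MERGE') AND timestamp > DATEADD(day, -1, CURRENT_TIMESTAMP())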

View File

@ -25,6 +25,7 @@ class DatabaseDMLOperations(Enum):
UPDATE = "UPDATE"
DELETE = "DELETE"
MERGE = "MERGE"
WRITE = "WRITE"
DML_OPERATION_MAP = {
@ -32,4 +33,5 @@ DML_OPERATION_MAP = {
DatabaseDMLOperations.MERGE.value: DmlOperationType.UPDATE.value,
DatabaseDMLOperations.UPDATE.value: DmlOperationType.UPDATE.value,
DatabaseDMLOperations.DELETE.value: DmlOperationType.DELETE.value,
DatabaseDMLOperations.WRITE.value: DmlOperationType.WRITE.value,
}
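With the new entry, Delta WRITE operations normalize through the same map as the standard verbs. A one-line sketch assuming the enums above:

from metadata.profiler.metrics.system.dml_operation import (
    DML_OPERATION_MAP,
    DatabaseDMLOperations,
)

DML_OPERATION_MAP[DatabaseDMLOperations.WRITE.value]  # == DmlOperationType.WRITE.value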

View File

@ -1,23 +1,21 @@
"""
Imeplemetation for the redshift system metrics source
Implementation for the Redshift system metrics source
"""
from typing import List
from pydantic import TypeAdapter
from sqlalchemy.orm import Session
from metadata.generated.schema.entity.data.table import SystemProfile
from metadata.ingestion.source.database.redshift.queries import (
STL_QUERY,
get_query_results,
)
from metadata.ingestion.source.database.redshift.queries import STL_QUERY
from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations
from metadata.profiler.metrics.system.system import (
CacheProvider,
EmptySystemMetricsSource,
SQASessionProvider,
SystemMetricsComputer,
register_system_metrics,
)
from metadata.profiler.orm.registry import PythonDialects
from metadata.profiler.processor.runner import QueryRunner
from metadata.utils.logger import profiler_logger
from metadata.utils.profiler_utils import QueryResult
@ -26,58 +24,46 @@ from metadata.utils.time_utils import datetime_to_timestamp
logger = profiler_logger()
class RedshiftSystemMetricsSource(
SQASessionProvider, EmptySystemMetricsSource, CacheProvider
):
@register_system_metrics(PythonDialects.Redshift)
class RedshiftSystemMetricsComputer(SystemMetricsComputer, CacheProvider):
"""Redshift system metrics source class"""
def get_inserts(self, **kwargs) -> List[SystemProfile]:
database, schema, table = (
kwargs.get("database"),
kwargs.get("schema"),
kwargs.get("table"),
)
def __init__(
self,
session: Session,
runner: QueryRunner,
):
self.session = session
self.table = runner.table_name
self.database = runner.session.get_bind().url.database
self.schema = runner.schema_name
def get_inserts(self) -> List[SystemProfile]:
queries = self.get_or_update_cache(
f"{database}.{schema}",
f"{self.database}.{self.schema}.{DatabaseDMLOperations.INSERT.value}",
self._get_insert_queries,
database=database,
schema=schema,
database=self.database,
schema=self.schema,
)
return get_metric_result(queries, table)
return get_metric_result(queries, self.table)
def get_kwargs(self, **kwargs):
runner: QueryRunner = kwargs.get("runner")
return {
"table": runner.table_name,
"database": runner.session.get_bind().url.database,
"schema": runner.schema_name,
}
def get_deletes(self, **kwargs) -> List[SystemProfile]:
database, schema, table = (
kwargs.get("database"),
kwargs.get("schema"),
kwargs.get("table"),
)
def get_deletes(self) -> List[SystemProfile]:
queries = self.get_or_update_cache(
f"{database}.{schema}",
f"{self.database}.{self.schema}.{DatabaseDMLOperations.DELETE.value}",
self._get_delete_queries,
database=database,
schema=schema,
database=self.database,
schema=self.schema,
)
return get_metric_result(queries, table)
return get_metric_result(queries, self.table)
def get_updates(self, **kwargs) -> List[SystemProfile]:
database = kwargs.get("database")
schema = kwargs.get("schema")
table = kwargs.get("table")
def get_updates(self) -> List[SystemProfile]:
queries = self.get_or_update_cache(
f"{database}.{schema}",
f"{self.database}.{self.schema}.{DatabaseDMLOperations.UPDATE.value}",
self._get_update_queries,
database=database,
schema=schema,
database=self.database,
schema=self.schema,
)
return get_metric_result(queries, table)
return get_metric_result(queries, self.table)
def _get_insert_queries(self, database: str, schema: str) -> List[QueryResult]:
insert_query = STL_QUERY.format(
@ -87,8 +73,8 @@ class RedshiftSystemMetricsSource(
database=database,
schema=schema,
)
return get_query_results(
super().get_session(),
return self._get_query_results(
self.session,
insert_query,
DatabaseDMLOperations.INSERT.value,
)
@ -101,8 +87,8 @@ class RedshiftSystemMetricsSource(
database=database,
schema=schema,
)
return get_query_results(
super().get_session(),
return self._get_query_results(
self.session,
delete_query,
DatabaseDMLOperations.DELETE.value,
)
@ -115,8 +101,8 @@ class RedshiftSystemMetricsSource(
database=database,
schema=schema,
)
return get_query_results(
super().get_session(),
return self._get_query_results(
self.session,
update_query,
DatabaseDMLOperations.UPDATE.value,
)
@ -143,7 +129,3 @@ def get_metric_result(ddls: List[QueryResult], table_name: str) -> List[SystemPr
if ddl.table_name == table_name
]
)
class RedshiftSystemMetricsComputer(SystemMetricsComputer, RedshiftSystemMetricsSource):
pass

View File

@ -7,6 +7,7 @@ from typing import List, Optional, Tuple
import sqlalchemy.orm
from pydantic import TypeAdapter
from sqlalchemy.orm import Session
from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile
from metadata.ingestion.source.database.snowflake.models import (
@ -16,10 +17,10 @@ from metadata.ingestion.source.database.snowflake.models import (
from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations
from metadata.profiler.metrics.system.system import (
CacheProvider,
EmptySystemMetricsSource,
SQASessionProvider,
SystemMetricsComputer,
register_system_metrics,
)
from metadata.profiler.orm.registry import PythonDialects
from metadata.profiler.processor.runner import QueryRunner
from metadata.utils.collections import CaseInsensitiveString
from metadata.utils.logger import profiler_logger
@ -283,38 +284,30 @@ def get_snowflake_system_queries(
return None
class SnowflakeSystemMetricsSource(
SQASessionProvider, EmptySystemMetricsSource, CacheProvider[SnowflakeQueryLogEntry]
@register_system_metrics(PythonDialects.Snowflake)
class SnowflakeSystemMetricsComputer(
SystemMetricsComputer, CacheProvider[SnowflakeQueryLogEntry]
):
"""Snowflake system metrics source"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def __init__(self, session: Session, runner: QueryRunner):
self.session = session
self.runner = runner
self.table = runner.table_name
self.database = runner.session.get_bind().url.database
self.schema = runner.schema_name
self.resolver = SnowflakeTableResovler(
session=super().get_session(),
session=session,
)
def get_kwargs(self, **kwargs):
runner: QueryRunner = kwargs.get("runner")
return {
"table": runner.table_name,
"database": runner.session.get_bind().url.database,
"schema": runner.schema_name,
}
def get_inserts(self, **kwargs) -> List[SystemProfile]:
database, schema, table = (
kwargs.get("database"),
kwargs.get("schema"),
kwargs.get("table"),
)
def get_inserts(self) -> List[SystemProfile]:
return self.get_system_profile(
database,
schema,
table,
self.database,
self.schema,
self.table,
list(
self.get_queries_by_operation(
table,
self.table,
[
DatabaseDMLOperations.INSERT,
DatabaseDMLOperations.MERGE,
@ -325,19 +318,14 @@ class SnowflakeSystemMetricsSource(
DmlOperationType.INSERT,
)
def get_updates(self, **kwargs) -> List[SystemProfile]:
database, schema, table = (
kwargs.get("database"),
kwargs.get("schema"),
kwargs.get("table"),
)
def get_updates(self) -> List[SystemProfile]:
return self.get_system_profile(
database,
schema,
table,
self.database,
self.schema,
self.table,
list(
self.get_queries_by_operation(
table,
self.table,
[
DatabaseDMLOperations.UPDATE,
DatabaseDMLOperations.MERGE,
@ -348,19 +336,14 @@ class SnowflakeSystemMetricsSource(
DmlOperationType.UPDATE,
)
def get_deletes(self, **kwargs) -> List[SystemProfile]:
database, schema, table = (
kwargs.get("database"),
kwargs.get("schema"),
kwargs.get("table"),
)
def get_deletes(self) -> List[SystemProfile]:
return self.get_system_profile(
database,
schema,
table,
self.database,
self.schema,
self.table,
list(
self.get_queries_by_operation(
table,
self.table,
[
DatabaseDMLOperations.DELETE,
],
@ -418,7 +401,7 @@ class SnowflakeSystemMetricsSource(
queries = self.get_or_update_cache(
table,
SnowflakeQueryLogEntry.get_for_table,
session=super().get_session(),
session=self.session,
tablename=table,
)
results = [
@ -429,9 +412,3 @@ class SnowflakeSystemMetricsSource(
for row in queries
]
return [result for result in results if result is not None]
class SnowflakeSystemMetricsComputer(
SystemMetricsComputer, SnowflakeSystemMetricsSource
):
pass

View File

@ -15,16 +15,20 @@ System Metric
from abc import ABC
from collections import defaultdict
from typing import Callable, Generic, List, TypeVar
from typing import Callable, Dict, Generic, List, Optional, Protocol, Type, TypeVar
from sqlalchemy import text
from sqlalchemy.orm import Session
from metadata.generated.schema.configuration.profilerConfiguration import MetricType
from metadata.generated.schema.entity.data.table import SystemProfile
from metadata.profiler.metrics.core import SystemMetric
from metadata.profiler.orm.registry import PythonDialects
from metadata.utils.helpers import deep_size_of_dict
from metadata.utils.importer import DynamicImportException, import_from_module
from metadata.utils.logger import profiler_logger
from metadata.utils.lru_cache import LRU_CACHE_SIZE, LRUCache
from metadata.utils.profiler_utils import QueryResult
logger = profiler_logger()
@ -45,7 +49,23 @@ class CacheProvider(ABC, Generic[T]):
"""Cache provider class to provide cache for system metrics"""
def __init__(self):
self.cache = LRUCache[T](LRU_CACHE_SIZE)
self.cache = LRUCache[List[T]](LRU_CACHE_SIZE)
def __init_subclass__(cls, **kwargs):
"""Ensure that subclasses properly initialize the cache"""
super().__init_subclass__(**kwargs)
# Store the original __init__ method
original_init = cls.__init__
def new_init(self, *args, **kwargs):
# Always call CacheProvider.__init__ first
CacheProvider.__init__(self)
# Then call the original __init__ if it exists and is not CacheProvider's
if original_init is not CacheProvider.__init__:
original_init(self, *args, **kwargs)
cls.__init__ = new_init
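# Net effect (illustrative, hypothetical subclass): even a subclass whose
# __init__ never calls super().__init__(), e.g.
#
#     class MyComputer(SystemMetricsComputer, CacheProvider):
#         def __init__(self, session):
#             self.session = session
#
# still gets self.cache initialized, because new_init always runs
# CacheProvider.__init__ first.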
def get_or_update_cache(
self,
@ -53,64 +73,113 @@ class CacheProvider(ABC, Generic[T]):
get_queries_fn: Callable[..., List[T]],
*args,
**kwargs,
):
) -> List[T]:
if cache_path in self.cache:
return self.cache.get(cache_path)
cached_result = self.cache.get(cache_path)
return cached_result if cached_result is not None else []
result = get_queries_fn(*args, **kwargs)
self.cache.put(cache_path, result)
return result
class EmptySystemMetricsSource:
"""Empty system metrics source that can be used as a default. Just returns an empty list of system metrics
for any resource."""
class SystemMetricsComputer(Protocol):
"""System metrics computer class to fetch system metrics for a given table."""
def get_inserts(self, **kwargs) -> List[SystemProfile]:
"""Get insert queries"""
return []
def _get_query_results(
self,
session: Session,
query,
operation,
) -> List[QueryResult]:
"""get query results either from cache or from the database
def get_deletes(self, **kwargs) -> List[SystemProfile]:
"""Get delete queries"""
return []
Args:
session (Session): session
query (_type_): query
operation (_type_): operation
def get_updates(self, **kwargs) -> List[SystemProfile]:
"""Get update queries"""
return []
Returns:
List[QueryResult]:
"""
cursor = session.execute(text(query))
results = [
QueryResult(
database_name=row.database,
schema_name=row.schema,
table_name=row.table,
query_text=None,
query_type=operation,
start_time=row.starttime,
rows=row.rows,
)
for row in cursor
if (row.rows is not None) and (row.rows > 0)
]
def get_kwargs(self, **kwargs):
"""Get kwargs to be used in get_inserts, get_deletes, get_updates"""
return {}
return results
class SystemMetricsComputer(EmptySystemMetricsSource):
def __init__(self, *args, **kwargs):
# collaborative constructor that initializes upstream classes
super().__init__(*args, **kwargs)
def get_system_metrics(self, **kwargs) -> List[SystemProfile]:
def get_system_metrics(self) -> List[SystemProfile]:
"""Return system metrics for a given table. Actual passed object can be a variety of types based
on the underlying infrastructure. For example, in the case of SQLalchemy, it can be a Table object
and in the case of Mongo, it can be a collection object."""
kwargs = super().get_kwargs(**kwargs)
return (
super().get_inserts(**kwargs)
+ super().get_deletes(**kwargs)
+ super().get_updates(**kwargs)
)
return self.get_inserts() + self.get_deletes() + self.get_updates()
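# A concrete computer overrides only the verbs its source supports; the
# defaults below contribute empty lists to the sum above.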
def get_inserts(self) -> List[SystemProfile]:
"""Get insert queries"""
return []
def get_deletes(self) -> List[SystemProfile]:
"""Get delete queries"""
return []
def get_updates(self) -> List[SystemProfile]:
"""Get update queries"""
return []
class SQASessionProvider:
"""SQASessionProvider class to provide session to the system metrics"""
class SystemMetricsRegistry:
_registry: Dict[str, Type["SystemMetricsComputer"]] = {}
def __init__(self, *args, **kwargs):
self.session = kwargs.pop("session")
super().__init__(*args, **kwargs)
@classmethod
def register(cls, dialect: PythonDialects, implementation: Type):
cls._registry[dialect.name.lower()] = implementation
def get_session(self):
return self.session
@classmethod
def get(cls, dialect: PythonDialects) -> Optional[Type["SystemMetricsComputer"]]:
if dialect.name.lower() not in cls._registry:
cls._discover_implementation(dialect)
return cls._registry.get(dialect.name.lower())
def get_database(self) -> str:
return self.session.get_bind().url.database
@classmethod
def _discover_implementation(cls, dialect: PythonDialects):
"""Auto-discover the implementation in the profiler metrics"""
try:
implementation = import_from_module(
f"metadata.profiler.metrics.system.{dialect.name.lower()}.system"
)
except DynamicImportException:
logger.warning(f"No implementation found for {dialect.name.lower()}")
return
cls._registry[dialect.name.lower()] = implementation
def register_system_metrics(
dialect: PythonDialects,
) -> Callable[[Type["SystemMetricsComputer"]], Type["SystemMetricsComputer"]]:
"""Decorator to register a system metric implementation
Args:
dialect (PythonDialects): database type
Returns:
Callable: decorator function
"""
def decorator(cls: Type["SystemMetricsComputer"]):
SystemMetricsRegistry.register(dialect, cls)
return cls
return decorator
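# Discovery sketch (illustrative): if a dialect was never registered,
# SystemMetricsRegistry.get() falls back to importing the conventional
# module path, e.g. for the "databricks" dialect:
#
#     metadata.profiler.metrics.system.databricks.system
#
# Importing that module executes its @register_system_metrics decorator,
# which fills in the registry entry the caller then receives.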
class System(SystemMetric):

View File

@ -73,13 +73,6 @@ class BigQuerySampler(SQASampler):
**kwargs,
)
self.raw_dataset_type: Optional[TableType] = entity.tableType
if self._table.__table__.schema and self.entity.database.name:
self._table.__table__.schema = (
f"{self.entity.database.name}.{self._table.__table__.schema}"
)
self._table.__table_args__[
"schema"
] = f"{self.entity.database.name}.{self._table.__table_args__['schema']}"
def set_tablesample(self, selectable: SqaTable):
"""Set the TABLESAMPLE clause for BigQuery

View File

@ -2,7 +2,7 @@ from unittest.mock import MagicMock, Mock
import pytest
from metadata.ingestion.source.database.snowflake.profiler.system import (
from metadata.profiler.metrics.system.snowflake.system import (
PUBLIC_SCHEMA,
SnowflakeTableResovler,
)

View File

@ -34,7 +34,6 @@ from metadata.profiler.interface.sqlalchemy.profiler_interface import (
)
from metadata.profiler.metrics.core import add_props
from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.metrics.system.system import SystemMetricsComputer
from metadata.profiler.orm.functions.sum import SumFn
from metadata.profiler.processor.core import Profiler
from metadata.sampler.sqlalchemy.sampler import SQASampler
@ -939,9 +938,6 @@ class MetricsTest(TestCase):
assert res == 61
def test_system_metric(self):
assert SystemMetricsComputer().get_system_metrics() == []
def test_table_custom_metric(self):
table_entity = Table(
id=uuid4(),

View File

@ -22,11 +22,11 @@ from sqlalchemy.orm import declarative_base
from sqlalchemy.sql.sqltypes import Integer, String
from metadata.ingestion.source.database.snowflake.models import SnowflakeQueryLogEntry
from metadata.ingestion.source.database.snowflake.profiler.system import (
from metadata.profiler.metrics.hybrid.histogram import Histogram
from metadata.profiler.metrics.system.snowflake.system import (
SnowflakeTableResovler,
get_snowflake_system_queries,
)
from metadata.profiler.metrics.hybrid.histogram import Histogram
from metadata.profiler.metrics.system.system import recursive_dic
from metadata.utils.profiler_utils import (
get_identifiers_from_string,