MINOR: fix system profile return types (#18470)

* fix(redshift-system): redshift return type

* fixed bigquery profiler

* fixed snowflake profiler

* job id action does not support matrix. using plain action summary.

* reverted gha change
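At a high level, the fix moves the system-metrics entry points from positional to keyword arguments so each backend can extract only the values it needs. A minimal before/after sketch of the calling convention, using names from the diff below (the surrounding classes are elided):

# before: positional arguments coupled the caller to each source's signature
computer.get_system_metrics(runner.table, service_connection_config)

# after: keyword arguments are unpacked by each source's get_kwargs()
computer.get_system_metrics(
    table=runner.table,
    usage_location=service_connection_config.usageLocation,  # BigQuery-specific
)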
Imri Paran 2024-11-11 10:49:42 +01:00 committed by GitHub
parent fc79d60d83
commit a6d97b67a8
11 changed files with 225 additions and 80 deletions

View File

@ -115,9 +115,98 @@ ignore-paths = [
"ingestion/src/metadata/great_expectations/action.py",
"ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py",
"ingestion/src/metadata/profiler/processor/sampler/nosql/sampler.py",
"ingestion/src/metadata/ingestion/source",
".*/src/metadata/ingestion/source/.*/service_spec.py",
"ingestion/src/metadata/profiler/metrics",
"ingestion/src/metadata/profiler/source/databricks",
# metadata ingestion sources
"ingestion/src/metadata/ingestion/source/api/rest/connection.py",
"ingestion/src/metadata/ingestion/source/api/rest/metadata.py",
"ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py",
"ingestion/src/metadata/ingestion/source/dashboard/metabase/models.py",
"ingestion/src/metadata/ingestion/source/dashboard/mode/client.py",
"ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py",
"ingestion/src/metadata/ingestion/source/dashboard/qlikcloud/client.py",
"ingestion/src/metadata/ingestion/source/dashboard/qlikcloud/metadata.py",
"ingestion/src/metadata/ingestion/source/dashboard/qliksense/client.py",
"ingestion/src/metadata/ingestion/source/dashboard/qliksense/metadata.py",
"ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py",
"ingestion/src/metadata/ingestion/source/dashboard/sigma/client.py",
"ingestion/src/metadata/ingestion/source/dashboard/superset/api_source.py",
"ingestion/src/metadata/ingestion/source/dashboard/superset/metadata.py",
"ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py",
"ingestion/src/metadata/ingestion/source/database/athena/metadata.py",
"ingestion/src/metadata/ingestion/source/database/athena/utils.py",
"ingestion/src/metadata/ingestion/source/database/azuresql/connection.py",
"ingestion/src/metadata/ingestion/source/database/bigquery/connection.py",
"ingestion/src/metadata/ingestion/source/database/bigquery/incremental_table_processor.py",
"ingestion/src/metadata/ingestion/source/database/bigquery/queries.py",
"ingestion/src/metadata/ingestion/source/database/common_db_source.py",
"ingestion/src/metadata/ingestion/source/database/couchbase/metadata.py",
"ingestion/src/metadata/ingestion/source/database/databricks/client.py",
"ingestion/src/metadata/ingestion/source/database/databricks/metadata.py",
"ingestion/src/metadata/ingestion/source/database/datalake/clients/azure_blob.py",
"ingestion/src/metadata/ingestion/source/database/datalake/clients/base.py",
"ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py",
"ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py",
"ingestion/src/metadata/ingestion/source/database/datalake/connection.py",
"ingestion/src/metadata/ingestion/source/database/datalake/metadata.py",
"ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py",
"ingestion/src/metadata/ingestion/source/database/deltalake/clients/base.py",
"ingestion/src/metadata/ingestion/source/database/deltalake/clients/pyspark.py",
"ingestion/src/metadata/ingestion/source/database/deltalake/clients/s3.py",
"ingestion/src/metadata/ingestion/source/database/deltalake/connection.py",
"ingestion/src/metadata/ingestion/source/database/deltalake/metadata.py",
"ingestion/src/metadata/ingestion/source/database/doris/utils.py",
"ingestion/src/metadata/ingestion/source/database/exasol/metadata.py",
"ingestion/src/metadata/ingestion/source/database/exasol/connection.py",
"ingestion/src/metadata/ingestion/source/database/external_table_lineage_mixin.py",
"ingestion/src/metadata/ingestion/source/database/hive/metadata.py",
"ingestion/src/metadata/ingestion/source/database/lineage_source.py",
"ingestion/src/metadata/ingestion/source/database/mssql/lineage.py",
"ingestion/src/metadata/ingestion/source/database/mssql/usage.py",
"ingestion/src/metadata/ingestion/source/database/mssql/utils.py",
"ingestion/src/metadata/ingestion/source/database/mysql/connection.py",
"ingestion/src/metadata/ingestion/source/database/oracle/lineage.py",
"ingestion/src/metadata/ingestion/source/database/pinotdb/metadata.py",
"ingestion/src/metadata/ingestion/source/database/postgres/connection.py",
"ingestion/src/metadata/ingestion/source/database/postgres/converter_orm.py",
"ingestion/src/metadata/ingestion/source/database/postgres/lineage.py",
"ingestion/src/metadata/ingestion/source/database/postgres/metadata.py",
"ingestion/src/metadata/ingestion/source/database/postgres/metrics.py",
"ingestion/src/metadata/ingestion/source/database/postgres/types/money.py",
"ingestion/src/metadata/ingestion/source/database/postgres/utils.py",
"ingestion/src/metadata/ingestion/source/database/redshift/incremental_table_processor.py",
"ingestion/src/metadata/ingestion/source/database/redshift/lineage.py",
"ingestion/src/metadata/ingestion/source/database/redshift/models.py",
"ingestion/src/metadata/ingestion/source/database/sample_data.py",
"ingestion/src/metadata/ingestion/source/database/saphana/cdata_parser.py",
"ingestion/src/metadata/ingestion/source/database/saphana/metadata.py",
"ingestion/src/metadata/ingestion/source/database/sas/client.py",
"ingestion/src/metadata/ingestion/source/database/sas/metadata.py",
"ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py",
"ingestion/src/metadata/ingestion/source/database/snowflake/utils.py",
"ingestion/src/metadata/ingestion/source/database/sql_column_handler.py",
"ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py",
"ingestion/src/metadata/ingestion/source/database/teradata/connection.py",
"ingestion/src/metadata/ingestion/source/database/trino/connection.py",
"ingestion/src/metadata/ingestion/source/database/unitycatalog/client.py",
"ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py",
"ingestion/src/metadata/ingestion/source/messaging/kafka/metadata.py",
"ingestion/src/metadata/ingestion/source/pipeline/dagster/metadata.py",
"ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py",
"ingestion/src/metadata/ingestion/source/pipeline/domopipeline/metadata.py",
"ingestion/src/metadata/ingestion/source/pipeline/fivetran/metadata.py",
"ingestion/src/metadata/ingestion/source/pipeline/fivetran/models.py",
"ingestion/src/metadata/ingestion/source/pipeline/flink/metadata.py",
"ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/metadata.py",
"ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py",
"ingestion/src/metadata/ingestion/source/pipeline/openlineage/utils.py",
"ingestion/src/metadata/ingestion/source/pipeline/spline/metadata.py",
"ingestion/src/metadata/ingestion/source/sqa_types.py",
"ingestion/src/metadata/ingestion/source/storage/gcs/connection.py",
"ingestion/src/metadata/ingestion/source/storage/gcs/metadata.py",
"ingestion/src/metadata/ingestion/source/storage/s3/metadata.py",
]
[tool.pylint."MESSAGES CONTROL"]

View File

@ -1,3 +1,5 @@
"""BigqQuery Profiler"""
from typing import List, Type
from metadata.generated.schema.entity.data.table import SystemProfile
@ -20,10 +22,9 @@ class BigQueryProfiler(BigQueryProfilerInterface):
**kwargs,
) -> List[SystemProfile]:
return self.system_metrics_computer.get_system_metrics(
runner.table, self.service_connection_config
table=runner.table,
usage_location=self.service_connection_config.usageLocation,
)
def initialize_system_metrics_computer(
self, **kwargs
) -> BigQuerySystemMetricsComputer:
def initialize_system_metrics_computer(self) -> BigQuerySystemMetricsComputer:
return BigQuerySystemMetricsComputer(session=self.session)

View File

@ -1,12 +1,10 @@
"""BigQuery system metric source"""
from typing import List
from pydantic import TypeAdapter
from sqlalchemy.orm import DeclarativeMeta
from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
BigQueryConnection,
)
from metadata.ingestion.source.database.bigquery.queries import BigQueryQueryResult
from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations
from metadata.profiler.metrics.system.system import (
@ -24,26 +22,24 @@ logger = profiler_logger()
class BigQuerySystemMetricsSource(
SQASessionProvider, EmptySystemMetricsSource, CacheProvider
):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
"""BigQuery system metrics source class"""
def get_kwargs(
self,
table: DeclarativeMeta,
service_connection: BigQueryConnection,
*args,
**kwargs,
):
def get_kwargs(self, **kwargs):
table = kwargs.get("table")
return {
"table": table.__table__.name,
"dataset_id": table.__table_args__["schema"],
"project_id": super().get_session().get_bind().url.host,
"usage_location": service_connection.usageLocation,
"usage_location": kwargs.get("usage_location"),
}
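For context, get_kwargs() above derives everything the BigQuery system queries need from the SQLAlchemy declarative table and the session. A hedged sketch with a hypothetical declarative model, showing where each value comes from (illustration only, not the real mapped class):

from sqlalchemy import Column, Integer
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Orders(Base):  # hypothetical table used only for illustration
    __tablename__ = "orders"
    __table_args__ = {"schema": "sales_dataset"}
    id = Column(Integer, primary_key=True)

print(Orders.__table__.name)            # "orders"        -> kwargs["table"]
print(Orders.__table_args__["schema"])  # "sales_dataset" -> kwargs["dataset_id"]
# "project_id" comes from get_session().get_bind().url.host, and
# "usage_location" is forwarded from the connection's usageLocation field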
def get_deletes(
self, table: str, project_id: str, usage_location: str, dataset_id: str
) -> List[SystemProfile]:
def get_deletes(self, **kwargs) -> List[SystemProfile]:
table, project_id, usage_location, dataset_id = (
kwargs.get("table"),
kwargs.get("project_id"),
kwargs.get("usage_location"),
kwargs.get("dataset_id"),
)
return self.get_system_profile(
project_id,
dataset_id,
@ -62,9 +58,13 @@ class BigQuerySystemMetricsSource(
DmlOperationType.DELETE,
)
def get_updates(
self, table: str, project_id: str, usage_location: str, dataset_id: str
) -> List[SystemProfile]:
def get_updates(self, **kwargs) -> List[SystemProfile]:
table, project_id, usage_location, dataset_id = (
kwargs.get("table"),
kwargs.get("project_id"),
kwargs.get("usage_location"),
kwargs.get("dataset_id"),
)
return self.get_system_profile(
project_id,
dataset_id,
@ -82,9 +82,13 @@ class BigQuerySystemMetricsSource(
DmlOperationType.UPDATE,
)
def get_inserts(
self, table: str, project_id: str, usage_location: str, dataset_id: str
) -> List[SystemProfile]:
def get_inserts(self, **kwargs) -> List[SystemProfile]:
table, project_id, usage_location, dataset_id = (
kwargs.get("table"),
kwargs.get("project_id"),
kwargs.get("usage_location"),
kwargs.get("dataset_id"),
)
return self.get_system_profile(
project_id,
dataset_id,

View File

@ -1,3 +1,5 @@
"""Redshift profiler"""
from metadata.ingestion.source.database.redshift.profiler.system import (
RedshiftSystemMetricsComputer,
)
@ -8,5 +10,5 @@ from metadata.profiler.metrics.system.system import SystemMetricsComputer
class RedshiftProfiler(SQAProfilerInterface):
def initialize_system_metrics_computer(self, **kwargs) -> SystemMetricsComputer:
def initialize_system_metrics_computer(self) -> SystemMetricsComputer:
return RedshiftSystemMetricsComputer(session=self.session)

View File

@ -1,6 +1,10 @@
"""
Implementation for the Redshift system metrics source
"""
from typing import List
from sqlalchemy.orm import DeclarativeMeta
from pydantic import TypeAdapter
from metadata.generated.schema.entity.data.table import SystemProfile
from metadata.ingestion.source.database.redshift.queries import (
@ -24,13 +28,14 @@ logger = profiler_logger()
class RedshiftSystemMetricsSource(
SQASessionProvider, EmptySystemMetricsSource, CacheProvider
):
def __init__(self, *args, **kwargs):
# collaborative constructor that initializes the SQASessionProvider and CacheProvider
super().__init__(*args, **kwargs)
"""Redshift system metrics source class"""
def get_inserts(
self, database: str, schema: str, table: str
) -> List[SystemProfile]:
def get_inserts(self, **kwargs) -> List[SystemProfile]:
database, schema, table = (
kwargs.get("database"),
kwargs.get("schema"),
kwargs.get("table"),
)
queries = self.get_or_update_cache(
f"{database}.{schema}",
self._get_insert_queries,
@ -39,16 +44,20 @@ class RedshiftSystemMetricsSource(
)
return get_metric_result(queries, table)
def get_kwargs(self, table: DeclarativeMeta, *args, **kwargs):
def get_kwargs(self, **kwargs):
table = kwargs.get("table")
return {
"table": table.__table__.name,
"database": self.get_session().get_bind().url.database,
"schema": table.__table__.schema,
}
def get_deletes(
self, database: str, schema: str, table: str
) -> List[SystemProfile]:
def get_deletes(self, **kwargs) -> List[SystemProfile]:
database, schema, table = (
kwargs.get("database"),
kwargs.get("schema"),
kwargs.get("table"),
)
queries = self.get_or_update_cache(
f"{database}.{schema}",
self._get_delete_queries,
@ -57,9 +66,10 @@ class RedshiftSystemMetricsSource(
)
return get_metric_result(queries, table)
def get_updates(
self, database: str, schema: str, table: str
) -> List[SystemProfile]:
def get_updates(self, **kwargs) -> List[SystemProfile]:
database = kwargs.get("database")
schema = kwargs.get("schema")
table = kwargs.get("table")
queries = self.get_or_update_cache(
f"{database}.{schema}",
self._get_update_queries,
@ -111,7 +121,7 @@ class RedshiftSystemMetricsSource(
)
def get_metric_result(ddls: List[QueryResult], table_name: str) -> List:
def get_metric_result(ddls: List[QueryResult], table_name: str) -> List[SystemProfile]:
"""Given query results, retur the metric result
Args:
@ -121,15 +131,17 @@ def get_metric_result(ddls: List[QueryResult], table_name: str) -> List:
Returns:
List[SystemProfile]: system profile results for the matching table
"""
return [
{
"timestamp": datetime_to_timestamp(ddl.start_time, milliseconds=True),
"operation": ddl.query_type,
"rowsAffected": ddl.rows,
}
for ddl in ddls
if ddl.table_name == table_name
]
return TypeAdapter(List[SystemProfile]).validate_python(
[
{
"timestamp": datetime_to_timestamp(ddl.start_time, milliseconds=True),
"operation": ddl.query_type,
"rowsAffected": ddl.rows,
}
for ddl in ddls
if ddl.table_name == table_name
]
)
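The results are now validated into SystemProfile models instead of being returned as raw dicts. A minimal sketch of the pydantic v2 TypeAdapter pattern used above, with a simplified stand-in model (the real SystemProfile is generated from the OpenMetadata schema):

from typing import List, Optional
from pydantic import BaseModel, TypeAdapter

class Profile(BaseModel):  # simplified stand-in for SystemProfile
    timestamp: int
    operation: str
    rowsAffected: Optional[int] = None

rows = [{"timestamp": 1731315600000, "operation": "INSERT", "rowsAffected": 42}]
profiles = TypeAdapter(List[Profile]).validate_python(rows)
assert profiles[0].operation == "INSERT"  # dicts become validated model instances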
class RedshiftSystemMetricsComputer(SystemMetricsComputer, RedshiftSystemMetricsSource):

View File

@ -22,5 +22,5 @@ from metadata.profiler.metrics.system.system import SystemMetricsComputer
class SnowflakeProfiler(SnowflakeProfilerInterface):
def initialize_system_metrics_computer(self, **kwargs) -> SystemMetricsComputer:
def initialize_system_metrics_computer(self) -> SystemMetricsComputer:
return SnowflakeSystemMetricsComputer(session=self.session)

View File

@ -1,3 +1,5 @@
"""Snowflake system metrics source"""
import hashlib
import re
import traceback
@ -5,7 +7,6 @@ from typing import List, Optional, Tuple
import sqlalchemy.orm
from pydantic import TypeAdapter
from sqlalchemy.orm import DeclarativeMeta
from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile
from metadata.ingestion.source.database.snowflake.models import (
@ -69,6 +70,22 @@ def _parse_query(query: str) -> Optional[str]:
class SnowflakeTableResovler:
"""A class the resolves snowflake tables by mimicking snowflake's default resolution logic:
https://docs.snowflake.com/en/sql-reference/name-resolution
This default specification searches in the following order:
- The explicitly provided schema
- The current schema
- The public schema
This can be altered by changing the SEARCH_PATH session parameter. If users change
this parameter, the resolver might return wrong values.
There is no way to extract the SEARCH_PATH from the query after it has been executed. Hence, we can
only rely on the default behavior and maybe allow the users to configure the search path
at the connection level (TODO).
"""
def __init__(self, session: sqlalchemy.orm.Session):
self._cache = LRUCache[bool](LRU_CACHE_SIZE)
self.session = session
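A hedged illustration of the resolution order described in the docstring above; candidate_paths() is a hypothetical helper that mirrors the documented search order, not the resolver's actual implementation:

from typing import List, Optional

PUBLIC_SCHEMA = "PUBLIC"

def candidate_paths(
    database: str,
    explicit_schema: Optional[str],
    current_schema: Optional[str],
    table: str,
) -> List[str]:
    """Order in which an unqualified table name is searched."""
    paths = []
    if explicit_schema:
        paths.append(f"{database}.{explicit_schema}.{table}")  # explicit first
    if current_schema:
        paths.append(f"{database}.{current_schema}.{table}")   # then current
    paths.append(f"{database}.{PUBLIC_SCHEMA}.{table}")        # PUBLIC last
    return paths

print(candidate_paths("ANALYTICS", None, "STAGING", "ORDERS"))
# ['ANALYTICS.STAGING.ORDERS', 'ANALYTICS.PUBLIC.ORDERS']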
@ -116,7 +133,8 @@ class SnowflakeTableResovler:
Returns:
tuple: Tuple of database, schema and table names
Raises:
RuntimeError: If the table is not found in the metadata or if there are duplicate results (there shouldn't be)
RuntimeError: If the table is not found in the metadata or if there are duplicate results
(there shouldn't be)
"""
search_paths = []
@ -131,7 +149,7 @@ class SnowflakeTableResovler:
search_paths += ".".join([context_database, PUBLIC_SCHEMA, table_name])
return context_database, PUBLIC_SCHEMA, table_name
raise RuntimeError(
"Could not find the table {search_paths}.".format(
"Could not find the table {search_paths}.".format( # pylint: disable=consider-using-f-string
search_paths=" OR ".join(map(lambda x: f"[{x}]", search_paths))
)
)
@ -154,7 +172,8 @@ class SnowflakeTableResovler:
Args:
context_database (str): Database name from the query context
context_schema (Optional[str]): Schema name from the query context
identifier (str): Identifier string extracted from a query (can be 'db.schema.table', 'schema.table' or just 'table')
identifier (str): Identifier string extracted from a query (can be
'db.schema.table', 'schema.table' or just 'table')
Returns:
Tuple[Optional[str], Optional[str], Optional[str]]: Tuple of database, schema and table names
Raises:
@ -266,22 +285,28 @@ def get_snowflake_system_queries(
class SnowflakeSystemMetricsSource(
SQASessionProvider, EmptySystemMetricsSource, CacheProvider[SnowflakeQueryLogEntry]
):
"""Snowflake system metrics source"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.resolver = SnowflakeTableResovler(
session=super().get_session(),
)
def get_kwargs(self, table: DeclarativeMeta, *args, **kwargs):
def get_kwargs(self, **kwargs):
table = kwargs.get("table")
return {
"table": table.__table__.name,
"database": self.get_session().get_bind().url.database,
"schema": table.__table__.schema,
}
def get_inserts(
self, database: str, schema: str, table: str
) -> List[SystemProfile]:
def get_inserts(self, **kwargs) -> List[SystemProfile]:
database, schema, table = (
kwargs.get("database"),
kwargs.get("schema"),
kwargs.get("table"),
)
return self.get_system_profile(
database,
schema,
@ -299,9 +324,12 @@ class SnowflakeSystemMetricsSource(
DmlOperationType.INSERT,
)
def get_updates(
self, database: str, schema: str, table: str
) -> List[SystemProfile]:
def get_updates(self, **kwargs) -> List[SystemProfile]:
database, schema, table = (
kwargs.get("database"),
kwargs.get("schema"),
kwargs.get("table"),
)
return self.get_system_profile(
database,
schema,
@ -319,9 +347,12 @@ class SnowflakeSystemMetricsSource(
DmlOperationType.UPDATE,
)
def get_deletes(
self, database: str, schema: str, table: str
) -> List[SystemProfile]:
def get_deletes(self, **kwargs) -> List[SystemProfile]:
database, schema, table = (
kwargs.get("database"),
kwargs.get("schema"),
kwargs.get("table"),
)
return self.get_system_profile(
database,
schema,

View File

@ -391,7 +391,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
dictionary of results
"""
logger.debug(f"Computing system metrics for {runner.table.__tablename__}")
return self.system_metrics_computer.get_system_metrics(runner.table)
return self.system_metrics_computer.get_system_metrics(table=runner.table)
def _create_thread_safe_sampler(
self,

View File

@ -42,6 +42,8 @@ T = TypeVar("T")
class CacheProvider(ABC, Generic[T]):
"""Cache provider class to provide cache for system metrics"""
def __init__(self):
self.cache = LRUCache[T](LRU_CACHE_SIZE)
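CacheProvider backs get_or_update_cache(), which sources like RedshiftSystemMetricsSource use to run the expensive query-log lookup once per database.schema key. A hedged sketch of the memoization pattern (hypothetical simplified cache, not the real bounded LRUCache):

from typing import Callable, Dict, List

class SimpleCacheProvider:
    def __init__(self):
        self.cache: Dict[str, List] = {}  # stand-in for LRUCache[T]

    def get_or_update_cache(self, key: str, fn: Callable, *args, **kwargs) -> List:
        if key not in self.cache:
            self.cache[key] = fn(*args, **kwargs)  # compute once per key
        return self.cache[key]

provider = SimpleCacheProvider()
queries = provider.get_or_update_cache("db.schema", lambda: ["q1", "q2"])
print(queries)  # ['q1', 'q2']; subsequent calls with the same key hit the cache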
@ -63,19 +65,20 @@ class EmptySystemMetricsSource:
"""Empty system metrics source that can be used as a default. Just returns an empty list of system metrics
for any resource."""
def get_inserts(self, *args, **kwargs) -> List[SystemProfile]:
def get_inserts(self, **kwargs) -> List[SystemProfile]:
"""Get insert queries"""
return []
def get_deletes(self, *args, **kwargs) -> List[SystemProfile]:
def get_deletes(self, **kwargs) -> List[SystemProfile]:
"""Get delete queries"""
return []
def get_updates(self, *args, **kwargs) -> List[SystemProfile]:
def get_updates(self, **kwargs) -> List[SystemProfile]:
"""Get update queries"""
return []
def get_kwargs(self, *args, **kwargs):
def get_kwargs(self, **kwargs):
"""Get kwargs to be used in get_inserts, get_deletes, get_updates"""
return {}
@ -84,11 +87,11 @@ class SystemMetricsComputer(EmptySystemMetricsSource):
# collaborative constructor that initializes upstream classes
super().__init__(*args, **kwargs)
def get_system_metrics(self, *args, **kwargs) -> List[SystemProfile]:
def get_system_metrics(self, **kwargs) -> List[SystemProfile]:
"""Return system metrics for a given table. Actual passed object can be a variety of types based
on the underlying infrastructure. For example, in the case of SQLalchemy, it can be a Table object
and in the case of Mongo, it can be a collection object."""
kwargs = super().get_kwargs(*args, **kwargs)
kwargs = super().get_kwargs(**kwargs)
return (
super().get_inserts(**kwargs)
+ super().get_deletes(**kwargs)
@ -97,6 +100,8 @@ class SystemMetricsComputer(EmptySystemMetricsSource):
class SQASessionProvider:
"""SQASessionProvider class to provide session to the system metrics"""
def __init__(self, *args, **kwargs):
self.session = kwargs.pop("session")
super().__init__(*args, **kwargs)
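The classes above are designed for cooperative multiple inheritance: each mixin pops the kwargs it owns and forwards the rest up the MRO, which is why the concrete sources can combine SQASessionProvider, EmptySystemMetricsSource, and CacheProvider freely. A minimal self-contained sketch of the pattern with simplified stand-ins (not the real classes):

from typing import List

class EmptySource:
    """Default source: metric getters return empty results."""
    def get_kwargs(self, **kwargs):
        return {}
    def get_inserts(self, **kwargs) -> List[dict]:
        return []

class SessionProvider:
    """Pops its own dependency from kwargs and forwards the rest."""
    def __init__(self, *args, **kwargs):
        self.session = kwargs.pop("session")
        super().__init__(*args, **kwargs)

class Computer(EmptySource):
    """Fans a single entry point out to the per-operation getters."""
    def get_system_metrics(self, **kwargs) -> List[dict]:
        kwargs = super().get_kwargs(**kwargs)
        return super().get_inserts(**kwargs)

class MySource(SessionProvider, EmptySource):
    def get_kwargs(self, **kwargs):
        return {"table": kwargs.get("table")}
    def get_inserts(self, **kwargs) -> List[dict]:
        return [{"table": kwargs["table"], "operation": "INSERT"}]

class MyComputer(Computer, MySource):
    pass

computer = MyComputer(session=object())  # session consumed by SessionProvider
print(computer.get_system_metrics(table="orders"))
# [{'table': 'orders', 'operation': 'INSERT'}]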

View File

@ -429,11 +429,12 @@ class CliDBBase(TestCase):
profile_type=SystemProfile,
).entities
actual_profiles = sorted(
actual_profiles, key=lambda x: (x.timestamp.root, x.operation.value)
actual_profiles,
key=lambda x: (-x.timestamp.root, x.operation.value),
)
expected_profile = sorted(
expected_profile,
key=lambda x: (x.timestamp.root, x.operation.value),
key=lambda x: (-x.timestamp.root, x.operation.value),
)
assert len(actual_profiles) >= len(expected_profile)
for expected, actual in zip(expected_profile, actual_profiles):
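The negated timestamp in the new sort key orders profiles newest-first while still sorting operations alphabetically within the same timestamp; a quick illustration with hypothetical (timestamp, operation) tuples:

rows = [(1, "UPDATE"), (2, "INSERT"), (1, "DELETE")]
print(sorted(rows, key=lambda x: (-x[0], x[1])))
# [(2, 'INSERT'), (1, 'DELETE'), (1, 'UPDATE')]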

View File

@ -268,17 +268,17 @@ class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods):
rowsAffected=1,
),
SystemProfile(
timestamp=Timestamp(root=0),
timestamp=Timestamp(root=1),
operation=DmlOperationType.INSERT,
rowsAffected=1,
),
SystemProfile(
timestamp=Timestamp(root=0),
timestamp=Timestamp(root=2),
operation=DmlOperationType.UPDATE,
rowsAffected=1,
),
SystemProfile(
timestamp=Timestamp(root=0),
timestamp=Timestamp(root=3),
operation=DmlOperationType.DELETE,
rowsAffected=1,
),