diff --git a/ingestion/pyproject.toml b/ingestion/pyproject.toml index 73cdaa78472..a0b6ab739ec 100644 --- a/ingestion/pyproject.toml +++ b/ingestion/pyproject.toml @@ -115,9 +115,98 @@ ignore-paths = [ "ingestion/src/metadata/great_expectations/action.py", "ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py", "ingestion/src/metadata/profiler/processor/sampler/nosql/sampler.py", - "ingestion/src/metadata/ingestion/source", + ".*/src/metadata/ingestion/source/.*/service_spec.py", "ingestion/src/metadata/profiler/metrics", "ingestion/src/metadata/profiler/source/databricks", + + # metadata ingestion sources + "ingestion/src/metadata/ingestion/source/api/rest/connection.py", + "ingestion/src/metadata/ingestion/source/api/rest/metadata.py", + "ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py", + "ingestion/src/metadata/ingestion/source/dashboard/metabase/models.py", + "ingestion/src/metadata/ingestion/source/dashboard/mode/client.py", + "ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py", + "ingestion/src/metadata/ingestion/source/dashboard/qlikcloud/client.py", + "ingestion/src/metadata/ingestion/source/dashboard/qlikcloud/metadata.py", + "ingestion/src/metadata/ingestion/source/dashboard/qliksense/client.py", + "ingestion/src/metadata/ingestion/source/dashboard/qliksense/metadata.py", + "ingestion/src/metadata/ingestion/source/dashboard/quicksight/metadata.py", + "ingestion/src/metadata/ingestion/source/dashboard/sigma/client.py", + "ingestion/src/metadata/ingestion/source/dashboard/superset/api_source.py", + "ingestion/src/metadata/ingestion/source/dashboard/superset/metadata.py", + "ingestion/src/metadata/ingestion/source/dashboard/tableau/metadata.py", + "ingestion/src/metadata/ingestion/source/database/athena/metadata.py", + "ingestion/src/metadata/ingestion/source/database/athena/utils.py", + "ingestion/src/metadata/ingestion/source/database/azuresql/connection.py", + 
"ingestion/src/metadata/ingestion/source/database/bigquery/connection.py", + "ingestion/src/metadata/ingestion/source/database/bigquery/incremental_table_processor.py", + "ingestion/src/metadata/ingestion/source/database/bigquery/queries.py", + "ingestion/src/metadata/ingestion/source/database/common_db_source.py", + "ingestion/src/metadata/ingestion/source/database/couchbase/metadata.py", + "ingestion/src/metadata/ingestion/source/database/databricks/client.py", + "ingestion/src/metadata/ingestion/source/database/databricks/metadata.py", + "ingestion/src/metadata/ingestion/source/database/datalake/clients/azure_blob.py", + "ingestion/src/metadata/ingestion/source/database/datalake/clients/base.py", + "ingestion/src/metadata/ingestion/source/database/datalake/clients/gcs.py", + "ingestion/src/metadata/ingestion/source/database/datalake/clients/s3.py", + "ingestion/src/metadata/ingestion/source/database/datalake/connection.py", + "ingestion/src/metadata/ingestion/source/database/datalake/metadata.py", + "ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py", + "ingestion/src/metadata/ingestion/source/database/deltalake/clients/base.py", + "ingestion/src/metadata/ingestion/source/database/deltalake/clients/pyspark.py", + "ingestion/src/metadata/ingestion/source/database/deltalake/clients/s3.py", + "ingestion/src/metadata/ingestion/source/database/deltalake/connection.py", + "ingestion/src/metadata/ingestion/source/database/deltalake/metadata.py", + "ingestion/src/metadata/ingestion/source/database/doris/utils.py", + "ingestion/src/metadata/ingestion/source/database/exasol/metadata.py", + "ingestion/src/metadata/ingestion/source/database/exasol/connection.py", + "ingestion/src/metadata/ingestion/source/database/external_table_lineage_mixin.py", + "ingestion/src/metadata/ingestion/source/database/hive/metadata.py", + "ingestion/src/metadata/ingestion/source/database/lineage_source.py", + 
"ingestion/src/metadata/ingestion/source/database/mssql/lineage.py", + "ingestion/src/metadata/ingestion/source/database/mssql/usage.py", + "ingestion/src/metadata/ingestion/source/database/mssql/utils.py", + "ingestion/src/metadata/ingestion/source/database/mysql/connection.py", + "ingestion/src/metadata/ingestion/source/database/oracle/lineage.py", + "ingestion/src/metadata/ingestion/source/database/pinotdb/metadata.py", + "ingestion/src/metadata/ingestion/source/database/postgres/connection.py", + "ingestion/src/metadata/ingestion/source/database/postgres/converter_orm.py", + "ingestion/src/metadata/ingestion/source/database/postgres/lineage.py", + "ingestion/src/metadata/ingestion/source/database/postgres/metadata.py", + "ingestion/src/metadata/ingestion/source/database/postgres/metrics.py", + "ingestion/src/metadata/ingestion/source/database/postgres/types/money.py", + "ingestion/src/metadata/ingestion/source/database/postgres/utils.py", + "ingestion/src/metadata/ingestion/source/database/redshift/incremental_table_processor.py", + "ingestion/src/metadata/ingestion/source/database/redshift/lineage.py", + "ingestion/src/metadata/ingestion/source/database/redshift/models.py", + "ingestion/src/metadata/ingestion/source/database/sample_data.py", + "ingestion/src/metadata/ingestion/source/database/saphana/cdata_parser.py", + "ingestion/src/metadata/ingestion/source/database/saphana/metadata.py", + "ingestion/src/metadata/ingestion/source/database/sas/client.py", + "ingestion/src/metadata/ingestion/source/database/sas/metadata.py", + "ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py", + "ingestion/src/metadata/ingestion/source/database/snowflake/utils.py", + "ingestion/src/metadata/ingestion/source/database/sql_column_handler.py", + "ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py", + "ingestion/src/metadata/ingestion/source/database/teradata/connection.py", + 
"ingestion/src/metadata/ingestion/source/database/trino/connection.py", + "ingestion/src/metadata/ingestion/source/database/unitycatalog/client.py", + "ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py", + "ingestion/src/metadata/ingestion/source/messaging/kafka/metadata.py", + "ingestion/src/metadata/ingestion/source/pipeline/dagster/metadata.py", + "ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py", + "ingestion/src/metadata/ingestion/source/pipeline/domopipeline/metadata.py", + "ingestion/src/metadata/ingestion/source/pipeline/fivetran/metadata.py", + "ingestion/src/metadata/ingestion/source/pipeline/fivetran/models.py", + "ingestion/src/metadata/ingestion/source/pipeline/flink/metadata.py", + "ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/metadata.py", + "ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py", + "ingestion/src/metadata/ingestion/source/pipeline/openlineage/utils.py", + "ingestion/src/metadata/ingestion/source/pipeline/spline/metadata.py", + "ingestion/src/metadata/ingestion/source/sqa_types.py", + "ingestion/src/metadata/ingestion/source/storage/gcs/connection.py", + "ingestion/src/metadata/ingestion/source/storage/gcs/metadata.py", + "ingestion/src/metadata/ingestion/source/storage/s3/metadata.py", ] [tool.pylint."MESSAGES CONTROL"] diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/profiler.py b/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/profiler.py index cceeafe3ea6..f85810bfd13 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/profiler.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/profiler.py @@ -1,3 +1,5 @@ +"""BigQuery Profiler""" + from typing import List, Type from metadata.generated.schema.entity.data.table import SystemProfile @@ -20,10 +22,9 @@ class BigQueryProfiler(BigQueryProfilerInterface): **kwargs, ) -> List[SystemProfile]:
return self.system_metrics_computer.get_system_metrics( - runner.table, self.service_connection_config + table=runner.table, + usage_location=self.service_connection_config.usageLocation, ) - def initialize_system_metrics_computer( - self, **kwargs - ) -> BigQuerySystemMetricsComputer: + def initialize_system_metrics_computer(self) -> BigQuerySystemMetricsComputer: return BigQuerySystemMetricsComputer(session=self.session) diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/system.py b/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/system.py index 7aa6d787be8..2910fd060c7 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/system.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/system.py @@ -1,12 +1,10 @@ +"""BigQuery system metric source""" + from typing import List from pydantic import TypeAdapter -from sqlalchemy.orm import DeclarativeMeta from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile -from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import ( - BigQueryConnection, -) from metadata.ingestion.source.database.bigquery.queries import BigQueryQueryResult from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations from metadata.profiler.metrics.system.system import ( @@ -24,26 +22,24 @@ logger = profiler_logger() class BigQuerySystemMetricsSource( SQASessionProvider, EmptySystemMetricsSource, CacheProvider ): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + """BigQuery system metrics source class""" - def get_kwargs( - self, - table: DeclarativeMeta, - service_connection: BigQueryConnection, - *args, - **kwargs, - ): + def get_kwargs(self, **kwargs): + table = kwargs.get("table") return { "table": table.__table__.name, "dataset_id": table.__table_args__["schema"], "project_id": super().get_session().get_bind().url.host, - "usage_location": 
service_connection.usageLocation, + "usage_location": kwargs.get("usage_location"), } - def get_deletes( - self, table: str, project_id: str, usage_location: str, dataset_id: str - ) -> List[SystemProfile]: + def get_deletes(self, **kwargs) -> List[SystemProfile]: + table, project_id, usage_location, dataset_id = ( + kwargs.get("table"), + kwargs.get("project_id"), + kwargs.get("usage_location"), + kwargs.get("dataset_id"), + ) return self.get_system_profile( project_id, dataset_id, @@ -62,9 +58,13 @@ class BigQuerySystemMetricsSource( DmlOperationType.DELETE, ) - def get_updates( - self, table: str, project_id: str, usage_location: str, dataset_id: str - ) -> List[SystemProfile]: + def get_updates(self, **kwargs) -> List[SystemProfile]: + table, project_id, usage_location, dataset_id = ( + kwargs.get("table"), + kwargs.get("project_id"), + kwargs.get("usage_location"), + kwargs.get("dataset_id"), + ) return self.get_system_profile( project_id, dataset_id, @@ -82,9 +82,13 @@ class BigQuerySystemMetricsSource( DmlOperationType.UPDATE, ) - def get_inserts( - self, table: str, project_id: str, usage_location: str, dataset_id: str - ) -> List[SystemProfile]: + def get_inserts(self, **kwargs) -> List[SystemProfile]: + table, project_id, usage_location, dataset_id = ( + kwargs.get("table"), + kwargs.get("project_id"), + kwargs.get("usage_location"), + kwargs.get("dataset_id"), + ) return self.get_system_profile( project_id, dataset_id, diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/profiler/profiler.py b/ingestion/src/metadata/ingestion/source/database/redshift/profiler/profiler.py index b64fcd2a11b..6eee6e184fd 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/profiler/profiler.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/profiler/profiler.py @@ -1,3 +1,5 @@ +"""Redshift profiler""" + from metadata.ingestion.source.database.redshift.profiler.system import ( RedshiftSystemMetricsComputer, ) @@ -8,5 +10,5 
@@ from metadata.profiler.metrics.system.system import SystemMetricsComputer class RedshiftProfiler(SQAProfilerInterface): - def initialize_system_metrics_computer(self, **kwargs) -> SystemMetricsComputer: + def initialize_system_metrics_computer(self) -> SystemMetricsComputer: return RedshiftSystemMetricsComputer(session=self.session) diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/profiler/system.py b/ingestion/src/metadata/ingestion/source/database/redshift/profiler/system.py index 148178722a6..c5189c7a258 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/profiler/system.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/profiler/system.py @@ -1,6 +1,10 @@ +""" +Implementation for the redshift system metrics source +""" + from typing import List -from sqlalchemy.orm import DeclarativeMeta +from pydantic import TypeAdapter from metadata.generated.schema.entity.data.table import SystemProfile from metadata.ingestion.source.database.redshift.queries import ( @@ -24,13 +28,14 @@ logger = profiler_logger() class RedshiftSystemMetricsSource( SQASessionProvider, EmptySystemMetricsSource, CacheProvider ): - def __init__(self, *args, **kwargs): - # collaborative constructor that initalizes the SQASessionProvider and CacheProvider - super().__init__(*args, **kwargs) + """Redshift system metrics source class""" - def get_inserts( - self, database: str, schema: str, table: str - ) -> List[SystemProfile]: + def get_inserts(self, **kwargs) -> List[SystemProfile]: + database, schema, table = ( + kwargs.get("database"), + kwargs.get("schema"), + kwargs.get("table"), + ) queries = self.get_or_update_cache( f"{database}.{schema}", self._get_insert_queries, @@ -39,16 +44,20 @@ class RedshiftSystemMetricsSource( ) return get_metric_result(queries, table) - def get_kwargs(self, table: DeclarativeMeta, *args, **kwargs): + def get_kwargs(self, **kwargs): + table = kwargs.get("table") return { "table": table.__table__.name,
"database": self.get_session().get_bind().url.database, "schema": table.__table__.schema, } - def get_deletes( - self, database: str, schema: str, table: str - ) -> List[SystemProfile]: + def get_deletes(self, **kwargs) -> List[SystemProfile]: + database, schema, table = ( + kwargs.get("database"), + kwargs.get("schema"), + kwargs.get("table"), + ) queries = self.get_or_update_cache( f"{database}.{schema}", self._get_delete_queries, @@ -57,9 +66,10 @@ class RedshiftSystemMetricsSource( ) return get_metric_result(queries, table) - def get_updates( - self, database: str, schema: str, table: str - ) -> List[SystemProfile]: + def get_updates(self, **kwargs) -> List[SystemProfile]: + database = kwargs.get("database") + schema = kwargs.get("schema") + table = kwargs.get("table") queries = self.get_or_update_cache( f"{database}.{schema}", self._get_update_queries, @@ -111,7 +121,7 @@ class RedshiftSystemMetricsSource( ) -def get_metric_result(ddls: List[QueryResult], table_name: str) -> List: +def get_metric_result(ddls: List[QueryResult], table_name: str) -> List[SystemProfile]: """Given query results, retur the metric result Args: @@ -121,15 +131,17 @@ def get_metric_result(ddls: List[QueryResult], table_name: str) -> List: Returns: List: """ - return [ - { - "timestamp": datetime_to_timestamp(ddl.start_time, milliseconds=True), - "operation": ddl.query_type, - "rowsAffected": ddl.rows, - } - for ddl in ddls - if ddl.table_name == table_name - ] + return TypeAdapter(List[SystemProfile]).validate_python( + [ + { + "timestamp": datetime_to_timestamp(ddl.start_time, milliseconds=True), + "operation": ddl.query_type, + "rowsAffected": ddl.rows, + } + for ddl in ddls + if ddl.table_name == table_name + ] + ) class RedshiftSystemMetricsComputer(SystemMetricsComputer, RedshiftSystemMetricsSource): diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/profiler/profiler.py b/ingestion/src/metadata/ingestion/source/database/snowflake/profiler/profiler.py index 
f68f18f85c1..a262b18f52a 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/profiler/profiler.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/profiler/profiler.py @@ -22,5 +22,5 @@ from metadata.profiler.metrics.system.system import SystemMetricsComputer class SnowflakeProfiler(SnowflakeProfilerInterface): - def initialize_system_metrics_computer(self, **kwargs) -> SystemMetricsComputer: + def initialize_system_metrics_computer(self) -> SystemMetricsComputer: return SnowflakeSystemMetricsComputer(session=self.session) diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/profiler/system.py b/ingestion/src/metadata/ingestion/source/database/snowflake/profiler/system.py index c986ac0beed..89d38131fc2 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/profiler/system.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/profiler/system.py @@ -1,3 +1,5 @@ +"""Snowflake system metrics source""" + import hashlib import re import traceback @@ -5,7 +7,6 @@ from typing import List, Optional, Tuple import sqlalchemy.orm from pydantic import TypeAdapter -from sqlalchemy.orm import DeclarativeMeta from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile from metadata.ingestion.source.database.snowflake.models import ( @@ -69,6 +70,22 @@ def _parse_query(query: str) -> Optional[str]: class SnowflakeTableResovler: + """A class that resolves snowflake tables by mimicking snowflake's default resolution logic: + https://docs.snowflake.com/en/sql-reference/name-resolution + + This default specification searches in the following order: + - The explicitly provided schema + - The current schema + - The public schema + + This can be altered by changing the SEARCH_PATH session parameter. If the users change + this parameter, this resolver might return wrong values. + + There is no way to extract the SEARCH_PATH from the query after it has been executed.
Hence, we can + only rely on the default behavior and maybe allow the users to configure the search path + at the connection level (TODO). + """ + def __init__(self, session: sqlalchemy.orm.Session): self._cache = LRUCache[bool](LRU_CACHE_SIZE) self.session = session @@ -116,7 +133,8 @@ class SnowflakeTableResovler: Returns: tuple: Tuple of database, schema and table names Raises: - RuntimeError: If the table is not found in the metadata or if there are duplicate results (there shouldn't be) + RuntimeError: If the table is not found in the metadata or if there are duplicate results + (there shouldn't be) """ search_paths = [] @@ -131,7 +149,7 @@ class SnowflakeTableResovler: search_paths += ".".join([context_database, PUBLIC_SCHEMA, table_name]) return context_database, PUBLIC_SCHEMA, table_name raise RuntimeError( - "Could not find the table {search_paths}.".format( + "Could not find the table {search_paths}.".format( # pylint: disable=consider-using-f-string search_paths=" OR ".join(map(lambda x: f"[{x}]", search_paths)) ) ) @@ -154,7 +172,8 @@ class SnowflakeTableResovler: Args: context_database (str): Database name from the query context context_schema (Optional[str]): Schema name from the query context - identifier (str): Identifier string extracted from a query (can be 'db.schema.table', 'schema.table' or just 'table') + identifier (str): Identifier string extracted from a query (can be + 'db.schema.table', 'schema.table' or just 'table') Returns: Tuple[Optional[str], Optional[str], Optional[str]]: Tuple of database, schema and table names Raises: @@ -266,22 +285,28 @@ def get_snowflake_system_queries( class SnowflakeSystemMetricsSource( SQASessionProvider, EmptySystemMetricsSource, CacheProvider[SnowflakeQueryLogEntry] ): + """Snowflake system metrics source""" + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.resolver = SnowflakeTableResovler( session=super().get_session(), ) - def get_kwargs(self, table: DeclarativeMeta, *args, 
**kwargs): + def get_kwargs(self, **kwargs): + table = kwargs.get("table") return { "table": table.__table__.name, "database": self.get_session().get_bind().url.database, "schema": table.__table__.schema, } - def get_inserts( - self, database: str, schema: str, table: str - ) -> List[SystemProfile]: + def get_inserts(self, **kwargs) -> List[SystemProfile]: + database, schema, table = ( + kwargs.get("database"), + kwargs.get("schema"), + kwargs.get("table"), + ) return self.get_system_profile( database, schema, @@ -299,9 +324,12 @@ class SnowflakeSystemMetricsSource( DmlOperationType.INSERT, ) - def get_updates( - self, database: str, schema: str, table: str - ) -> List[SystemProfile]: + def get_updates(self, **kwargs) -> List[SystemProfile]: + database, schema, table = ( + kwargs.get("database"), + kwargs.get("schema"), + kwargs.get("table"), + ) return self.get_system_profile( database, schema, @@ -319,9 +347,12 @@ class SnowflakeSystemMetricsSource( DmlOperationType.UPDATE, ) - def get_deletes( - self, database: str, schema: str, table: str - ) -> List[SystemProfile]: + def get_deletes(self, **kwargs) -> List[SystemProfile]: + database, schema, table = ( + kwargs.get("database"), + kwargs.get("schema"), + kwargs.get("table"), + ) return self.get_system_profile( database, schema, diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py index 695338a5112..1e82e23eecf 100644 --- a/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py @@ -391,7 +391,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): dictionnary of results """ logger.debug(f"Computing system metrics for {runner.table.__tablename__}") - return self.system_metrics_computer.get_system_metrics(runner.table) + return 
self.system_metrics_computer.get_system_metrics(table=runner.table) def _create_thread_safe_sampler( self, diff --git a/ingestion/src/metadata/profiler/metrics/system/system.py b/ingestion/src/metadata/profiler/metrics/system/system.py index 7d07fc0c320..0bad0ed23d5 100644 --- a/ingestion/src/metadata/profiler/metrics/system/system.py +++ b/ingestion/src/metadata/profiler/metrics/system/system.py @@ -42,6 +42,8 @@ T = TypeVar("T") class CacheProvider(ABC, Generic[T]): + """Cache provider class to provide cache for system metrics""" + def __init__(self): self.cache = LRUCache[T](LRU_CACHE_SIZE) @@ -63,19 +65,20 @@ class EmptySystemMetricsSource: """Empty system metrics source that can be used as a default. Just returns an empty list of system metrics for any resource.""" - def get_inserts(self, *args, **kwargs) -> List[SystemProfile]: + def get_inserts(self, **kwargs) -> List[SystemProfile]: """Get insert queries""" return [] - def get_deletes(self, *args, **kwargs) -> List[SystemProfile]: + def get_deletes(self, **kwargs) -> List[SystemProfile]: """Get delete queries""" return [] - def get_updates(self, *args, **kwargs) -> List[SystemProfile]: + def get_updates(self, **kwargs) -> List[SystemProfile]: """Get update queries""" return [] - def get_kwargs(self, *args, **kwargs): + def get_kwargs(self, **kwargs): + """Get kwargs to be used in get_inserts, get_deletes, get_updates""" return {} @@ -84,11 +87,11 @@ class SystemMetricsComputer(EmptySystemMetricsSource): # collaborative constructor that initalizes upstream classes super().__init__(*args, **kwargs) - def get_system_metrics(self, *args, **kwargs) -> List[SystemProfile]: + def get_system_metrics(self, **kwargs) -> List[SystemProfile]: """Return system metrics for a given table. Actual passed object can be a variety of types based on the underlying infrastructure. 
For example, in the case of SQLalchemy, it can be a Table object and in the case of Mongo, it can be a collection object.""" - kwargs = super().get_kwargs(*args, **kwargs) + kwargs = super().get_kwargs(**kwargs) return ( super().get_inserts(**kwargs) + super().get_deletes(**kwargs) @@ -97,6 +100,8 @@ class SystemMetricsComputer(EmptySystemMetricsSource): class SQASessionProvider: + """SQASessionProvider class to provide session to the system metrics""" + def __init__(self, *args, **kwargs): self.session = kwargs.pop("session") super().__init__(*args, **kwargs) diff --git a/ingestion/tests/cli_e2e/base/test_cli_db.py b/ingestion/tests/cli_e2e/base/test_cli_db.py index 43bd7247cf4..d746a2265ad 100644 --- a/ingestion/tests/cli_e2e/base/test_cli_db.py +++ b/ingestion/tests/cli_e2e/base/test_cli_db.py @@ -429,11 +429,12 @@ class CliDBBase(TestCase): profile_type=SystemProfile, ).entities actual_profiles = sorted( - actual_profiles, key=lambda x: (x.timestamp.root, x.operation.value) + actual_profiles, + key=lambda x: (-x.timestamp.root, x.operation.value), ) expected_profile = sorted( expected_profile, - key=lambda x: (x.timestamp.root, x.operation.value), + key=lambda x: (-x.timestamp.root, x.operation.value), ) assert len(actual_profiles) >= len(expected_profile) for expected, actual in zip(expected_profile, actual_profiles): diff --git a/ingestion/tests/cli_e2e/test_cli_snowflake.py b/ingestion/tests/cli_e2e/test_cli_snowflake.py index 7258882878d..a6eb1896e23 100644 --- a/ingestion/tests/cli_e2e/test_cli_snowflake.py +++ b/ingestion/tests/cli_e2e/test_cli_snowflake.py @@ -268,17 +268,17 @@ class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods): rowsAffected=1, ), SystemProfile( - timestamp=Timestamp(root=0), + timestamp=Timestamp(root=1), operation=DmlOperationType.INSERT, rowsAffected=1, ), SystemProfile( - timestamp=Timestamp(root=0), + timestamp=Timestamp(root=2), operation=DmlOperationType.UPDATE, rowsAffected=1, ), SystemProfile( - 
timestamp=Timestamp(root=0), + timestamp=Timestamp(root=3), operation=DmlOperationType.DELETE, rowsAffected=1, ),