From e026d625d6540dc2ab8ee64355173dafad4679e4 Mon Sep 17 00:00:00 2001 From: Teddy Date: Tue, 28 Jun 2022 19:27:55 +0200 Subject: [PATCH] Fixes #5713 where profileSample default to 100 (#5714) * Fix random sample modulo and rand for postgres * Remove sampleProfile default and return declarative meta when no sample profile is passed * fixed py_format * isolated random sample partition logic into its own method --- .../orm_profiler/orm/functions/modulo.py | 1 + .../orm_profiler/orm/functions/random_num.py | 6 ++++ .../orm_profiler/processor/orm_profiler.py | 2 +- .../metadata/orm_profiler/profiler/core.py | 2 +- .../metadata/orm_profiler/profiler/default.py | 2 +- .../metadata/orm_profiler/profiler/sampler.py | 32 ++++++++++++++++++- .../unit/profiler/test_session_validations.py | 10 ------ 7 files changed, 41 insertions(+), 14 deletions(-) diff --git a/ingestion/src/metadata/orm_profiler/orm/functions/modulo.py b/ingestion/src/metadata/orm_profiler/orm/functions/modulo.py index 97829414bcd..c4c212b568f 100644 --- a/ingestion/src/metadata/orm_profiler/orm/functions/modulo.py +++ b/ingestion/src/metadata/orm_profiler/orm/functions/modulo.py @@ -56,6 +56,7 @@ def _(element, compiler, **kw): @compiles(ModuloFn, Dialects.BigQuery) @compiles(ModuloFn, Dialects.Redshift) @compiles(ModuloFn, Dialects.Snowflake) +@compiles(ModuloFn, Dialects.Postgres) def _(element, compiler, **kw): value, base = validate_and_compile(element, compiler, **kw) diff --git a/ingestion/src/metadata/orm_profiler/orm/functions/random_num.py b/ingestion/src/metadata/orm_profiler/orm/functions/random_num.py index 26831be2f37..450b15a4c8d 100644 --- a/ingestion/src/metadata/orm_profiler/orm/functions/random_num.py +++ b/ingestion/src/metadata/orm_profiler/orm/functions/random_num.py @@ -77,3 +77,9 @@ def _(*_, **__): We need to divide it by 4294967295 to get a number between 0 and 1. 
""" return "toInt8(RAND(10)/4294967295*100)" + + +@compiles(RandomNumFn, Dialects.Postgres) +def _(*_, **__): + """Postgres random logic""" + return "ABS((RANDOM() * 100)::INTEGER)" diff --git a/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py b/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py index af789915427..bbfc50af8cf 100644 --- a/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py +++ b/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py @@ -132,7 +132,7 @@ class OrmProfilerProcessor(Processor[Table]): if my_record_tests and my_record_tests.profile_sample: return my_record_tests.profile_sample - return table.profileSample or 100.0 + return table.profileSample or None def get_partition_details( self, diff --git a/ingestion/src/metadata/orm_profiler/profiler/core.py b/ingestion/src/metadata/orm_profiler/profiler/core.py index 47e024a7da1..96d8e614cf1 100644 --- a/ingestion/src/metadata/orm_profiler/profiler/core.py +++ b/ingestion/src/metadata/orm_profiler/profiler/core.py @@ -69,7 +69,7 @@ class Profiler(Generic[TMetric]): profile_date: datetime = datetime.now(), ignore_cols: Optional[List[str]] = None, use_cols: Optional[List[Column]] = None, - profile_sample: Optional[float] = 100.0, + profile_sample: Optional[float] = None, timeout_seconds: Optional[int] = TEN_MIN, partition_details: Optional[Dict] = None, profile_sample_query: Optional[str] = None, diff --git a/ingestion/src/metadata/orm_profiler/profiler/default.py b/ingestion/src/metadata/orm_profiler/profiler/default.py index c90e90c3192..d2cd3845e66 100644 --- a/ingestion/src/metadata/orm_profiler/profiler/default.py +++ b/ingestion/src/metadata/orm_profiler/profiler/default.py @@ -62,7 +62,7 @@ class DefaultProfiler(Profiler): table: DeclarativeMeta, ignore_cols: Optional[List[str]] = None, profile_date: datetime = datetime.now(), - profile_sample: Optional[float] = 100.0, + profile_sample: Optional[float] = None, timeout_seconds: Optional[int] = TEN_MIN, 
partition_details: Optional[Dict] = None, profile_sample_query: Optional[str] = None, diff --git a/ingestion/src/metadata/orm_profiler/profiler/sampler.py b/ingestion/src/metadata/orm_profiler/profiler/sampler.py index d332228c181..e941374d4cf 100644 --- a/ingestion/src/metadata/orm_profiler/profiler/sampler.py +++ b/ingestion/src/metadata/orm_profiler/profiler/sampler.py @@ -14,7 +14,7 @@ for the profiler """ from typing import Dict, Optional, Union -from sqlalchemy import inspect, text +from sqlalchemy import column, inspect, text from sqlalchemy.orm import DeclarativeMeta, Query, Session, aliased from sqlalchemy.orm.util import AliasedClass @@ -60,6 +60,12 @@ class Sampler: the full table if no sampling is required. """ + if not self.profile_sample: + if self._partition_details: + return self._random_sample_for_partitioned_tables() + + return self.table + if self._profile_sample_query: return self._fetch_sample_data_with_query_object() @@ -117,3 +123,27 @@ class Sampler: return self.session.query(self.table).from_statement( text(f"{self._profile_sample_query}") ) + + def _random_sample_for_partitioned_tables(self) -> Query: + """Return the Query object for partitioned tables""" + partition_field = self._partition_details["partition_field"] + if not self._partition_details.get("partition_values"): + sample = ( + self.session.query(self.table) + .filter( + column(partition_field) + >= self._partition_details["partition_start"].strftime("%Y-%m-%d"), + column(partition_field) + <= self._partition_details["partition_end"].strftime("%Y-%m-%d"), + ) + .subquery() + ) + return aliased(self.table, sample) + sample = ( + self.session.query(self.table) + .filter( + column(partition_field).in_(self._partition_details["partition_values"]) + ) + .subquery() + ) + return aliased(self.table, sample) diff --git a/ingestion/tests/unit/profiler/test_session_validations.py b/ingestion/tests/unit/profiler/test_session_validations.py index de433c21956..d38cc90cdb1 100644 --- 
a/ingestion/tests/unit/profiler/test_session_validations.py +++ b/ingestion/tests/unit/profiler/test_session_validations.py @@ -92,7 +92,6 @@ class MetricsTest(TestCase): execution_date=EXECUTION_DATE, session=self.session, table=User, - profile_sample=100.0, ) assert res_ok == TestCaseResult( @@ -107,7 +106,6 @@ class MetricsTest(TestCase): execution_date=EXECUTION_DATE, session=self.session, table=User, - profile_sample=100.0, ) assert res_ko == TestCaseResult( @@ -122,7 +120,6 @@ class MetricsTest(TestCase): execution_date=EXECUTION_DATE, session=self.session, table=User, - profile_sample=100.0, ) assert res_aborted == TestCaseResult( @@ -146,7 +143,6 @@ class MetricsTest(TestCase): execution_date=EXECUTION_DATE, session=self.session, table=User, - profile_sample=100.0, ) assert res_ok == TestCaseResult( @@ -161,7 +157,6 @@ class MetricsTest(TestCase): execution_date=EXECUTION_DATE, session=self.session, table=User, - profile_sample=100.0, ) assert res_ko == TestCaseResult( @@ -176,7 +171,6 @@ class MetricsTest(TestCase): execution_date=EXECUTION_DATE, session=self.session, table=User, - profile_sample=100.0, ) assert res_aborted == TestCaseResult( @@ -199,7 +193,6 @@ class MetricsTest(TestCase): execution_date=EXECUTION_DATE, session=self.session, table=User, - profile_sample=100.0, ) assert res_ok == TestCaseResult( @@ -217,7 +210,6 @@ class MetricsTest(TestCase): execution_date=EXECUTION_DATE, session=self.session, table=User, - profile_sample=100.0, ) assert res_ok_2 == TestCaseResult( @@ -234,7 +226,6 @@ class MetricsTest(TestCase): execution_date=EXECUTION_DATE, session=self.session, table=User, - profile_sample=100.0, ) assert res_ko == TestCaseResult( @@ -251,7 +242,6 @@ class MetricsTest(TestCase): execution_date=EXECUTION_DATE, session=self.session, table=User, - profile_sample=100.0, ) assert res_aborted == TestCaseResult(