mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2026-01-08 13:36:32 +00:00
* Fix random sample modulo and rand for postgres * Remove sampleProfile default and return declarative meta when no sample profile is passed * fixed py_format * isolated random sample partition logic into its own method
This commit is contained in:
parent
9414012c38
commit
e026d625d6
@ -56,6 +56,7 @@ def _(element, compiler, **kw):
|
||||
@compiles(ModuloFn, Dialects.BigQuery)
|
||||
@compiles(ModuloFn, Dialects.Redshift)
|
||||
@compiles(ModuloFn, Dialects.Snowflake)
|
||||
@compiles(ModuloFn, Dialects.Postgres)
|
||||
def _(element, compiler, **kw):
|
||||
|
||||
value, base = validate_and_compile(element, compiler, **kw)
|
||||
|
||||
@ -77,3 +77,9 @@ def _(*_, **__):
|
||||
We need to divide it by 4294967295 to get a number between 0 and 1.
|
||||
"""
|
||||
return "toInt8(RAND(10)/4294967295*100)"
|
||||
|
||||
|
||||
@compiles(RandomNumFn, Dialects.Postgres)
def _(*_, **__):
    """Compile ``RandomNumFn`` for the Postgres dialect.

    All positional and keyword arguments handed over by the compiler are
    ignored; the same SQL snippet is returned on every call.

    Returns:
        str: SQL expression that scales ``RANDOM()`` by 100, casts the
        result to ``INTEGER`` and wraps it in ``ABS``.
    """
    postgres_random_expression = "ABS((RANDOM() * 100)::INTEGER)"
    return postgres_random_expression
|
||||
|
||||
@ -132,7 +132,7 @@ class OrmProfilerProcessor(Processor[Table]):
|
||||
if my_record_tests and my_record_tests.profile_sample:
|
||||
return my_record_tests.profile_sample
|
||||
|
||||
return table.profileSample or 100.0
|
||||
return table.profileSample or None
|
||||
|
||||
def get_partition_details(
|
||||
self,
|
||||
|
||||
@ -69,7 +69,7 @@ class Profiler(Generic[TMetric]):
|
||||
profile_date: datetime = datetime.now(),
|
||||
ignore_cols: Optional[List[str]] = None,
|
||||
use_cols: Optional[List[Column]] = None,
|
||||
profile_sample: Optional[float] = 100.0,
|
||||
profile_sample: Optional[float] = None,
|
||||
timeout_seconds: Optional[int] = TEN_MIN,
|
||||
partition_details: Optional[Dict] = None,
|
||||
profile_sample_query: Optional[str] = None,
|
||||
|
||||
@ -62,7 +62,7 @@ class DefaultProfiler(Profiler):
|
||||
table: DeclarativeMeta,
|
||||
ignore_cols: Optional[List[str]] = None,
|
||||
profile_date: datetime = datetime.now(),
|
||||
profile_sample: Optional[float] = 100.0,
|
||||
profile_sample: Optional[float] = None,
|
||||
timeout_seconds: Optional[int] = TEN_MIN,
|
||||
partition_details: Optional[Dict] = None,
|
||||
profile_sample_query: Optional[str] = None,
|
||||
|
||||
@ -14,7 +14,7 @@ for the profiler
|
||||
"""
|
||||
from typing import Dict, Optional, Union
|
||||
|
||||
from sqlalchemy import inspect, text
|
||||
from sqlalchemy import column, inspect, text
|
||||
from sqlalchemy.orm import DeclarativeMeta, Query, Session, aliased
|
||||
from sqlalchemy.orm.util import AliasedClass
|
||||
|
||||
@ -60,6 +60,12 @@ class Sampler:
|
||||
the full table if no sampling is required.
|
||||
"""
|
||||
|
||||
if not self.profile_sample:
|
||||
if self._partition_details:
|
||||
return self._random_sample_for_partitioned_tables()
|
||||
|
||||
return self.table
|
||||
|
||||
if self._profile_sample_query:
|
||||
return self._fetch_sample_data_with_query_object()
|
||||
|
||||
@ -117,3 +123,27 @@ class Sampler:
|
||||
return self.session.query(self.table).from_statement(
|
||||
text(f"{self._profile_sample_query}")
|
||||
)
|
||||
|
||||
def _random_sample_for_partitioned_tables(self) -> Query:
    """Restrict the profiled table to the configured partition.

    Reads ``self._partition_details`` and filters ``self.table`` on the
    column named by its ``"partition_field"`` entry:

    * when ``"partition_values"`` is missing or empty, rows are kept if the
      column lies between ``"partition_start"`` and ``"partition_end"``
      (both bounds inclusive, each formatted with ``strftime("%Y-%m-%d")``);
    * otherwise rows are kept if the column is one of ``"partition_values"``.

    Returns:
        The result of ``aliased(self.table, <filtered subquery>)``.
    """
    details = self._partition_details
    field_name = details["partition_field"]
    explicit_values = details.get("partition_values")

    base_query = self.session.query(self.table)

    if explicit_values:
        # Explicit partition values take precedence over the date window.
        criteria = (column(field_name).in_(explicit_values),)
    else:
        lower_bound = column(field_name) >= details["partition_start"].strftime(
            "%Y-%m-%d"
        )
        upper_bound = column(field_name) <= details["partition_end"].strftime(
            "%Y-%m-%d"
        )
        criteria = (lower_bound, upper_bound)

    partition_subquery = base_query.filter(*criteria).subquery()
    return aliased(self.table, partition_subquery)
|
||||
|
||||
@ -92,7 +92,6 @@ class MetricsTest(TestCase):
|
||||
execution_date=EXECUTION_DATE,
|
||||
session=self.session,
|
||||
table=User,
|
||||
profile_sample=100.0,
|
||||
)
|
||||
|
||||
assert res_ok == TestCaseResult(
|
||||
@ -107,7 +106,6 @@ class MetricsTest(TestCase):
|
||||
execution_date=EXECUTION_DATE,
|
||||
session=self.session,
|
||||
table=User,
|
||||
profile_sample=100.0,
|
||||
)
|
||||
|
||||
assert res_ko == TestCaseResult(
|
||||
@ -122,7 +120,6 @@ class MetricsTest(TestCase):
|
||||
execution_date=EXECUTION_DATE,
|
||||
session=self.session,
|
||||
table=User,
|
||||
profile_sample=100.0,
|
||||
)
|
||||
|
||||
assert res_aborted == TestCaseResult(
|
||||
@ -146,7 +143,6 @@ class MetricsTest(TestCase):
|
||||
execution_date=EXECUTION_DATE,
|
||||
session=self.session,
|
||||
table=User,
|
||||
profile_sample=100.0,
|
||||
)
|
||||
|
||||
assert res_ok == TestCaseResult(
|
||||
@ -161,7 +157,6 @@ class MetricsTest(TestCase):
|
||||
execution_date=EXECUTION_DATE,
|
||||
session=self.session,
|
||||
table=User,
|
||||
profile_sample=100.0,
|
||||
)
|
||||
|
||||
assert res_ko == TestCaseResult(
|
||||
@ -176,7 +171,6 @@ class MetricsTest(TestCase):
|
||||
execution_date=EXECUTION_DATE,
|
||||
session=self.session,
|
||||
table=User,
|
||||
profile_sample=100.0,
|
||||
)
|
||||
|
||||
assert res_aborted == TestCaseResult(
|
||||
@ -199,7 +193,6 @@ class MetricsTest(TestCase):
|
||||
execution_date=EXECUTION_DATE,
|
||||
session=self.session,
|
||||
table=User,
|
||||
profile_sample=100.0,
|
||||
)
|
||||
|
||||
assert res_ok == TestCaseResult(
|
||||
@ -217,7 +210,6 @@ class MetricsTest(TestCase):
|
||||
execution_date=EXECUTION_DATE,
|
||||
session=self.session,
|
||||
table=User,
|
||||
profile_sample=100.0,
|
||||
)
|
||||
|
||||
assert res_ok_2 == TestCaseResult(
|
||||
@ -234,7 +226,6 @@ class MetricsTest(TestCase):
|
||||
execution_date=EXECUTION_DATE,
|
||||
session=self.session,
|
||||
table=User,
|
||||
profile_sample=100.0,
|
||||
)
|
||||
|
||||
assert res_ko == TestCaseResult(
|
||||
@ -251,7 +242,6 @@ class MetricsTest(TestCase):
|
||||
execution_date=EXECUTION_DATE,
|
||||
session=self.session,
|
||||
table=User,
|
||||
profile_sample=100.0,
|
||||
)
|
||||
|
||||
assert res_aborted == TestCaseResult(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user