# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Helper module to handle data sampling
for the profiler
"""

from typing import Union, cast

from sqlalchemy import Column, inspect, text
from sqlalchemy.orm import DeclarativeMeta, Query, aliased
from sqlalchemy.orm.util import AliasedClass
from sqlalchemy.sql.sqltypes import Enum

from metadata.generated.schema.entity.data.table import (
    PartitionIntervalType,
    PartitionProfilerConfig,
    ProfileSampleType,
    TableData,
)
from metadata.profiler.orm.functions.modulo import ModuloFn
from metadata.profiler.orm.functions.random_num import RandomNumFn
from metadata.profiler.orm.registry import Dialects
from metadata.profiler.processor.handle_partition import partition_filter_handler
from metadata.profiler.processor.sampler.sampler_interface import SamplerInterface
from metadata.utils.sqa_utils import (
    build_query_filter,
    dispatch_to_date_or_datetime,
    get_integer_range_filter,
    get_partition_col_type,
    get_value_filter,
)

RANDOM_LABEL = "random"


def _object_value_for_elem(self, elem):
    """
    We have mapped DataType.ENUM to sqlalchemy.Enum.
    If the map lookup returned None by default, we would always get None back
    because there is no enum map to look up. So what we do here is trust the
    database: it stores the correct map key, and that key is shown directly on
    the UI. With this approach we can only display what the database has stored
    (i.e. the key) and not the actual value behind it!
    """
    return self._object_lookup.get(elem, elem)  # pylint: disable=protected-access


Enum._object_value_for_elem = _object_value_for_elem  # pylint: disable=protected-access
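
# Illustrative sketch of the patched behaviour (hypothetical enum, not part of
# this module): a raw database value that is missing from the Python-side enum
# map is returned as-is instead of failing the lookup, e.g.
#   class Status(enum.Enum):
#       ACTIVE = "active"
#   sqlalchemy.Enum(Status)._object_value_for_elem("archived")  # -> "archived"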


class SQASampler(SamplerInterface):
    """
    Generates a sample of the data so that profiler queries
    do not have to run against the whole table.
    """
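
    # The methods below rely on state presumably wired up by SamplerInterface:
    # `self.client` (a SQLAlchemy session-like object exposing `.query()` and
    # `.execute()`), `self.table` (the declarative table class), plus
    # `self.profile_sample`, `self.profile_sample_type`, `self.sample_limit`,
    # `self._profile_sample_query` and `self._partition_details`.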

    @partition_filter_handler(build_sample=True)
    def get_sample_query(self) -> Query:
        """get query for sample data"""
        if self.profile_sample_type == ProfileSampleType.PERCENTAGE:
            rnd = (
                self.client.query(
                    self.table,
                    (ModuloFn(RandomNumFn(), 100)).label(RANDOM_LABEL),
                )
                .suffix_with(
                    f"SAMPLE BERNOULLI ({self.profile_sample or 100})",
                    dialect=Dialects.Snowflake,
                )
                .cte(f"{self.table.__tablename__}_rnd")
            )
            session_query = self.client.query(rnd)

            return session_query.where(rnd.c.random <= self.profile_sample).cte(
                f"{self.table.__tablename__}_sample"
            )
        table_query = self.client.query(self.table)
        return (
            self.client.query(
                self.table,
                (ModuloFn(RandomNumFn(), table_query.count())).label(RANDOM_LABEL),
            )
            .order_by(RANDOM_LABEL)
            .limit(self.profile_sample)
            .cte(f"{self.table.__tablename__}_rnd")
        )
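
    # Illustrative only (the exact SQL depends on the dialect): the percentage
    # branch above builds something shaped like
    #   WITH <table>_rnd AS (
    #       SELECT <table>.*, MOD(RANDOM(), 100) AS random FROM <table>
    #       SAMPLE BERNOULLI (<profile_sample>)  -- suffix emitted for Snowflake only
    #   )
    #   SELECT * FROM <table>_rnd WHERE random <= <profile_sample>
    # while the row-count branch orders by the random column and applies
    # LIMIT <profile_sample> instead.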

    def random_sample(self) -> Union[DeclarativeMeta, AliasedClass]:
        """
        Either return a sampled alias of the table (built from the sampling CTE),
        or the full table (optionally partition filtered) if no sampling is required.
        """
        if self._profile_sample_query:
            return self._rdn_sample_from_user_query()

        if not self.profile_sample:
            if self._partition_details:
                return self._partitioned_table()

            return self.table

        # Add new RandomNumFn column
        sampled = self.get_sample_query()

        # Assign as an alias
        return aliased(self.table, sampled)
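
    # Hypothetical usage sketch (not part of this module): profiler queries
    # would typically select from the returned alias, e.g.
    #   sample = sampler.random_sample()
    #   row_count = sampler.client.query(func.count()).select_from(sample).scalar()
    # where `func` is sqlalchemy.func and `sampler` is an SQASampler instance.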

    def fetch_sample_data(self) -> TableData:
        """
        Use the sampler to retrieve sample data rows as per the limit given by the user

        :return: TableData to be added to the Table Entity
        """
        if self._profile_sample_query:
            return self._fetch_sample_data_from_user_query()

        # Add new RandomNumFn column
        rnd = self.get_sample_query()
        sqa_columns = [col for col in inspect(rnd).c if col.name != RANDOM_LABEL]

        sqa_sample = (
            self.client.query(*sqa_columns)
            .select_from(rnd)
            .limit(self.sample_limit)
            .all()
        )
        return TableData(
            columns=[column.name for column in sqa_columns],
            rows=[list(row) for row in sqa_sample],
        )
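
    # With a hypothetical `users` table the returned object is shaped like
    #   TableData(columns=["id", "name"], rows=[[1, "alice"], [2, "bob"], ...])
    # the helper `random` column is filtered out above before building the rows.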

    def _fetch_sample_data_from_user_query(self) -> TableData:
        """Returns a table data object using results from query execution"""
        rnd = self.client.execute(f"{self._profile_sample_query}")
        try:
            columns = [col.name for col in rnd.cursor.description]
        except AttributeError:
            columns = list(rnd.keys())
        return TableData(
            columns=columns,
            rows=[list(row) for row in rnd.fetchmany(100)],
        )

    def _rdn_sample_from_user_query(self) -> Query:
        """Returns the SQLAlchemy object to use when running profiling"""
        return self.client.query(self.table).from_statement(
            text(f"{self._profile_sample_query}")
        )
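
    # The user-provided sample query is executed verbatim; a hypothetical
    # example of what `self._profile_sample_query` might contain:
    #   SELECT * FROM my_table WHERE created_at >= '2023-01-01' LIMIT 10000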

    def _partitioned_table(self) -> Query:
        """Return the Query object for partitioned tables"""
        self._partition_details = cast(
            PartitionProfilerConfig, self._partition_details
        )  # satisfying type checker
        partition_field = self._partition_details.partitionColumnName

        type_ = get_partition_col_type(
            partition_field,
            self.table.__table__.c,
        )

        if (
            self._partition_details.partitionIntervalType
            == PartitionIntervalType.COLUMN_VALUE
        ):
            return aliased(
                self.table,
                (
                    self.client.query(self.table)
                    .filter(
                        get_value_filter(
                            Column(partition_field),
                            self._partition_details.partitionValues,
                        )
                    )
                    .subquery()
                ),
            )

        if (
            self._partition_details.partitionIntervalType
            == PartitionIntervalType.INTEGER_RANGE
        ):
            return aliased(
                self.table,
                (
                    self.client.query(self.table)
                    .filter(
                        get_integer_range_filter(
                            Column(partition_field),
                            self._partition_details.partitionIntegerRangeStart,
                            self._partition_details.partitionIntegerRangeEnd,
                        )
                    )
                    .subquery()
                ),
            )

        return aliased(
            self.table,
            (
                self.client.query(self.table)
                .filter(
                    build_query_filter(
                        [
                            (
                                Column(partition_field),
                                "ge",
                                dispatch_to_date_or_datetime(
                                    self._partition_details.partitionInterval,
                                    text(
                                        self._partition_details.partitionIntervalUnit.value
                                    ),
                                    type_,
                                ),
                            )
                        ],
                        False,
                    )
                )
                .subquery()
            ),
        )
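
# Hypothetical partition configs, only to illustrate the branches above
# (field names follow PartitionProfilerConfig as referenced in this module):
#   PartitionProfilerConfig(
#       partitionColumnName="region",
#       partitionIntervalType=PartitionIntervalType.COLUMN_VALUE,
#       partitionValues=["EU"],
#   )
#   PartitionProfilerConfig(
#       partitionColumnName="customer_id",
#       partitionIntervalType=PartitionIntervalType.INTEGER_RANGE,
#       partitionIntegerRangeStart=1,
#       partitionIntegerRangeEnd=10_000,
#   )
# Any other interval type falls through to the date/datetime lower-bound filter
# built from partitionInterval and partitionIntervalUnit.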