103 lines
3.4 KiB
Python
Raw Normal View History

# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Helper module to handle data sampling
for the profiler
"""
import math
import random
from typing import Any, Optional
from metadata.generated.schema.entity.data.table import ProfileSampleType, TableData
from metadata.profiler.api.models import ProfileSampleConfig
from metadata.utils.constants import CHUNKSIZE
# Label constant; not referenced by any code visible in this module.
# NOTE(review): presumably names a random-number column used by sibling
# samplers -- confirm before removing.
RANDOM_LABEL = "random"
class DatalakeSampler:
    """
    Generates a sample of the data to not
    run the query in the whole table.

    The sampler works on pandas data: ``table`` is either a single
    ``DataFrame`` or an indexable collection of ``DataFrame`` chunks.
    """

    def __init__(
        self,
        session: Optional[Any],
        table,
        profile_sample_config: Optional[ProfileSampleConfig] = None,
        profile_sample_query: Optional[str] = None,
    ):
        """
        Store the table and the sampling settings.

        Args:
            session: kept on the instance; not used by the methods shown here.
            table: a pandas ``DataFrame`` or an indexable collection of
                ``DataFrame`` chunks.
            profile_sample_config: optional sampling settings. When omitted,
                ``profile_sample`` and ``profile_sample_type`` stay ``None``.
            profile_sample_query: kept on the instance; not used by the
                methods shown here.
        """
        self.profile_sample = None
        self.profile_sample_type = None
        if profile_sample_config:
            self.profile_sample = profile_sample_config.profile_sample
            self.profile_sample_type = profile_sample_config.profile_sample_type
        self.session = session
        self.table = table
        self._profile_sample_query = profile_sample_query
        # Default sample size, used whenever no usable size is configured.
        self.sample_limit = 100
        self._sample_rows = None

    def _fetch_rows(self, data_frame):
        """
        Randomly sample ``data_frame`` (with replacement) and return the
        sampled rows as a list of lists, with missing values set to ``None``.

        * ``ROWS`` sample type: ``profile_sample`` rows are drawn; a missing
          or zero size falls back to ``sample_limit``.
        * ``PERCENTAGE`` sample type: ``profile_sample`` is handed to pandas
          as the ``frac`` argument.
        * No sample type: both ``n`` and ``frac`` are ``None``, in which case
          pandas draws a single row.

        A frame without rows yields an empty list instead of letting
        pandas raise.
        """
        from pandas import notnull  # pylint: disable=import-outside-toplevel

        # pandas cannot draw n > 0 rows out of an empty frame.
        if len(data_frame.index) == 0:
            return []

        sample_size = None
        sample_fraction = None
        if self.profile_sample_type == ProfileSampleType.ROWS:
            # `or 0` guards against an unset size (int(None) would raise);
            # a resulting 0 falls back to the default limit.
            sample_size = int(self.profile_sample or 0) or self.sample_limit
        elif self.profile_sample_type == ProfileSampleType.PERCENTAGE:
            # NOTE(review): the value is passed to pandas unchanged. If the
            # configured percentage is on a 0-100 scale this should be
            # divided by 100 -- confirm against the config schema.
            sample_fraction = self.profile_sample

        sampled_data_frame = data_frame.sample(
            n=sample_size,
            frac=sample_fraction,
            random_state=random.randint(0, 100),
            replace=True,
        )

        # Cast to object first so that None can replace NaN/NaT in any column.
        return (
            sampled_data_frame.astype(object)
            .where(notnull(sampled_data_frame), None)
            .values.tolist()
        )

    def get_col_row(self, data_frame):
        """
        Fetches columns and rows from the data_frame

        Args:
            data_frame: a pandas ``DataFrame`` or an indexable collection
                of ``DataFrame`` chunks.

        Returns:
            A ``(columns, rows)`` pair: the column labels as a list and the
            sampled rows as a list of lists. For a collection of chunks the
            columns come from the first chunk and at most
            ``ceil(profile_sample / CHUNKSIZE)`` chunks are sampled. An empty
            collection gives ``([], [])``.
        """
        from pandas import DataFrame  # pylint: disable=import-outside-toplevel

        if isinstance(data_frame, DataFrame):
            return (
                data_frame.columns.tolist(),
                self._fetch_rows(data_frame),
            )

        # No chunks: there is no first chunk to read the columns from.
        if len(data_frame) == 0:
            return [], []

        # profile_sample is None when no sampling config was supplied;
        # fall back to the default limit rather than dividing None.
        requested = (
            self.sample_limit if self.profile_sample is None else self.profile_sample
        )
        chunk_limit = math.ceil(requested / CHUNKSIZE)

        cols = data_frame[0].columns.tolist()
        rows = []
        for index, chunk in enumerate(data_frame):
            if index >= chunk_limit:
                break
            rows.extend(self._fetch_rows(chunk))
        return cols, rows

    def fetch_dl_sample_data(self) -> TableData:
        """
        Build the ``TableData`` sample for ``self.table``.

        When ``self.table`` is not a single ``DataFrame`` only its first
        element is sampled.
        """
        from pandas import DataFrame  # pylint: disable=import-outside-toplevel

        source = self.table if isinstance(self.table, DataFrame) else self.table[0]
        cols, rows = self.get_col_row(data_frame=source)
        return TableData(columns=cols, rows=rows)