Mirror of https://github.com/open-metadata/OpenMetadata.git, synced 2025-12-05 03:54:23 +00:00

Fix 15576 - Eval Data Type issue fix (#15702)

This commit is contained in:
  parent f28214e0c0
  commit b79e5c064b
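The core of this fix (see the GenericDataFrameColumnParser hunk further down) replaces single-row data type evaluation with evaluation over a sample of up to 1,000 non-null values per column, falling back from `ast.literal_eval` to time/date parsing and finally to string. The sketch below is a minimal standalone approximation of that logic, not the project's actual implementation; the helper names `infer_value_dtype` and `infer_column_dtype` are hypothetical.

```python
import ast
from datetime import datetime

from dateutil.parser import ParserError, parse


def infer_value_dtype(value) -> str:
    """Best-effort dtype name for a single sampled cell value."""
    try:
        # Literal values such as "1", "1.5", "[1, 2]" or "{'a': 1}" parse directly.
        return type(ast.literal_eval(str(value))).__name__.lower()
    except (ValueError, SyntaxError):
        # literal_eval fails for plain strings; try time, then date/datetime.
        try:
            if not str(value).isnumeric():
                try:
                    datetime.strptime(str(value), "%H:%M:%S")
                    return "timedelta[ns]"
                except (ValueError, TypeError):
                    parse(str(value))
                    return "datetime64[ns]"
            return "int64"
        except (ParserError, TypeError):
            return "str"


def infer_column_dtype(values) -> str:
    """Evaluate a sample of values instead of a single row, as the fix does."""
    dtypes = [infer_value_dtype(value) for value in values[:1000]]
    # The changed code picks max() over the collected dtype names.
    return max(dtypes) if dtypes else "str"


if __name__ == "__main__":
    print(infer_column_dtype(["2021-01-01", "2009-11-11"]))  # datetime64[ns]
    print(infer_column_dtype(["10:01:31", "09:01:31"]))      # timedelta[ns]
    print(infer_column_dtype(["foo", "bar"]))                 # str
```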
@@ -22,7 +22,11 @@ from typing import Dict, List, Optional
from sqlalchemy import Column

from metadata.generated.schema.entity.data.table import CustomMetricProfile, TableData
from metadata.generated.schema.entity.data.table import (
    CustomMetricProfile,
    DataType,
    TableData,
)
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
    DatalakeConnection,
)

@@ -32,12 +36,8 @@ from metadata.profiler.api.models import ThreadPoolMetrics
from metadata.profiler.interface.profiler_interface import ProfilerInterface
from metadata.profiler.metrics.core import MetricTypes
from metadata.profiler.metrics.registry import Metrics
from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper
from metadata.utils.constants import COMPLEX_COLUMN_SEPARATOR, SAMPLE_DATA_DEFAULT_COUNT
from metadata.utils.datalake.datalake_utils import (
    GenericDataFrameColumnParser,
    fetch_dataframe,
)
from metadata.utils.datalake.datalake_utils import GenericDataFrameColumnParser
from metadata.utils.logger import profiler_interface_registry_logger
from metadata.utils.sqa_like_column import SQALikeColumn
@@ -85,29 +85,48 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin):
        )

        self.client = self.connection.client
        self.dfs = self._convert_table_to_list_of_dataframe_objects()
        self.sampler = self._get_sampler()
        self.complex_dataframe_sample = deepcopy(self.sampler.random_sample())

    def _convert_table_to_list_of_dataframe_objects(self):
        """From a table entity, return the corresponding dataframe object

        Returns:
            List[DataFrame]
        """
        data = fetch_dataframe(
            config_source=self.service_connection_config.configSource,
        self.dfs = self.return_ometa_dataframes_sampled(
            service_connection_config=self.service_connection_config,
            client=self.client,
            file_fqn=DatalakeTableSchemaWrapper(
                key=self.table_entity.name.__root__,
                bucket_name=self.table_entity.databaseSchema.name,
                file_extension=self.table_entity.fileFormat,
            ),
            table=self.table_entity,
            profile_sample_config=profile_sample_config,
        )
        self.sampler = self._get_sampler()
        self.complex_dataframe_sample = deepcopy(
            self.sampler.random_sample(is_sampled=True)
        )
        self.complex_df()

        if not data:
            raise TypeError(f"Couldn't fetch {self.table_entity.name.__root__}")
        return data

    def complex_df(self):
        """Assign DataTypes to dataframe columns as per the parsed column type"""
        coltype_mapping_df = []
        data_formats = (
            GenericDataFrameColumnParser._data_formats  # pylint: disable=protected-access
        )
        for index, df in enumerate(self.complex_dataframe_sample):
            if index == 0:
                for col in self.table.columns:
                    coltype = next(
                        (
                            key
                            for key, value in data_formats.items()
                            if col.dataType == value
                        ),
                        None,
                    )
                    if coltype and col.dataType not in {DataType.JSON, DataType.ARRAY}:
                        coltype_mapping_df.append(coltype)
                    else:
                        coltype_mapping_df.append("object")

            try:
                self.complex_dataframe_sample[index] = df.astype(
                    dict(zip(df.keys(), coltype_mapping_df))
                )
            except (TypeError, ValueError) as err:
                self.complex_dataframe_sample[index] = df
                logger.warning(f"NaN/NoneType found in the Dataframe: {err}")
                break

    def _get_sampler(self):
        """Get dataframe sampler from config"""

@@ -171,19 +190,20 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin):
        """
        import pandas as pd  # pylint: disable=import-outside-toplevel

        row_dict = {}
        try:
            row_dict = {}

            for metric in metrics:
                metric_resp = metric(column).df_fn(runner)
                row_dict[metric.name()] = (
                    None if pd.isnull(metric_resp) else metric_resp
                )
            return row_dict
        except Exception as exc:
            logger.debug(
                f"{traceback.format_exc()}\nError trying to compute profile for {exc}"
            )
            raise RuntimeError(exc)
        return row_dict

    def _compute_query_metrics(
        self,

@@ -283,7 +303,7 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin):
        metric_func: ThreadPoolMetrics,
    ):
        """Run metrics in processor worker"""
        logger.debug(f"Running profiler for {metric_func.table}")
        logger.debug(f"Running profiler for {metric_func.table.name.__root__}")
        try:
            row = None
            if self.complex_dataframe_sample:
@@ -48,10 +48,12 @@ class NullRatio(ComposedMetric):
        Safely compute null ratio based on the profiler
        results of other Metrics
        """
        import pandas as pd

        res_count = res.get(Count.name())
        res_null = res.get(NullCount.name())

        if res_count is not None and res_null is not None:
            return res_null / (res_null + res_count)

        if not pd.isnull(res_count) and not pd.isnull(res_null):
            result = res_null / (res_null + res_count)
            return None if pd.isnull(result) else result
        return None

@@ -14,6 +14,7 @@ Distinct Count Metric definition
"""
# pylint: disable=duplicate-code

import json

from sqlalchemy import column, distinct, func

@@ -57,7 +58,14 @@ class DistinctCount(StaticMetric):
            counter = Counter()
            for df in dfs:
                df_col_value = df[self.col.name].dropna().to_list()
                counter.update(df_col_value)
                try:
                    counter.update(df_col_value)
                except TypeError as err:
                    if isinstance(df_col_value, list):
                        for value in df_col_value:
                            counter.update([json.dumps(value)])
                    else:
                        raise err
            return len(counter.keys())
        except Exception as err:
            logger.debug(
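The DistinctCount hunk above (and the matching UniqueCount hunk further down) guards `Counter.update` against unhashable values, such as dicts or lists coming from JSON columns, by JSON-encoding each entry when a `TypeError` is raised. A minimal standalone sketch of that fallback; the `count_distinct` helper name is hypothetical and not part of the codebase.

```python
import json
from collections import Counter


def count_distinct(values) -> int:
    """Count distinct values, JSON-encoding unhashable entries (dicts/lists)."""
    counter = Counter()
    try:
        counter.update(values)
    except TypeError:
        # Unhashable values (e.g. parsed JSON objects) cannot be Counter keys,
        # so fall back to their JSON string representation, as the diff does.
        for value in values:
            counter.update([json.dumps(value)])
    return len(counter)


print(count_distinct([1, 2, 2, 3]))                    # 3
print(count_distinct([{"a": 1}, {"a": 1}, {"b": 2}]))  # 2
```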
@@ -19,7 +19,7 @@ from sqlalchemy import TIME, column
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.functions import GenericFunction

from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.table import DataType, Table
from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor
from metadata.profiler.metrics.core import CACHE, StaticMetric, T, _label
from metadata.profiler.orm.functions.length import LenFn

@@ -94,12 +94,20 @@ class Max(StaticMetric):
    def df_fn(self, dfs=None):
        """pandas function"""
        import pandas as pd

        if is_quantifiable(self.col.type):
            return max((df[self.col.name].max() for df in dfs))

        if is_date_time(self.col.type):
            max_ = max((df[self.col.name].max() for df in dfs))
            return int(max_.timestamp() * 1000)
        return 0
        max_ = None
        if self.col.type in {DataType.DATETIME, DataType.DATE}:
            max_ = max((pd.to_datetime(df[self.col.name]).max() for df in dfs))
            return None if pd.isnull(max_) else int(max_.timestamp() * 1000)
        elif self.col.type == DataType.TIME:
            max_ = max((pd.to_timedelta(df[self.col.name]).max() for df in dfs))
            return None if pd.isnull(max_) else max_.seconds
        return None

    def nosql_fn(self, adaptor: NoSQLAdaptor) -> Callable[[Table], Optional[T]]:
        """nosql function"""

@@ -66,7 +66,7 @@ class MaxLength(StaticMetric):
        max_length_list = []

        for df in dfs:
            if any(df[self.col.name]):
            if any(df[self.col.name].dropna()):
                max_length_list.append(
                    length_vectorize_func(
                        df[self.col.name].dropna().astype(str)
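The Max change above (and the matching Min change further down) replaces the quantifiable-or-zero logic with DataType-aware handling: DATETIME and DATE columns are reduced to epoch milliseconds, TIME columns to seconds, and NaN results are mapped to None. A small pandas sketch of that conversion, on toy data and with a hypothetical helper name, not the project's metric class itself:

```python
import pandas as pd


def max_as_metric(series: pd.Series, data_type: str):
    """Reduce a column's max to the representation the profiler metric reports."""
    if data_type in {"DATETIME", "DATE"}:
        max_ = pd.to_datetime(series).max()
        # Epoch milliseconds, or None when everything was missing.
        return None if pd.isnull(max_) else int(max_.timestamp() * 1000)
    if data_type == "TIME":
        max_ = pd.to_timedelta(series).max()
        # Seconds within the day, or None when everything was missing.
        return None if pd.isnull(max_) else max_.seconds
    return None


print(max_as_metric(pd.Series(["2021-01-01", "2009-11-11"]), "DATE"))  # epoch millis
print(max_as_metric(pd.Series(["10:01:31", "09:01:31"]), "TIME"))      # 36091
```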
@@ -118,25 +118,23 @@ class Mean(StaticMetric):
        means = []
        weights = []

        if is_quantifiable(self.col.type):
            for df in dfs:
                mean = df[self.col.name].mean()
                if not pd.isnull(mean):
                    means.append(mean)
                    weights.append(df[self.col.name].count())

        if is_concatenable(self.col.type):
            length_vectorize_func = vectorize(len)
            for df in dfs:
        length_vectorize_func = vectorize(len)
        for df in dfs:
            processed_df = df[self.col.name].dropna()
            try:
                mean = None
                if any(df[self.col.name]):
                    mean = length_vectorize_func(
                        df[self.col.name].dropna().astype(str)
                    ).mean()
                if is_quantifiable(self.col.type):
                    mean = processed_df.mean()
                if is_concatenable(self.col.type):
                    mean = length_vectorize_func(processed_df.astype(str)).mean()
                if not pd.isnull(mean):
                    means.append(mean)
                    weights.append(df[self.col.name].dropna().count())
                    weights.append(processed_df.count())
            except Exception as err:
                logger.debug(
                    f"Error while computing mean for column {self.col.name}: {err}"
                )
                return None

        if means:
            return average(means, weights=weights)
@@ -19,7 +19,7 @@ from sqlalchemy import TIME, column
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.functions import GenericFunction

from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.table import DataType, Table
from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor
from metadata.profiler.metrics.core import CACHE, StaticMetric, T, _label
from metadata.profiler.orm.functions.length import LenFn

@@ -95,12 +95,19 @@ class Min(StaticMetric):
    def df_fn(self, dfs=None):
        """pandas function"""
        import pandas as pd

        if is_quantifiable(self.col.type):
            return min((df[self.col.name].min() for df in dfs))
        if is_date_time(self.col.type):
            min_ = min((df[self.col.name].min() for df in dfs))
            return int(min_.timestamp() * 1000)
        return 0
        min_ = None
        if self.col.type in {DataType.DATETIME, DataType.DATE}:
            min_ = min((pd.to_datetime(df[self.col.name]).min() for df in dfs))
            return None if pd.isnull(min_) else int(min_.timestamp() * 1000)
        elif self.col.type == DataType.TIME:
            min_ = min((pd.to_timedelta(df[self.col.name]).min() for df in dfs))
            return None if pd.isnull(min_) else min_.seconds
        return None

    def nosql_fn(self, adaptor: NoSQLAdaptor) -> Callable[[Table], Optional[T]]:
        """nosql function"""

@@ -67,7 +67,7 @@ class MinLength(StaticMetric):
        min_length_list = []

        for df in dfs:
            if any(df[self.col.name]):
            if any(df[self.col.name].dropna()):
                min_length_list.append(
                    length_vectorize_func(
                        df[self.col.name].dropna().astype(str)
@@ -55,7 +55,13 @@ class Sum(StaticMetric):
        """pandas function"""

        if is_quantifiable(self.col.type):
            return sum(df[self.col.name].sum() for df in dfs)
            try:
                return sum(df[self.col.name].sum() for df in dfs)
            except (TypeError, ValueError):
                try:
                    return sum(df[self.col.name].astype(float).sum() for df in dfs)
                except Exception:
                    return None
        return None

    def nosql_fn(self, adaptor: NoSQLAdaptor) -> Callable[[Table], Optional[T]]:

@@ -12,6 +12,7 @@
"""
Unique Count Metric definition
"""
import json
from typing import Optional

from sqlalchemy import column, func

@@ -72,7 +73,14 @@ class UniqueCount(QueryMetric):
            counter = Counter()
            for df in dfs:
                df_col_value = df[self.col.name].dropna().to_list()
                counter.update(df_col_value)
                try:
                    counter.update(df_col_value)
                except TypeError as err:
                    if isinstance(df_col_value, list):
                        for value in df_col_value:
                            counter.update([json.dumps(value)])
                    else:
                        raise err
            return len([key for key, value in counter.items() if value == 1])
        except Exception as err:
            logger.debug(
@@ -89,7 +89,12 @@ class Median(StaticMetric, PercentilMixin):
                    f"We recommend using a smaller sample size or partitionning."
                )
                return None
            median = df.median()
            try:
                median = df.median()
            except Exception as err:
                logger.error(
                    f"Unable to compute Median for {self.col.name} due to error: {err}"
                )
            return None if pd.isnull(median) else median
        logger.debug(
            f"Don't know how to process type {self.col.type} when computing Median"

@@ -115,8 +115,6 @@ QUANTIFIABLE_SET = {
CONCATENABLE_SET = {DataType.STRING.value, DataType.TEXT.value}

DATATIME_SET = {DataType.DATETIME.value}


# Now, let's define some helper methods to identify
# the nature of an SQLAlchemy type

@@ -139,7 +137,11 @@ def is_date_time(_type) -> bool:
    Check if sqlalchemy _type is derived from Date, Time or DateTime Type
    """
    if isinstance(_type, DataType):
        return _type.value in DATATIME_SET
        return _type.value in {
            DataType.DATETIME.value,
            DataType.TIME.value,
            DataType.DATE.value,
        }
    return (
        issubclass(_type.__class__, Date)
        or issubclass(_type.__class__, Time)
@@ -66,7 +66,7 @@ class MissingMetricException(Exception):
    """


class Profiler(Generic[TMetric]):
class Profiler(Generic[TMetric]):  # pylint: disable=too-many-public-methods
    """
    Core Profiler.

@@ -108,6 +108,7 @@ class Profiler(Generic[TMetric]):
        # We will get columns from the property
        self._columns: Optional[List[Column]] = None
        self.fetch_column_from_property()
        self.data_frame_list = None

    @property

@@ -161,7 +162,14 @@ class Profiler(Generic[TMetric]):
            if column.name not in self._get_excluded_columns()
        ]

        return self._columns
        return [
            column
            for column in self._columns
            if column.type.__class__.__name__ not in NOT_COMPUTE
        ]

    def fetch_column_from_property(self) -> Optional[List[Column]]:
        self._columns = self.columns

    def _get_excluded_columns(self) -> Optional[Set[str]]:
        """Get excluded columns for table being profiled"""

@@ -434,11 +442,6 @@ class Profiler(Generic[TMetric]):
    def _prepare_column_metrics(self) -> List:
        """prepare column metrics"""
        columns = [
            column
            for column in self.columns
            if column.type.__class__.__name__ not in NOT_COMPUTE
        ]
        column_metrics_for_thread_pool = []
        static_metrics = [
            ThreadPoolMetrics(

@@ -451,7 +454,7 @@ class Profiler(Generic[TMetric]):
                column=column,
                table=self.table,
            )
            for column in columns
            for column in self.columns
        ]
        query_metrics = [
            ThreadPoolMetrics(

@@ -460,7 +463,7 @@ class Profiler(Generic[TMetric]):
                column=column,
                table=self.table,
            )
            for column in columns
            for column in self.columns
            for metric in self.get_col_metrics(self.query_metrics, column)
        ]
        window_metrics = [

@@ -474,7 +477,7 @@ class Profiler(Generic[TMetric]):
                column=column,
                table=self.table,
            )
            for column in columns
            for column in self.columns
        ]

        # we'll add the system metrics to the thread pool computation

@@ -482,7 +485,7 @@ class Profiler(Generic[TMetric]):
            column_metrics_for_thread_pool.extend(metric_type)

        # we'll add the custom metrics to the thread pool computation
        for column in columns:
        for column in self.columns:
            custom_metrics = self.get_custom_metrics(column.name)
            if custom_metrics:
                column_metrics_for_thread_pool.append(

@@ -631,12 +634,16 @@ class Profiler(Generic[TMetric]):
            rowCount=self._table_results.get(RowCount.name()),
            createDateTime=self._table_results.get("createDateTime"),
            sizeInByte=self._table_results.get("sizeInBytes"),
            profileSample=self.profile_sample_config.profile_sample
            if self.profile_sample_config
            else None,
            profileSampleType=self.profile_sample_config.profile_sample_type
            if self.profile_sample_config
            else None,
            profileSample=(
                self.profile_sample_config.profile_sample
                if self.profile_sample_config
                else None
            ),
            profileSampleType=(
                self.profile_sample_config.profile_sample_type
                if self.profile_sample_config
                else None
            ),
            customMetrics=self._table_results.get("customMetrics"),
        )
@@ -136,7 +136,7 @@ class DatalakeSampler(SamplerInterface):
                break
        return cols, rows

    def random_sample(self):
    def random_sample(self, is_sampled: bool = False):
        """Generate random sample from the table

        Returns:

@@ -148,9 +148,8 @@ class DatalakeSampler(SamplerInterface):
        if self._partition_details:
            self.table = self._partitioned_table()

        if not self.profile_sample:
        if not self.profile_sample or is_sampled:
            return self.table

        return self._get_sampled_dataframe()

    def _fetch_rows(self, data_frame):
@@ -33,7 +33,7 @@ def _get_json_text(key: str, text: bytes, decode: bool) -> Union[str, bytes]:
        processed_text = gzip.decompress(text)
    if key.endswith(".zip"):
        with zipfile.ZipFile(io.BytesIO(text)) as zip_file:
            processed_text = zip_file.read(zip_file.infolist()[0]).decode(UTF_8)
            processed_text = zip_file.read(zip_file.infolist()[0])
    if decode:
        return processed_text.decode(UTF_8) if isinstance(text, bytes) else text
    return processed_text
@@ -179,8 +179,10 @@ class GenericDataFrameColumnParser:
        **dict.fromkeys(["float64", "float32", "float"], DataType.FLOAT),
        "bool": DataType.BOOLEAN,
        **dict.fromkeys(
            ["datetime64", "timedelta[ns]", "datetime64[ns]"], DataType.DATETIME
            ["datetime64[ns]", "datetime"],
            DataType.DATETIME,
        ),
        "timedelta[ns]": DataType.TIME,
        "str": DataType.STRING,
        "bytes": DataType.BYTES,
    }

@@ -253,10 +255,44 @@ class GenericDataFrameColumnParser:
        ):
            try:
                # Safely evaluate the input string
                df_row_val = data_frame[column_name].dropna().values[0]
                parsed_object = ast.literal_eval(str(df_row_val))
                df_row_val_list = data_frame[column_name].dropna().values[:1000]
                parsed_object_datatype_list = []
                for df_row_val in df_row_val_list:
                    try:
                        parsed_object_datatype_list.append(
                            type(ast.literal_eval(str(df_row_val))).__name__.lower()
                        )
                    except (ValueError, SyntaxError):
                        # we try to parse the value as a datetime, if it fails, we fallback to string
                        # as literal_eval will fail for string
                        from datetime import datetime

                        from dateutil.parser import ParserError, parse

                        try:
                            dtype_ = "int64"
                            if not str(df_row_val).isnumeric():
                                # check if the row value is time
                                try:
                                    datetime.strptime(df_row_val, "%H:%M:%S").time()
                                    dtype_ = "timedelta[ns]"
                                except (ValueError, TypeError):
                                    # check if the row value is date / time / datetime
                                    type(parse(df_row_val)).__name__.lower()
                                    dtype_ = "datetime64[ns]"
                            parsed_object_datatype_list.append(dtype_)
                        except (ParserError, TypeError):
                            parsed_object_datatype_list.append("str")
                    except Exception as err:
                        logger.debug(
                            f"Failed to parse datatype for column {column_name}, exc: {err},"
                            "Falling back to string."
                        )
                        parsed_object_datatype_list.append("str")

                data_type = max(parsed_object_datatype_list)
                # Determine the data type of the parsed object
                data_type = type(parsed_object).__name__.lower()

            except (ValueError, SyntaxError):
                # Handle any exceptions that may occur
                data_type = "string"
@@ -33,6 +33,7 @@ from metadata.generated.schema.entity.services.connections.database.datalakeConn
)
from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials
from metadata.generated.schema.tests.customMetric import CustomMetric
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.profiler.interface.pandas.profiler_interface import (
    PandasProfilerInterface,
)

@@ -67,11 +68,36 @@ class MetricsTest(TestCase):
    table_entity = Table(
        id=uuid4(),
        name="user",
        databaseSchema=EntityReference(id=uuid4(), type="databaseSchema", name="name"),
        columns=[
            EntityColumn(
                name=ColumnName(__root__="id"),
                dataType=DataType.INT,
            )
            ),
            EntityColumn(
                name=ColumnName(__root__="first_name"),
                dataType=DataType.STRING,
            ),
            EntityColumn(
                name=ColumnName(__root__="last_name"),
                dataType=DataType.STRING,
            ),
            EntityColumn(
                name=ColumnName(__root__="city"),
                dataType=DataType.STRING,
            ),
            EntityColumn(
                name=ColumnName(__root__="country"),
                dataType=DataType.STRING,
            ),
            EntityColumn(
                name=ColumnName(__root__="birthdate"),
                dataType=DataType.DATE,
            ),
            EntityColumn(
                name=ColumnName(__root__="age"),
                dataType=DataType.INT,
            ),
        ],
    )

@@ -115,9 +141,8 @@ class MetricsTest(TestCase):
            self.s3_keys.append(key)
            self.client.upload_file(Filename=path, Bucket=BUCKET_NAME, Key=key)

        with patch.object(
            PandasProfilerInterface,
            "_convert_table_to_list_of_dataframe_objects",
        with patch(
            "metadata.mixins.pandas.pandas_mixin.fetch_dataframe",
            return_value=self.dfs,
        ):
            self.sqa_profiler_interface = PandasProfilerInterface(

@@ -136,11 +161,38 @@ class MetricsTest(TestCase):
        table_entity = Table(
            id=uuid4(),
            name="user",
            databaseSchema=EntityReference(
                id=uuid4(), type="databaseSchema", name="name"
            ),
            columns=[
                EntityColumn(
                    name=ColumnName(__root__="id"),
                    dataType=DataType.INT,
                )
                ),
                EntityColumn(
                    name=ColumnName(__root__="first_name"),
                    dataType=DataType.STRING,
                ),
                EntityColumn(
                    name=ColumnName(__root__="last_name"),
                    dataType=DataType.STRING,
                ),
                EntityColumn(
                    name=ColumnName(__root__="city"),
                    dataType=DataType.STRING,
                ),
                EntityColumn(
                    name=ColumnName(__root__="country"),
                    dataType=DataType.STRING,
                ),
                EntityColumn(
                    name=ColumnName(__root__="birthdate"),
                    dataType=DataType.DATE,
                ),
                EntityColumn(
                    name=ColumnName(__root__="age"),
                    dataType=DataType.INT,
                ),
            ],
            customMetrics=[
                CustomMetric(

@@ -153,9 +205,8 @@ class MetricsTest(TestCase):
                ),
            ],
        )
        with patch.object(
            PandasProfilerInterface,
            "_convert_table_to_list_of_dataframe_objects",
        with patch(
            "metadata.mixins.pandas.pandas_mixin.fetch_dataframe",
            return_value=self.dfs,
        ):
            self.sqa_profiler_interface = PandasProfilerInterface(

@@ -185,6 +236,9 @@ class MetricsTest(TestCase):
        table_entity = Table(
            id=uuid4(),
            name="user",
            databaseSchema=EntityReference(
                id=uuid4(), type="databaseSchema", name="name"
            ),
            columns=[
                EntityColumn(
                    name=ColumnName(__root__="id"),

@@ -204,9 +258,8 @@ class MetricsTest(TestCase):
                )
            ],
        )
        with patch.object(
            PandasProfilerInterface,
            "_convert_table_to_list_of_dataframe_objects",
        with patch(
            "metadata.mixins.pandas.pandas_mixin.fetch_dataframe",
            return_value=self.dfs,
        ):
            self.sqa_profiler_interface = PandasProfilerInterface(
@@ -21,6 +21,10 @@ from sqlalchemy.orm import declarative_base
from metadata.generated.schema.entity.data.table import Column as EntityColumn
from metadata.generated.schema.entity.data.table import ColumnName, DataType, Table
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
    DatalakeConnection,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.profiler.interface.pandas.profiler_interface import (
    PandasProfilerInterface,
)

@@ -82,9 +86,8 @@ class DatalakeMetricsTest(TestCase):
        "metadata.profiler.interface.profiler_interface.get_connection",
        return_value=FakeConnection,
    )
    @mock.patch.object(
        PandasProfilerInterface,
        "_convert_table_to_list_of_dataframe_objects",
    @mock.patch(
        "metadata.mixins.pandas.pandas_mixin.fetch_dataframe",
        return_value=[df1, pd.concat([df2, pd.DataFrame(index=df1.index)])],
    )
    def setUpClass(cls, mock_get_connection, mocked_dfs):

@@ -95,17 +98,57 @@ class DatalakeMetricsTest(TestCase):
        table_entity = Table(
            id=uuid4(),
            name="user",
            databaseSchema=EntityReference(
                id=uuid4(), type="databaseSchema", name="name"
            ),
            fileFormat="csv",
            columns=[
                EntityColumn(
                    name=ColumnName(__root__="id"),
                    name=ColumnName(__root__="name"),
                    dataType=DataType.STRING,
                ),
                EntityColumn(
                    name=ColumnName(__root__="fullname"),
                    dataType=DataType.STRING,
                ),
                EntityColumn(
                    name=ColumnName(__root__="nickname"),
                    dataType=DataType.STRING,
                ),
                EntityColumn(
                    name=ColumnName(__root__="comments"),
                    dataType=DataType.STRING,
                ),
                EntityColumn(
                    name=ColumnName(__root__="age"),
                    dataType=DataType.INT,
                )
                ),
                EntityColumn(
                    name=ColumnName(__root__="dob"),
                    dataType=DataType.DATETIME,
                ),
                EntityColumn(
                    name=ColumnName(__root__="tob"),
                    dataType=DataType.TIME,
                ),
                EntityColumn(
                    name=ColumnName(__root__="doe"),
                    dataType=DataType.DATE,
                ),
                EntityColumn(
                    name=ColumnName(__root__="json"),
                    dataType=DataType.JSON,
                ),
                EntityColumn(
                    name=ColumnName(__root__="array"),
                    dataType=DataType.ARRAY,
                ),
            ],
        )

        cls.datalake_profiler_interface = PandasProfilerInterface(
            entity=table_entity,
            service_connection_config=None,
            service_connection_config=DatalakeConnection(configSource={}),
            storage_config=None,
            ometa_client=None,
            thread_count=None,

@@ -167,9 +210,9 @@ class DatalakeMetricsTest(TestCase):
        )
        res = profiler.compute_metrics()._column_results

        # string as min returns 0
        assert res.get(User.dob.name).get(Metrics.MIN.name) == 0
        assert res.get(User.tob.name).get(Metrics.MIN.name) == 0
        assert res.get(User.doe.name).get(Metrics.MIN.name) == 0
        assert res.get(User.dob.name).get(Metrics.MIN.name) == 642902400000
        assert res.get(User.tob.name).get(Metrics.MIN.name) == 36091
        assert res.get(User.doe.name).get(Metrics.MIN.name) == 1257897600000

    def test_null_count(self):
        """
@@ -36,6 +36,10 @@ from metadata.generated.schema.entity.data.table import (
    TableProfile,
    TableProfilerConfig,
)
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
    DatalakeConnection,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.source import sqa_types
from metadata.profiler.interface.pandas.profiler_interface import (
    PandasProfilerInterface,

@@ -93,11 +97,49 @@ class ProfilerTest(TestCase):
    table_entity = Table(
        id=uuid4(),
        name="user",
        databaseSchema=EntityReference(id=uuid4(), type="databaseSchema", name="name"),
        fileFormat="csv",
        columns=[
            EntityColumn(
                name=ColumnName(__root__="id"),
                name=ColumnName(__root__="name"),
                dataType=DataType.STRING,
            ),
            EntityColumn(
                name=ColumnName(__root__="fullname"),
                dataType=DataType.STRING,
            ),
            EntityColumn(
                name=ColumnName(__root__="nickname"),
                dataType=DataType.STRING,
            ),
            EntityColumn(
                name=ColumnName(__root__="comments"),
                dataType=DataType.STRING,
            ),
            EntityColumn(
                name=ColumnName(__root__="age"),
                dataType=DataType.INT,
            )
            ),
            EntityColumn(
                name=ColumnName(__root__="dob"),
                dataType=DataType.DATETIME,
            ),
            EntityColumn(
                name=ColumnName(__root__="tob"),
                dataType=DataType.DATE,
            ),
            EntityColumn(
                name=ColumnName(__root__="doe"),
                dataType=DataType.DATE,
            ),
            EntityColumn(
                name=ColumnName(__root__="json"),
                dataType=DataType.JSON,
            ),
            EntityColumn(
                name=ColumnName(__root__="array"),
                dataType=DataType.ARRAY,
            ),
        ],
    )

@@ -106,15 +148,14 @@ class ProfilerTest(TestCase):
        "metadata.profiler.interface.profiler_interface.get_connection",
        return_value=FakeConnection,
    )
    @mock.patch.object(
        PandasProfilerInterface,
        "_convert_table_to_list_of_dataframe_objects",
    @mock.patch(
        "metadata.mixins.pandas.pandas_mixin.fetch_dataframe",
        return_value=[df1, pd.concat([df2, pd.DataFrame(index=df1.index)])],
    )
    def setUpClass(cls, mock_get_connection, mocked_dfs):
        cls.datalake_profiler_interface = PandasProfilerInterface(
            entity=cls.table_entity,
            service_connection_config=None,
            service_connection_config=DatalakeConnection(configSource={}),
            storage_config=None,
            ometa_client=None,
            thread_count=None,

@@ -266,7 +307,6 @@ class ProfilerTest(TestCase):
        default_profiler = DefaultProfiler(
            profiler_interface=self.datalake_profiler_interface,
        )

        column_metrics = default_profiler._prepare_column_metrics()
        for metric in column_metrics:
            if (
@@ -32,6 +32,10 @@ from metadata.generated.schema.entity.data.table import (
    Table,
    TableProfile,
)
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
    DatalakeConnection,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.profiler.api.models import ThreadPoolMetrics
from metadata.profiler.interface.pandas.profiler_interface import (
    PandasProfilerInterface,

@@ -88,11 +92,49 @@ class PandasInterfaceTest(TestCase):
    table_entity = Table(
        id=uuid4(),
        name="user",
        databaseSchema=EntityReference(id=uuid4(), type="databaseSchema", name="name"),
        fileFormat="csv",
        columns=[
            EntityColumn(
                name=ColumnName(__root__="id"),
                name=ColumnName(__root__="name"),
                dataType=DataType.STRING,
            ),
            EntityColumn(
                name=ColumnName(__root__="fullname"),
                dataType=DataType.STRING,
            ),
            EntityColumn(
                name=ColumnName(__root__="nickname"),
                dataType=DataType.STRING,
            ),
            EntityColumn(
                name=ColumnName(__root__="comments"),
                dataType=DataType.STRING,
            ),
            EntityColumn(
                name=ColumnName(__root__="age"),
                dataType=DataType.INT,
            )
            ),
            EntityColumn(
                name=ColumnName(__root__="dob"),
                dataType=DataType.DATETIME,
            ),
            EntityColumn(
                name=ColumnName(__root__="tob"),
                dataType=DataType.DATE,
            ),
            EntityColumn(
                name=ColumnName(__root__="doe"),
                dataType=DataType.DATE,
            ),
            EntityColumn(
                name=ColumnName(__root__="json"),
                dataType=DataType.JSON,
            ),
            EntityColumn(
                name=ColumnName(__root__="array"),
                dataType=DataType.ARRAY,
            ),
        ],
    )

@@ -101,15 +143,14 @@ class PandasInterfaceTest(TestCase):
        "metadata.profiler.interface.profiler_interface.get_connection",
        return_value=FakeConnection,
    )
    @mock.patch.object(
        PandasProfilerInterface,
        "_convert_table_to_list_of_dataframe_objects",
    @mock.patch(
        "metadata.mixins.pandas.pandas_mixin.fetch_dataframe",
        return_value=[df1, pd.concat([df2, pd.DataFrame(index=df1.index)])],
    )
    def setUp(cls, mock_get_connection, mocked_dfs) -> None:
        cls.datalake_profiler_interface = PandasProfilerInterface(
            entity=cls.table_entity,
            service_connection_config=None,
            service_connection_config=DatalakeConnection(configSource={}),
            storage_config=None,
            ometa_client=None,
            thread_count=None,
@@ -22,6 +22,10 @@ from sqlalchemy.orm import declarative_base
from metadata.generated.schema.entity.data.table import Column as EntityColumn
from metadata.generated.schema.entity.data.table import ColumnName, DataType, Table
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
    DatalakeConnection,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.profiler.api.models import ProfileSampleConfig
from metadata.profiler.interface.pandas.profiler_interface import (
    PandasProfilerInterface,

@@ -79,11 +83,9 @@ class DatalakeSampleTest(TestCase):
    table_entity = Table(
        id=uuid4(),
        name="user",
        databaseSchema=EntityReference(id=uuid4(), type="databaseSchema", name="name"),
        fileFormat="csv",
        columns=[
            EntityColumn(
                name=ColumnName(__root__="id"),
                dataType=DataType.INT,
            ),
            EntityColumn(
                name=ColumnName(__root__="name"),
                dataType=DataType.STRING,

@@ -104,6 +106,26 @@ class DatalakeSampleTest(TestCase):
                name=ColumnName(__root__="age"),
                dataType=DataType.INT,
            ),
            EntityColumn(
                name=ColumnName(__root__="dob"),
                dataType=DataType.DATETIME,
            ),
            EntityColumn(
                name=ColumnName(__root__="tob"),
                dataType=DataType.DATE,
            ),
            EntityColumn(
                name=ColumnName(__root__="doe"),
                dataType=DataType.DATE,
            ),
            EntityColumn(
                name=ColumnName(__root__="json"),
                dataType=DataType.JSON,
            ),
            EntityColumn(
                name=ColumnName(__root__="array"),
                dataType=DataType.ARRAY,
            ),
        ],
    )

@@ -112,9 +134,8 @@ class DatalakeSampleTest(TestCase):
        "metadata.profiler.interface.profiler_interface.get_connection",
        return_value=FakeConnection,
    )
    @mock.patch.object(
        PandasProfilerInterface,
        "_convert_table_to_list_of_dataframe_objects",
    @mock.patch(
        "metadata.mixins.pandas.pandas_mixin.fetch_dataframe",
        return_value=[df1, pd.concat([df2, pd.DataFrame(index=df1.index)])],
    )
    def setUpClass(cls, mock_get_connection, mocked_dfs) -> None:

@@ -123,7 +144,7 @@ class DatalakeSampleTest(TestCase):
        """
        cls.datalake_profiler_interface = PandasProfilerInterface(
            entity=cls.table_entity,
            service_connection_config=None,
            service_connection_config=DatalakeConnection(configSource={}),
            storage_config=None,
            ometa_client=None,
            thread_count=None,

@@ -151,9 +172,8 @@ class DatalakeSampleTest(TestCase):
        "metadata.profiler.interface.profiler_interface.get_connection",
        return_value=FakeConnection,
    )
    @mock.patch.object(
        PandasProfilerInterface,
        "_convert_table_to_list_of_dataframe_objects",
    @mock.patch(
        "metadata.mixins.pandas.pandas_mixin.fetch_dataframe",
        return_value=[df1, pd.concat([df2, pd.DataFrame(index=df1.index)])],
    )
    def test_sample_property(self, mock_get_connection, mocked_dfs):

@@ -162,7 +182,7 @@ class DatalakeSampleTest(TestCase):
        """
        datalake_profiler_interface = PandasProfilerInterface(
            entity=self.table_entity,
            service_connection_config=None,
            service_connection_config=DatalakeConnection(configSource={}),
            storage_config=None,
            ometa_client=None,
            thread_count=None,
@@ -119,8 +119,8 @@ def test_check_datalake_type():
        "column2": DataType.STRING,
        "column3": DataType.BOOLEAN,
        "column4": DataType.FLOAT,
        "column5": DataType.STRING,
        "column6": DataType.STRING,
        "column5": DataType.DATETIME,
        "column6": DataType.DATETIME,
        "column7": DataType.INT,
        "column8": DataType.STRING,
        "column9": DataType.STRING,
@@ -7,6 +7,13 @@ slug: /connectors/ingestion/workflows/profiler
Learn how to configure and run the Profiler Workflow to extract Profiler data and execute the Data Quality.

{% note %}

During data profiling for Datalake Profiling, we drop NaN (Not a Number) values from the DataFrame using the dropna() method. However, we make an exception for null values, which are retained. This ensures that our computations are accurate while handling missing data

{% /note %}

## UI configuration
After the metadata ingestion has been done correctly, we can configure and deploy the Profiler Workflow.

@@ -7,6 +7,13 @@ slug: /connectors/ingestion/workflows/profiler
Learn how to configure and run the Profiler Workflow to extract Profiler data and execute the Data Quality.

{% note %}

During data profiling for Datalake Profiling, we drop NaN (Not a Number) values from the DataFrame using the dropna() method. However, we make an exception for null values, which are retained. This ensures that our computations are accurate while handling missing data

{% /note %}

## UI configuration
After the metadata ingestion has been done correctly, we can configure and deploy the Profiler Workflow.
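The note added to both profiler docs pages above describes how missing values are treated during datalake profiling. A minimal pandas illustration of the behaviour being documented, using toy data and not taken from the OpenMetadata codebase:

```python
import pandas as pd

df = pd.DataFrame({"age": [25.0, float("nan"), None, 40.0]})

# Value metrics (mean, min, max, ...) are computed on the column after dropna(),
# so missing entries do not skew the results.
non_null = df["age"].dropna()
print(non_null.mean())  # 32.5

# Missing entries remain visible to null-count style metrics.
print(df["age"].isna().sum())  # 2
```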