From 3d8e7e6d41b689e4a5881a900f0fb720d6004edd Mon Sep 17 00:00:00 2001 From: Teddy Date: Thu, 23 Mar 2023 07:22:19 +0100 Subject: [PATCH] Fixes #10290 - NUMERIC type assigned the wrong dtype in converter.py (#10708) * fix: profiler refactor * fix: catch division by zero error * fix: instantiated new column object from name to delegate type handler back to dbapi * fix: reverted columns instantiation and updated NUMBER type to NUMERIC * fix: updated doc string for process_pii_sensitive method --- .../src/metadata/profiler/api/workflow.py | 3 +- .../metrics/composed/non_parametric_skew.py | 10 ++- .../src/metadata/profiler/orm/converter.py | 2 +- .../src/metadata/profiler/profiler/core.py | 81 +++++++++++-------- 4 files changed, 58 insertions(+), 38 deletions(-) diff --git a/ingestion/src/metadata/profiler/api/workflow.py b/ingestion/src/metadata/profiler/api/workflow.py index bc10155a82f..73a4759c730 100644 --- a/ingestion/src/metadata/profiler/api/workflow.py +++ b/ingestion/src/metadata/profiler/api/workflow.py @@ -362,7 +362,8 @@ class ProfilerWorkflow(WorkflowStatusMixin): self.create_profiler(entity, profiler_interface) self.profiler = cast(Profiler, self.profiler) # satisfy type checker profile: ProfilerResponse = self.profiler.process( - self.source_config.generateSampleData + self.source_config.generateSampleData, + self.source_config.processPiiSensitive, ) except Exception as exc: # pylint: disable=broad-except diff --git a/ingestion/src/metadata/profiler/metrics/composed/non_parametric_skew.py b/ingestion/src/metadata/profiler/metrics/composed/non_parametric_skew.py index b86ae6c3983..49076144c7d 100644 --- a/ingestion/src/metadata/profiler/metrics/composed/non_parametric_skew.py +++ b/ingestion/src/metadata/profiler/metrics/composed/non_parametric_skew.py @@ -53,8 +53,10 @@ class NonParametricSkew(ComposedMetric): res_median = res.get(Median.name()) if res_mean is not None and res_stddev is not None and res_median is not None: - return (float(res_mean) - float(res_median)) / float( - res_stddev - ) # convert from decimal - + try: + return (float(res_mean) - float(res_median)) / float( + res_stddev + ) # convert from decimal + except ZeroDivisionError: + return None return None diff --git a/ingestion/src/metadata/profiler/orm/converter.py b/ingestion/src/metadata/profiler/orm/converter.py index b16ce189cb8..e8348749e42 100644 --- a/ingestion/src/metadata/profiler/orm/converter.py +++ b/ingestion/src/metadata/profiler/orm/converter.py @@ -30,7 +30,7 @@ from metadata.profiler.orm.registry import CustomTypes Base = declarative_base() _TYPE_MAP = { - DataType.NUMBER: sqlalchemy.INTEGER, + DataType.NUMBER: sqlalchemy.NUMERIC, DataType.TINYINT: sqlalchemy.SMALLINT, DataType.SMALLINT: sqlalchemy.SMALLINT, DataType.INT: sqlalchemy.INT, diff --git a/ingestion/src/metadata/profiler/profiler/core.py b/ingestion/src/metadata/profiler/profiler/core.py index 750cee5bdee..75ff65aca37 100644 --- a/ingestion/src/metadata/profiler/profiler/core.py +++ b/ingestion/src/metadata/profiler/profiler/core.py @@ -30,6 +30,7 @@ from metadata.generated.schema.entity.data.table import ( ColumnProfile, ColumnProfilerConfig, SystemProfile, + TableData, TableProfile, ) from metadata.ingestion.processor.pii import NERScanner @@ -442,7 +443,11 @@ class Profiler(Generic[TMetric]): return self - def process(self, generate_sample_data: Optional[bool]) -> ProfilerResponse: + def process( + self, + generate_sample_data: Optional[bool], + process_pii_sensitive: Optional[bool], + ) -> ProfilerResponse: """ Given a table, we will prepare the profiler for all its columns and return all the run profilers @@ -454,40 +459,13 @@ class Profiler(Generic[TMetric]): self.compute_metrics() if generate_sample_data: - try: - logger.info( - f"Fetching sample data for {self.profiler_interface.table_entity.fullyQualifiedName.__root__}..." - ) - sample_data = self.profiler_interface.fetch_sample_data(self.table) - - if self.profiler_interface.source_config.processPiiSensitive: - try: - entity_scanner = NERScanner( - metadata=self.profiler_interface.ometa_client - ) - entity_scanner.process( - sample_data, - self.profiler_interface.table_entity, - self.profiler_interface.ometa_client, - ) - except Exception as exc: - logger.warning( - f"Unexpected error while processing sample data for auto pii tagging - {exc}" - ) - logger.debug(traceback.format_exc()) - - logger.info( - "Successfully fetched sample data for " - f"{self.profiler_interface.table_entity.fullyQualifiedName.__root__}..." - ) - except Exception as err: - logger.debug(traceback.format_exc()) - logger.warning(f"Error fetching sample data: {err}") - sample_data = None - + sample_data = self.generate_sample_data() else: sample_data = None + if process_pii_sensitive and sample_data: + self.process_pii_sensitive(sample_data) + profile = self._check_profile_and_handle(self.get_profile()) table_profile = ProfilerResponse( @@ -498,6 +476,45 @@ class Profiler(Generic[TMetric]): return table_profile + def generate_sample_data(self) -> TableData: + """Fetch and ingest sample data + + Returns: + TableData: sample data + """ + try: + logger.info( + "Fetching sample data for " + f"{self.profiler_interface.table_entity.fullyQualifiedName.__root__}..." # type: ignore + ) + return self.profiler_interface.fetch_sample_data(self.table) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.warning(f"Error fetching sample data: {err}") + return None + + def process_pii_sensitive(self, sample_data: TableData) -> None: + """Read sample data to find pii sensitive columns and tag them + as PII sensitive data + + Args: + sample_data (TableData): sample data + """ + try: + entity_scanner = NERScanner( + metadata=self.profiler_interface.ometa_client # type: ignore + ) + entity_scanner.process( + sample_data, + self.profiler_interface.table_entity, # type: ignore + self.profiler_interface.ometa_client, # type: ignore + ) + except Exception as exc: + logger.warning( + f"Unexpected error while processing sample data for auto pii tagging - {exc}" + ) + logger.debug(traceback.format_exc()) + def get_profile(self) -> CreateTableProfileRequest: """ After executing the profiler, get all results