mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-07 16:11:30 +00:00
* fix: profiler refactor
* fix: catch division by zero error
* fix: instantiated new column object from name to delegate type handler back to dbapi
* fix: reverted columns instantiation and updated NUMBER type to NUMERIC
* fix: updated doc string for process_pii_sensitive method
This commit is contained in:
parent
bbce9c5aa4
commit
3d8e7e6d41
@ -362,7 +362,8 @@ class ProfilerWorkflow(WorkflowStatusMixin):
|
|||||||
self.create_profiler(entity, profiler_interface)
|
self.create_profiler(entity, profiler_interface)
|
||||||
self.profiler = cast(Profiler, self.profiler) # satisfy type checker
|
self.profiler = cast(Profiler, self.profiler) # satisfy type checker
|
||||||
profile: ProfilerResponse = self.profiler.process(
|
profile: ProfilerResponse = self.profiler.process(
|
||||||
self.source_config.generateSampleData
|
self.source_config.generateSampleData,
|
||||||
|
self.source_config.processPiiSensitive,
|
||||||
)
|
)
|
||||||
except Exception as exc: # pylint: disable=broad-except
|
except Exception as exc: # pylint: disable=broad-except
|
||||||
|
|
||||||
|
@ -53,8 +53,10 @@ class NonParametricSkew(ComposedMetric):
|
|||||||
res_median = res.get(Median.name())
|
res_median = res.get(Median.name())
|
||||||
|
|
||||||
if res_mean is not None and res_stddev is not None and res_median is not None:
|
if res_mean is not None and res_stddev is not None and res_median is not None:
|
||||||
return (float(res_mean) - float(res_median)) / float(
|
try:
|
||||||
res_stddev
|
return (float(res_mean) - float(res_median)) / float(
|
||||||
) # convert from decimal
|
res_stddev
|
||||||
|
) # convert from decimal
|
||||||
|
except ZeroDivisionError:
|
||||||
|
return None
|
||||||
return None
|
return None
|
||||||
|
@ -30,7 +30,7 @@ from metadata.profiler.orm.registry import CustomTypes
|
|||||||
Base = declarative_base()
|
Base = declarative_base()
|
||||||
|
|
||||||
_TYPE_MAP = {
|
_TYPE_MAP = {
|
||||||
DataType.NUMBER: sqlalchemy.INTEGER,
|
DataType.NUMBER: sqlalchemy.NUMERIC,
|
||||||
DataType.TINYINT: sqlalchemy.SMALLINT,
|
DataType.TINYINT: sqlalchemy.SMALLINT,
|
||||||
DataType.SMALLINT: sqlalchemy.SMALLINT,
|
DataType.SMALLINT: sqlalchemy.SMALLINT,
|
||||||
DataType.INT: sqlalchemy.INT,
|
DataType.INT: sqlalchemy.INT,
|
||||||
|
@ -30,6 +30,7 @@ from metadata.generated.schema.entity.data.table import (
|
|||||||
ColumnProfile,
|
ColumnProfile,
|
||||||
ColumnProfilerConfig,
|
ColumnProfilerConfig,
|
||||||
SystemProfile,
|
SystemProfile,
|
||||||
|
TableData,
|
||||||
TableProfile,
|
TableProfile,
|
||||||
)
|
)
|
||||||
from metadata.ingestion.processor.pii import NERScanner
|
from metadata.ingestion.processor.pii import NERScanner
|
||||||
@ -442,7 +443,11 @@ class Profiler(Generic[TMetric]):
|
|||||||
|
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def process(self, generate_sample_data: Optional[bool]) -> ProfilerResponse:
|
def process(
|
||||||
|
self,
|
||||||
|
generate_sample_data: Optional[bool],
|
||||||
|
process_pii_sensitive: Optional[bool],
|
||||||
|
) -> ProfilerResponse:
|
||||||
"""
|
"""
|
||||||
Given a table, we will prepare the profiler for
|
Given a table, we will prepare the profiler for
|
||||||
all its columns and return all the run profilers
|
all its columns and return all the run profilers
|
||||||
@ -454,40 +459,13 @@ class Profiler(Generic[TMetric]):
|
|||||||
|
|
||||||
self.compute_metrics()
|
self.compute_metrics()
|
||||||
if generate_sample_data:
|
if generate_sample_data:
|
||||||
try:
|
sample_data = self.generate_sample_data()
|
||||||
logger.info(
|
|
||||||
f"Fetching sample data for {self.profiler_interface.table_entity.fullyQualifiedName.__root__}..."
|
|
||||||
)
|
|
||||||
sample_data = self.profiler_interface.fetch_sample_data(self.table)
|
|
||||||
|
|
||||||
if self.profiler_interface.source_config.processPiiSensitive:
|
|
||||||
try:
|
|
||||||
entity_scanner = NERScanner(
|
|
||||||
metadata=self.profiler_interface.ometa_client
|
|
||||||
)
|
|
||||||
entity_scanner.process(
|
|
||||||
sample_data,
|
|
||||||
self.profiler_interface.table_entity,
|
|
||||||
self.profiler_interface.ometa_client,
|
|
||||||
)
|
|
||||||
except Exception as exc:
|
|
||||||
logger.warning(
|
|
||||||
f"Unexpected error while processing sample data for auto pii tagging - {exc}"
|
|
||||||
)
|
|
||||||
logger.debug(traceback.format_exc())
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
"Successfully fetched sample data for "
|
|
||||||
f"{self.profiler_interface.table_entity.fullyQualifiedName.__root__}..."
|
|
||||||
)
|
|
||||||
except Exception as err:
|
|
||||||
logger.debug(traceback.format_exc())
|
|
||||||
logger.warning(f"Error fetching sample data: {err}")
|
|
||||||
sample_data = None
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
sample_data = None
|
sample_data = None
|
||||||
|
|
||||||
|
if process_pii_sensitive and sample_data:
|
||||||
|
self.process_pii_sensitive(sample_data)
|
||||||
|
|
||||||
profile = self._check_profile_and_handle(self.get_profile())
|
profile = self._check_profile_and_handle(self.get_profile())
|
||||||
|
|
||||||
table_profile = ProfilerResponse(
|
table_profile = ProfilerResponse(
|
||||||
@ -498,6 +476,45 @@ class Profiler(Generic[TMetric]):
|
|||||||
|
|
||||||
return table_profile
|
return table_profile
|
||||||
|
|
||||||
|
def generate_sample_data(self) -> Optional[TableData]:
    """Fetch sample data for the table being profiled.

    Fetching is best-effort: any exception raised while logging the table
    name or while calling the profiler interface is logged (traceback at
    debug level, message at warning level) and is not propagated.

    Returns:
        Optional[TableData]: whatever ``profiler_interface.fetch_sample_data``
            returns for ``self.table``, or ``None`` when fetching failed.
    """
    try:
        logger.info(
            "Fetching sample data for "
            f"{self.profiler_interface.table_entity.fullyQualifiedName.__root__}..."  # type: ignore
        )
        return self.profiler_interface.fetch_sample_data(self.table)
    except Exception as err:
        # Deliberately broad: a failure is reported to the caller as None
        # instead of being raised.
        logger.debug(traceback.format_exc())
        logger.warning(f"Error fetching sample data: {err}")
        return None
|
||||||
|
|
||||||
|
def process_pii_sensitive(self, sample_data: TableData) -> None:
    """Scan sample data with the NER scanner to tag PII sensitive columns.

    The scan is best-effort: any exception raised while building or running
    the scanner is logged (message at warning level, traceback at debug
    level) and is not propagated.

    Args:
        sample_data (TableData): sample data handed to the scanner
    """
    try:
        interface = self.profiler_interface
        scanner = NERScanner(metadata=interface.ometa_client)  # type: ignore
        scanner.process(
            sample_data,
            interface.table_entity,  # type: ignore
            interface.ometa_client,  # type: ignore
        )
    except Exception as err:
        logger.warning(
            f"Unexpected error while processing sample data for auto pii tagging - {err}"
        )
        logger.debug(traceback.format_exc())
|
||||||
|
|
||||||
def get_profile(self) -> CreateTableProfileRequest:
|
def get_profile(self) -> CreateTableProfileRequest:
|
||||||
"""
|
"""
|
||||||
After executing the profiler, get all results
|
After executing the profiler, get all results
|
||||||
|
Loading…
x
Reference in New Issue
Block a user