From ad03f9e237fe47b72d1d260fe9c85c3694c3e914 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Fri, 20 Sep 2024 16:05:29 +0200 Subject: [PATCH] GEN-996 - Allow PII Processor without storing Sample Data (#17927) * GEN-996 - Allow PII Processor without storing Sample Data * fix import * fix import --- .../metadata/ingestion/sink/metadata_rest.py | 4 +- ingestion/src/metadata/pii/processor.py | 2 +- ingestion/src/metadata/profiler/api/models.py | 15 ++++++- .../src/metadata/profiler/processor/core.py | 17 +++++--- .../src/metadata/profiler/processor/models.py | 43 ++++++++----------- .../orm_profiler/test_pii_processor.py | 4 +- .../unit/profiler/test_profiler_models.py | 25 +++++++++++ 7 files changed, 74 insertions(+), 36 deletions(-) create mode 100644 ingestion/tests/unit/profiler/test_profiler_models.py diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index 0b296a88c34..01d6fdd932e 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -565,9 +565,9 @@ class MetadataRestSink(Sink): # pylint: disable=too-many-public-methods f"Successfully ingested profile metrics for {record.table.fullyQualifiedName.root}" ) - if record.sample_data: + if record.sample_data and record.sample_data.store: table_data = self.metadata.ingest_table_sample_data( - table=record.table, sample_data=record.sample_data + table=record.table, sample_data=record.sample_data.data ) if not table_data: self.status.failed( diff --git a/ingestion/src/metadata/pii/processor.py b/ingestion/src/metadata/pii/processor.py index 840079f5a6f..95ebb90cbe4 100644 --- a/ingestion/src/metadata/pii/processor.py +++ b/ingestion/src/metadata/pii/processor.py @@ -171,7 +171,7 @@ class PIIProcessor(Processor): col_tags = self.process_column( idx=idx, column=column, - table_data=record.sample_data, + table_data=record.sample_data.data, confidence_threshold=self.confidence_threshold, ) if col_tags: diff --git a/ingestion/src/metadata/profiler/api/models.py b/ingestion/src/metadata/profiler/api/models.py index 274710778a3..591f6f57a59 100644 --- a/ingestion/src/metadata/profiler/api/models.py +++ b/ingestion/src/metadata/profiler/api/models.py @@ -17,9 +17,10 @@ multiple profilers per table and columns. """ from typing import List, Optional, Type, Union -from pydantic import ConfigDict +from pydantic import ConfigDict, Field from sqlalchemy import Column from sqlalchemy.orm import DeclarativeMeta +from typing_extensions import Annotated from metadata.config.common import ConfigModel from metadata.generated.schema.api.data.createTableProfile import ( @@ -37,6 +38,7 @@ from metadata.generated.schema.entity.services.connections.connectionBasicType i ) from metadata.generated.schema.tests.customMetric import CustomMetric from metadata.generated.schema.type.basic import FullyQualifiedEntityName +from metadata.ingestion.models.custom_pydantic import BaseModel from metadata.ingestion.models.table_metadata import ColumnTag from metadata.profiler.metrics.core import Metric, MetricTypes from metadata.profiler.processor.models import ProfilerDef @@ -104,6 +106,15 @@ class ProfilerProcessorConfig(ConfigModel): databaseConfig: Optional[List[DatabaseAndSchemaConfig]] = [] +class SampleData(BaseModel): + """TableData wrapper to handle ephemeral SampleData""" + + data: Annotated[TableData, Field(None, description="Table Sample Data")] + store: Annotated[ + bool, Field(False, description="Is the sample data should be stored or not") + ] + + class ProfilerResponse(ConfigModel): """ ORM Profiler processor response. @@ -114,7 +125,7 @@ class ProfilerResponse(ConfigModel): table: Table profile: CreateTableProfileRequest - sample_data: Optional[TableData] = None + sample_data: Optional[SampleData] = None column_tags: Optional[List[ColumnTag]] = None def __str__(self): diff --git a/ingestion/src/metadata/profiler/processor/core.py b/ingestion/src/metadata/profiler/processor/core.py index 6f7ff6d4d02..5dbefc892df 100644 --- a/ingestion/src/metadata/profiler/processor/core.py +++ b/ingestion/src/metadata/profiler/processor/core.py @@ -33,7 +33,6 @@ from metadata.generated.schema.entity.data.table import ( ColumnProfile, ColumnProfilerConfig, SystemProfile, - TableData, TableProfile, ) from metadata.generated.schema.settings.settings import Settings @@ -41,7 +40,7 @@ from metadata.generated.schema.tests.customMetric import ( CustomMetric as CustomMetricEntity, ) from metadata.generated.schema.type.basic import Timestamp -from metadata.profiler.api.models import ProfilerResponse, ThreadPoolMetrics +from metadata.profiler.api.models import ProfilerResponse, SampleData, ThreadPoolMetrics from metadata.profiler.interface.profiler_interface import ProfilerInterface from metadata.profiler.metrics.core import ( ComposedMetric, @@ -492,7 +491,12 @@ class Profiler(Generic[TMetric]): ) self.compute_metrics() - if self.source_config.generateSampleData: + # We need the sample data for Sample Data or PII Sensitive processing. + # We'll nullify the Sample Data after the PII processing so that it's not stored. + if ( + self.source_config.generateSampleData + or self.source_config.processPiiSensitive + ): sample_data = self.generate_sample_data() else: sample_data = None @@ -510,7 +514,7 @@ class Profiler(Generic[TMetric]): return table_profile @calculate_execution_time(store=False) - def generate_sample_data(self) -> Optional[TableData]: + def generate_sample_data(self) -> Optional[SampleData]: """Fetch and ingest sample data Returns: @@ -532,7 +536,10 @@ class Profiler(Generic[TMetric]): SAMPLE_DATA_DEFAULT_COUNT, self.profiler_interface.sample_data_count ) ] - return table_data + return SampleData( + data=table_data, store=self.source_config.generateSampleData + ) + except Exception as err: logger.debug(traceback.format_exc()) logger.warning(f"Error fetching sample data: {err}") diff --git a/ingestion/src/metadata/profiler/processor/models.py b/ingestion/src/metadata/profiler/processor/models.py index 650f10e19bd..7244f5cfc71 100644 --- a/ingestion/src/metadata/profiler/processor/models.py +++ b/ingestion/src/metadata/profiler/processor/models.py @@ -15,11 +15,28 @@ JSON workflows to the profiler """ from typing import List, Optional -from pydantic import BaseModel, validator +from pydantic import BaseModel, BeforeValidator +from typing_extensions import Annotated from metadata.profiler.metrics.registry import Metrics +def valid_metric(value: str): + """ + Validate that the input metrics are correctly named + and can be found in the Registry + """ + if not Metrics.get(value.upper()): + raise ValueError( + f"Metric name {value} is not a proper metric name from the Registry" + ) + + return value.upper() + + +ValidMetric = Annotated[str, BeforeValidator(valid_metric)] + + class ProfilerDef(BaseModel): """ Incoming profiler definition from the @@ -30,26 +47,4 @@ class ProfilerDef(BaseModel): timeout_seconds: Optional[ int ] = None # Stop running a query after X seconds and continue - metrics: Optional[ - List[str] - ] = None # names of currently supported Static and Composed metrics - # TBD: - # time_metrics: List[TimeMetricDef] = None - # custom_metrics: List[CustomMetricDef] = None - # rule_metrics: ... - - # pylint: disable=no-self-argument - @validator("metrics", each_item=True) - def valid_metric(cls, value): - """ - We are using cls as per pydantic docs - - Validate that the input metrics are correctly named - and can be found in the Registry - """ - if not Metrics.get(value.upper()): - raise ValueError( - f"Metric name {value} is not a proper metric name from the Registry" - ) - - return value.upper() + metrics: Optional[List[ValidMetric]] = None diff --git a/ingestion/tests/integration/orm_profiler/test_pii_processor.py b/ingestion/tests/integration/orm_profiler/test_pii_processor.py index 53d66ba8848..19cd6b4bf23 100644 --- a/ingestion/tests/integration/orm_profiler/test_pii_processor.py +++ b/ingestion/tests/integration/orm_profiler/test_pii_processor.py @@ -64,7 +64,7 @@ from metadata.generated.schema.type.tagLabel import TagFQN, TagLabel from metadata.ingestion.models.table_metadata import ColumnTag from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.pii.processor import PIIProcessor -from metadata.profiler.api.models import ProfilerResponse +from metadata.profiler.api.models import ProfilerResponse, SampleData table_data = TableData( columns=[ @@ -314,7 +314,7 @@ class PiiProcessorTest(TestCase): ) ) ), - sample_data=table_data, + sample_data=SampleData(data=table_data), ) updated_record: ProfilerResponse = self.pii_processor.run(record) diff --git a/ingestion/tests/unit/profiler/test_profiler_models.py b/ingestion/tests/unit/profiler/test_profiler_models.py new file mode 100644 index 00000000000..86c61e8f2ce --- /dev/null +++ b/ingestion/tests/unit/profiler/test_profiler_models.py @@ -0,0 +1,25 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Profiler models behave properly""" +import pytest + +from metadata.profiler.processor.models import ProfilerDef + + +def test_valid_metrics(): + """ + Test that the metrics are valid + """ + profiler_def = ProfilerDef(name="test", metrics=["count"]) + assert profiler_def.metrics == ["COUNT"] + + with pytest.raises(ValueError): + ProfilerDef(name="test", metrics=["potato"])