GEN-996 - Allow PII Processor without storing Sample Data (#17927)

* GEN-996 - Allow PII Processor without storing Sample Data

* fix import

* fix import
Pere Miquel Brull 2024-09-20 16:05:29 +02:00
parent 01e4b04573
commit ad03f9e237
7 changed files with 74 additions and 36 deletions

View File

@@ -565,9 +565,9 @@ class MetadataRestSink(Sink):  # pylint: disable=too-many-public-methods
             logger.debug(
                 f"Successfully ingested profile metrics for {record.table.fullyQualifiedName.root}"
             )

-        if record.sample_data:
+        if record.sample_data and record.sample_data.store:
             table_data = self.metadata.ingest_table_sample_data(
-                table=record.table, sample_data=record.sample_data
+                table=record.table, sample_data=record.sample_data.data
             )
             if not table_data:
                 self.status.failed(
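
For readers skimming the diff, here is a minimal, self-contained sketch of the "wrapper plus store flag" pattern the sink now relies on. The names below (SampleDataSketch, sink_side_gate) are illustrative stand-ins, not code from this repository; only the gating logic mirrors the `record.sample_data and record.sample_data.store` check above.

from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class SampleDataSketch:
    """Stand-in for the SampleData wrapper: rows always travel, storage is opt-in."""
    rows: List[dict] = field(default_factory=list)
    store: bool = False


def sink_side_gate(sample: Optional[SampleDataSketch]) -> str:
    # Mirrors `if record.sample_data and record.sample_data.store:` in the sink
    if sample and sample.store:
        return "ingest sample data into the backend"
    return "keep sample data in memory (PII tagging only)"


print(sink_side_gate(SampleDataSketch(rows=[{"id": 1}], store=True)))   # stored
print(sink_side_gate(SampleDataSketch(rows=[{"id": 1}], store=False)))  # ephemeral
print(sink_side_gate(None))                                             # no sampling at all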

View File

@@ -171,7 +171,7 @@ class PIIProcessor(Processor):
             col_tags = self.process_column(
                 idx=idx,
                 column=column,
-                table_data=record.sample_data,
+                table_data=record.sample_data.data,
                 confidence_threshold=self.confidence_threshold,
             )
             if col_tags:

View File

@@ -17,9 +17,10 @@ multiple profilers per table and columns.
 """
 from typing import List, Optional, Type, Union

-from pydantic import ConfigDict
+from pydantic import ConfigDict, Field
 from sqlalchemy import Column
 from sqlalchemy.orm import DeclarativeMeta
+from typing_extensions import Annotated

 from metadata.config.common import ConfigModel
 from metadata.generated.schema.api.data.createTableProfile import (
@@ -37,6 +38,7 @@ from metadata.generated.schema.entity.services.connections.connectionBasicType i
 )
 from metadata.generated.schema.tests.customMetric import CustomMetric
 from metadata.generated.schema.type.basic import FullyQualifiedEntityName
+from metadata.ingestion.models.custom_pydantic import BaseModel
 from metadata.ingestion.models.table_metadata import ColumnTag
 from metadata.profiler.metrics.core import Metric, MetricTypes
 from metadata.profiler.processor.models import ProfilerDef
@@ -104,6 +106,15 @@ class ProfilerProcessorConfig(ConfigModel):
     databaseConfig: Optional[List[DatabaseAndSchemaConfig]] = []


+class SampleData(BaseModel):
+    """TableData wrapper to handle ephemeral SampleData"""
+
+    data: Annotated[TableData, Field(None, description="Table Sample Data")]
+    store: Annotated[
+        bool, Field(False, description="Whether the sample data should be stored or not")
+    ]
+
+
 class ProfilerResponse(ConfigModel):
     """
     ORM Profiler processor response.
@@ -114,7 +125,7 @@ class ProfilerResponse(ConfigModel):
     table: Table
     profile: CreateTableProfileRequest
-    sample_data: Optional[TableData] = None
+    sample_data: Optional[SampleData] = None
     column_tags: Optional[List[ColumnTag]] = None

     def __str__(self):
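
The new SampleData model leans on pydantic v2's Annotated-plus-Field style for defaults and descriptions. Below is a small self-contained sketch of that pattern; Payload and Wrapper are made-up names, not part of OpenMetadata.

from typing import List, Optional

from pydantic import BaseModel, Field
from typing_extensions import Annotated


class Payload(BaseModel):
    rows: List[int] = []


class Wrapper(BaseModel):
    # Default and description live inside Annotated[..., Field(...)],
    # the same style used by SampleData above.
    data: Annotated[Optional[Payload], Field(None, description="Wrapped payload")]
    store: Annotated[bool, Field(False, description="Persist the payload or not")]


w = Wrapper(data=Payload(rows=[1, 2]))
assert w.store is False  # default comes from Field(False, ...)
assert Wrapper.model_fields["store"].description == "Persist the payload or not"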

View File

@@ -33,7 +33,6 @@ from metadata.generated.schema.entity.data.table import (
     ColumnProfile,
     ColumnProfilerConfig,
     SystemProfile,
-    TableData,
     TableProfile,
 )
 from metadata.generated.schema.settings.settings import Settings
@@ -41,7 +40,7 @@ from metadata.generated.schema.tests.customMetric import (
     CustomMetric as CustomMetricEntity,
 )
 from metadata.generated.schema.type.basic import Timestamp
-from metadata.profiler.api.models import ProfilerResponse, ThreadPoolMetrics
+from metadata.profiler.api.models import ProfilerResponse, SampleData, ThreadPoolMetrics
 from metadata.profiler.interface.profiler_interface import ProfilerInterface
 from metadata.profiler.metrics.core import (
     ComposedMetric,
@@ -492,7 +491,12 @@ class Profiler(Generic[TMetric]):
         )
         self.compute_metrics()

-        if self.source_config.generateSampleData:
+        # We need the sample data for Sample Data or PII Sensitive processing.
+        # We'll nullify the Sample Data after the PII processing so that it's not stored.
+        if (
+            self.source_config.generateSampleData
+            or self.source_config.processPiiSensitive
+        ):
             sample_data = self.generate_sample_data()
         else:
             sample_data = None
@@ -510,7 +514,7 @@ class Profiler(Generic[TMetric]):
         return table_profile

     @calculate_execution_time(store=False)
-    def generate_sample_data(self) -> Optional[TableData]:
+    def generate_sample_data(self) -> Optional[SampleData]:
         """Fetch and ingest sample data

         Returns:
@@ -532,7 +536,10 @@ class Profiler(Generic[TMetric]):
                     SAMPLE_DATA_DEFAULT_COUNT, self.profiler_interface.sample_data_count
                 )
             ]
-            return table_data
+            return SampleData(
+                data=table_data, store=self.source_config.generateSampleData
+            )
         except Exception as err:
             logger.debug(traceback.format_exc())
             logger.warning(f"Error fetching sample data: {err}")

View File

@@ -15,11 +15,28 @@ JSON workflows to the profiler
 """
 from typing import List, Optional

-from pydantic import BaseModel, validator
+from pydantic import BaseModel, BeforeValidator
+from typing_extensions import Annotated

 from metadata.profiler.metrics.registry import Metrics


+def valid_metric(value: str):
+    """
+    Validate that the input metrics are correctly named
+    and can be found in the Registry
+    """
+    if not Metrics.get(value.upper()):
+        raise ValueError(
+            f"Metric name {value} is not a proper metric name from the Registry"
+        )
+
+    return value.upper()
+
+
+ValidMetric = Annotated[str, BeforeValidator(valid_metric)]
+
+
 class ProfilerDef(BaseModel):
     """
     Incoming profiler definition from the
@@ -30,26 +47,4 @@ class ProfilerDef(BaseModel):
     timeout_seconds: Optional[
         int
     ] = None  # Stop running a query after X seconds and continue
-    metrics: Optional[
-        List[str]
-    ] = None  # names of currently supported Static and Composed metrics
-
-    # TBD:
-    # time_metrics: List[TimeMetricDef] = None
-    # custom_metrics: List[CustomMetricDef] = None
-    # rule_metrics: ...
-
-    # pylint: disable=no-self-argument
-    @validator("metrics", each_item=True)
-    def valid_metric(cls, value):
-        """
-        We are using cls as per pydantic docs
-        Validate that the input metrics are correctly named
-        and can be found in the Registry
-        """
-        if not Metrics.get(value.upper()):
-            raise ValueError(
-                f"Metric name {value} is not a proper metric name from the Registry"
-            )
-        return value.upper()
+    metrics: Optional[List[ValidMetric]] = None
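
This hunk also migrates the metric-name check from pydantic v1's @validator(..., each_item=True) to the v2 Annotated/BeforeValidator style, which is what lets `metrics: Optional[List[ValidMetric]]` validate every list item. A self-contained sketch of the same pattern follows; the Metrics registry is faked with a plain set, and ProfilerDefSketch is a made-up name.

from typing import List, Optional

from pydantic import BaseModel, BeforeValidator, ValidationError
from typing_extensions import Annotated

KNOWN_METRICS = {"COUNT", "MEAN", "NULL_COUNT"}  # stand-in for the Metrics registry


def valid_metric(value: str) -> str:
    # Runs once per list item because it annotates the item type
    if value.upper() not in KNOWN_METRICS:
        raise ValueError(f"Metric name {value} is not a proper metric name")
    return value.upper()


ValidMetric = Annotated[str, BeforeValidator(valid_metric)]


class ProfilerDefSketch(BaseModel):
    name: str
    metrics: Optional[List[ValidMetric]] = None


assert ProfilerDefSketch(name="t", metrics=["count", "mean"]).metrics == ["COUNT", "MEAN"]

try:
    ProfilerDefSketch(name="t", metrics=["potato"])
except ValidationError as err:
    print(err.error_count(), "validation error, as expected")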

View File

@@ -64,7 +64,7 @@ from metadata.generated.schema.type.tagLabel import TagFQN, TagLabel
 from metadata.ingestion.models.table_metadata import ColumnTag
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.pii.processor import PIIProcessor
-from metadata.profiler.api.models import ProfilerResponse
+from metadata.profiler.api.models import ProfilerResponse, SampleData

 table_data = TableData(
     columns=[
@@ -314,7 +314,7 @@ class PiiProcessorTest(TestCase):
                     )
                 )
             ),
-            sample_data=table_data,
+            sample_data=SampleData(data=table_data),
         )

         updated_record: ProfilerResponse = self.pii_processor.run(record)

View File

@@ -0,0 +1,25 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Profiler models behave properly"""
+import pytest
+
+from metadata.profiler.processor.models import ProfilerDef
+
+
+def test_valid_metrics():
+    """
+    Test that the metrics are valid
+    """
+    profiler_def = ProfilerDef(name="test", metrics=["count"])
+    assert profiler_def.metrics == ["COUNT"]
+
+    with pytest.raises(ValueError):
+        ProfilerDef(name="test", metrics=["potato"])
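
A small note on why the new unit test above can assert pytest.raises(ValueError) even though pydantic v2 surfaces field errors as ValidationError: in current pydantic v2 releases, pydantic_core's ValidationError derives from ValueError, so the assertion still holds. A quick, hedged check:

import pydantic

# ValidationError inherits from ValueError in pydantic v2, so
# `pytest.raises(ValueError)` in the test above also catches it.
assert issubclass(pydantic.ValidationError, ValueError)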