Pere Menal-Ferrer a7e2f33adc
feature/pii-column-classifier (#21200)
* Add PII Tag and Sensitivity Level enums.

* Add feature-extraction for PII classification tasks

* Add faker as test dependency

* Add unit tests for presidio tag extractor

* Add PIISensitivityTags enum and update sensitivity mapping logic

* Add Presidio utility functions for PII analysis

* Extend column name regexes for PII

* Add column name split

* Move pii algorithms to dedicated package

* Add tests for PAN, NIF, SSN entities

* Fix linting

* Add comment on why we need to set specific language to Presidio recognizers, as per PR suggestion.

* Fix version of faker to prevent flaky tests. Fix failing tests.

* Fix wrong import

---------

Co-authored-by: Pere Menal <pere.menal@getcollate.io>
2025-05-16 14:03:49 +02:00

174 lines
5.5 KiB
Python

# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Classifier for PII detection and sensitivity tagging.
"""
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import (
Any,
DefaultDict,
Dict,
Generic,
Hashable,
Mapping,
Optional,
Sequence,
Set,
TypeVar,
final,
)
from presidio_analyzer import AnalyzerEngine
from metadata.generated.schema.entity.data.table import DataType
from metadata.pii.algorithms.column_patterns import get_pii_column_name_patterns
from metadata.pii.algorithms.feature_extraction import (
extract_pii_from_column_names,
extract_pii_tags,
is_non_pii_datatype,
split_column_name,
)
from metadata.pii.algorithms.preprocessing import preprocess_values
from metadata.pii.algorithms.presidio_patches import url_patcher
from metadata.pii.algorithms.presidio_utils import (
build_analyzer_engine,
set_presidio_logger_level,
)
from metadata.pii.algorithms.tags import PIISensitivityTag, PIITag
T = TypeVar("T", bound=Hashable)
class ColumnClassifier(ABC, Generic[T]):
    """
    Abstract interface for column classifiers.

    A column classifier scores a column against a set of class labels of
    type ``T``, using the column's sampled data and its metadata.
    """

    @abstractmethod
    def predict_scores(
        self,
        sample_data: Sequence[Any],
        column_name: Optional[str] = None,
        column_data_type: Optional[DataType] = None,
    ) -> Mapping[T, float]:
        """
        Score the candidate class labels for a column.

        Args:
            sample_data: sample data of the column.
            column_name: name of the column, when available.
            column_data_type: data type of the column, when available.

        Returns:
            A mapping from class label to score. The higher the score, the
            more likely that class is for the given inputs.
        """
# Implementations
@final
class HeuristicPIIClassifier(ColumnClassifier[PIITag]):
    """
    Heuristic PII column classifier.

    Scores PII tags from the sampled column values and raises the score of
    any tag that is also matched by the column name.
    """

    def __init__(
        self,
        *,
        column_name_contribution: float = 0.5,
        score_cutoff: float = 0.1,
        relative_cardinality_cutoff: float = 0.01,
    ):
        """
        Args:
            column_name_contribution: amount added to a tag's content score
                when the column name matches the same tag.
            score_cutoff: minimum final score a tag needs to be reported.
            relative_cardinality_cutoff: minimum ratio of distinct values to
                total values; columns below it yield no scores.
        """
        set_presidio_logger_level()
        self._presidio_analyzer: AnalyzerEngine = build_analyzer_engine()
        self._column_name_patterns = get_pii_column_name_patterns()

        # Tuning parameters
        self._relative_cardinality_cutoff = relative_cardinality_cutoff
        self._score_cutoff = score_cutoff
        self._column_name_contribution = column_name_contribution

    def predict_scores(
        self,
        sample_data: Sequence[Any],
        column_name: Optional[str] = None,
        column_data_type: Optional[DataType] = None,
    ) -> Mapping[PIITag, float]:
        """
        Score PII tags for a column.

        An empty mapping is returned when the data type is flagged as
        non-PII, when preprocessing leaves no values, or when the relative
        cardinality of the values is below the configured cutoff.
        """
        if column_data_type is not None and is_non_pii_datatype(column_data_type):
            return {}

        values = preprocess_values(sample_data)
        if not values:
            return {}

        # Relative cardinality test: distinct values over total values
        distinct_ratio = len(set(values)) / len(values)
        if distinct_ratio < self._relative_cardinality_cutoff:
            return {}

        # The split column name is handed to the analyzer as context
        name_context = None
        if column_name:
            name_context = split_column_name(column_name)

        content_scores = extract_pii_tags(
            self._presidio_analyzer,
            values,
            context=name_context,
            recognizer_result_patcher=url_patcher,
        )

        name_tags: Set[PIITag] = (
            set()
            if column_name is None
            else extract_pii_from_column_names(
                column_name, patterns=self._column_name_patterns
            )
        )

        # Tags also matched by the column name get the extra contribution
        boost = self._column_name_contribution
        combined = (
            (tag, (score + boost) if tag in name_tags else score)
            for tag, score in content_scores.items()
        )

        # Apply the score cutoff
        return {tag: total for tag, total in combined if total >= self._score_cutoff}
class PIISensitiveClassifier(ColumnClassifier[PIISensitivityTag]):
    """
    Implements a classifier for PII sensitivity tags based on a given
    PII column classifier. If no classifier is provided, it defaults to
    using the HeuristicPIIClassifier.
    """

    def __init__(self, classifier: Optional[ColumnClassifier[PIITag]] = None):
        """
        Args:
            classifier: PII tag classifier to delegate to. When None, a
                HeuristicPIIClassifier with default settings is created.
        """
        # Compare against None explicitly instead of relying on truthiness,
        # so an injected classifier is never discarded just because it
        # evaluates as falsy (e.g. it defines __len__ or __bool__).
        if classifier is None:
            classifier = HeuristicPIIClassifier()
        self.classifier: ColumnClassifier[PIITag] = classifier

    def predict_scores(
        self,
        sample_data: Sequence[Any],
        column_name: Optional[str] = None,
        column_data_type: Optional[DataType] = None,
    ) -> Mapping[PIISensitivityTag, float]:
        """
        Score PII sensitivity tags for a column.

        The wrapped classifier is asked for PII tag scores, and the score of
        every PII tag is added to the sensitivity tag it maps to.

        Returns:
            A mapping from sensitivity tag to the summed score of all PII
            tags with that sensitivity.
        """
        pii_tags = self.classifier.predict_scores(
            sample_data, column_name, column_data_type
        )
        results: DefaultDict[PIISensitivityTag, float] = defaultdict(float)
        for tag, score in pii_tags.items():
            # Convert PIITag to PIISensitivityTag and accumulate its score
            results[tag.sensitivity()] += score
        return results