feature/pii-processor-improvement (#21248)

* Add PII Tag and Sensitivity Level enums.

* Add feature-extraction for PII classification tasks

* Add faker as test dependency

* Add unit tests for presidio tag extractor

* Add PIISensitivityTags enum and update sensitivity mapping logic

* Add Presidio utility functions for PII analysis

* Extend column name regexes for PII

* Add tests for PAN, NIF, SSN entities

* Fix version of faker to prevent flaky tests. Fix failing tests.

* Add Generated to State enum

* Integrate PIISensitive classifier to PIIProcessor
This commit is contained in:
Pere Menal-Ferrer 2025-05-19 19:52:17 +02:00 committed by GitHub
parent a31504139b
commit 5d2dfa712a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 250 additions and 140 deletions

View File

@ -7,7 +7,7 @@ repos:
- id: check-json
exclude: vscode
- repo: https://github.com/hadialqattan/pycln
rev: v2.4.0
rev: v2.5.0
hooks:
- id: pycln
files: ^(ingestion|openmetadata-airflow-apis)/

View File

@ -260,7 +260,7 @@ ignore = [
"src/metadata/mixins/*",
"src/metadata/parsers/*",
"src/metadata/pii/scanners/*",
"src/metadata/pii/processor.py",
"src/metadata/pii/*processor.py",
"src/metadata/profiler/*",
"src/metadata/sampler/*",
"src/metadata/readers/*",

View File

@ -164,10 +164,17 @@ class PIISensitiveClassifier(ColumnClassifier[PIISensitivityTag]):
sample_data, column_name, column_data_type
)
results: DefaultDict[PIISensitivityTag, float] = defaultdict(float)
counts: DefaultDict[PIISensitivityTag, int] = defaultdict(int)
for tag, score in pii_tags.items():
# Convert PIITag to PIISensitivityTag
pii_sensitivity = tag.sensitivity()
results[pii_sensitivity] += score
counts[pii_sensitivity] += 1
# Normalize the scores
for tag in results:
if counts[tag] > 0:
results[tag] /= counts[tag]
return results

View File

@ -11,6 +11,7 @@
"""
Preprocessing functions for the classification tasks.
"""
import datetime
import json
from typing import Any, List, Mapping, Optional, Sequence
@ -27,7 +28,8 @@ def convert_to_str(value: Any) -> Optional[str]:
"""
if isinstance(value, str):
return value
if isinstance(value, (int, float)):
if isinstance(value, (int, float, datetime.datetime, datetime.date)):
# Values we want to convert to string out of the box
return str(value)
if isinstance(value, bytes):
return value.decode("utf-8", errors="ignore")

View File

@ -0,0 +1,38 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utility functions for PII algorithms
"""
from typing import Mapping, Sequence, TypeVar
T = TypeVar("T")


def normalize_scores(
    scores: Mapping[T, float], tol: float = 0.01
) -> Mapping[T, float]:
    """
    Rescale the scores so that they add up to 1.

    Entries whose score is not strictly greater than `tol` are dropped before
    rescaling. Scores are expected to be positive.

    Args:
        scores: mapping from class to its raw score.
        tol: scores at or below this value are ignored.

    Returns:
        A new mapping with the surviving classes and their rescaled scores.
        If the surviving scores sum to zero (e.g. nothing survived), the
        filtered mapping is returned without rescaling.
    """
    kept = {}
    for label, value in scores.items():
        if value > tol:
            kept[label] = value

    total = sum(kept.values())
    if total == 0:
        # Nothing to rescale, and dividing would fail
        return kept

    return {label: value / total for label, value in kept.items()}
def get_top_classes(scores: Mapping[T, float], n: int, threshold: float) -> Sequence[T]:
    """
    Get the top n classes from the scores mapping whose score is at or above the threshold.

    The classes are sorted in descending order of their scores; classes with
    equal scores keep the order in which they appear in `scores`.

    Args:
        scores: mapping from class to its score.
        n: maximum number of classes to return. Zero or a negative value
            yields an empty result.
        threshold: minimum score (inclusive) a class needs to be returned.

    Returns:
        At most `n` classes, best score first.
    """
    if n <= 0:
        # A negative `n` used as a slice bound would count from the end of the
        # list and return "all but the last |n|" classes instead of none.
        return []

    ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)
    qualifying = [key for key, score in ranked if score >= threshold]
    return qualifying[:n]

View File

@ -0,0 +1,125 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Base class for the Auto Classification Processor.
"""
import traceback
from abc import ABC, abstractmethod
from typing import Any, Optional, Sequence, Type, TypeVar, cast, final
from metadata.generated.schema.entity.data.table import Column
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.generated.schema.metadataIngestion.databaseServiceAutoClassificationPipeline import (
DatabaseServiceAutoClassificationPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.tagLabel import TagLabel
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.api.steps import Processor
from metadata.ingestion.models.table_metadata import ColumnTag
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.sampler.models import SamplerResponse
C = TypeVar("C", bound="AutoClassificationProcessor")


class AutoClassificationProcessor(Processor, ABC):
    """
    Abstract base for processors that attach tags to table columns.

    Subclasses only implement `create_column_tag_labels`, i.e. how tags are
    derived from a column and its sample data. Construction and the per-record
    run loop live here.
    """

    # `create` and `_run` are marked as final so that subclasses cannot override
    # them: the workflow always runs the same way and implementers are left with
    # the responsibility of *only* providing the logic for creating tags.

    def __init__(
        self,
        config: OpenMetadataWorkflowConfig,
        metadata: OpenMetadata,
    ):
        super().__init__()

        self.config = config
        self.metadata = metadata

        # `cast` does nothing at runtime; it only tells the type checker which
        # concrete source config this processor works with.
        source_config = cast(
            DatabaseServiceAutoClassificationPipeline,
            self.config.source.sourceConfig.config,
        )
        self.source_config: DatabaseServiceAutoClassificationPipeline = source_config

    @abstractmethod
    def create_column_tag_labels(
        self, column: Column, sample_data: Sequence[Any]
    ) -> Sequence[TagLabel]:
        """
        Build the tag labels for `column` given the sampled values of that column.
        """

    @property
    def name(self) -> str:
        """Name of this processor as used in error reports."""
        return "Auto Classification Processor"

    def close(self) -> None:
        """Nothing to close"""

    @classmethod
    @final
    def create(
        cls: Type[C],
        config_dict: dict,
        metadata: OpenMetadata,
        pipeline_name: Optional[str] = None,
    ) -> C:
        """
        Parse `config_dict` into a workflow config and instantiate the processor.

        `pipeline_name` is accepted but not used here.
        """
        return cls(
            config=parse_workflow_config_gracefully(config_dict),
            metadata=metadata,
        )

    @final
    def _run(self, record: SamplerResponse) -> Either[SamplerResponse]:
        """
        Compute the column tags for every column of the sampled table.

        The record is returned untouched when auto classification is disabled.
        Otherwise `record.column_tags` is replaced with the tags collected from
        all columns. A failure on one column is reported through
        `self.status.failed` and does not stop the remaining columns.
        """
        # We don't always need to process
        if not self.source_config.enableAutoClassification:
            return Either(right=record, left=None)

        collected = []
        for position, column in enumerate(record.table.columns):
            try:
                # Values of this column across all sampled rows
                column_values = [
                    row[position] for row in record.sample_data.data.rows
                ]
                labels = self.create_column_tag_labels(
                    column=column,
                    sample_data=column_values,
                )
                for label in labels:
                    collected.append(
                        ColumnTag(
                            column_fqn=column.fullyQualifiedName.root,
                            tag_label=label,
                        )
                    )
            except Exception as err:
                # TODO: Shouldn't we return a Left here?
                self.status.failed(
                    StackTraceError(
                        name=record.table.fullyQualifiedName.root,
                        error=f"Error in Processor {self.name} computing tags for [{column}] - [{err}]",
                        stackTrace=traceback.format_exc(),
                    )
                )

        record.column_tags = collected
        return Either(right=record, left=None)

View File

@ -12,16 +12,10 @@
"""
Processor util to fetch pii sensitive columns
"""
import traceback
from typing import List, Optional, cast
from typing import Any, Sequence
from metadata.generated.schema.entity.data.table import Column, TableData
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.generated.schema.metadataIngestion.databaseServiceAutoClassificationPipeline import (
DatabaseServiceAutoClassificationPipeline,
)
from metadata.generated.schema.entity.classification.tag import Tag
from metadata.generated.schema.entity.data.table import Column
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
@ -31,24 +25,21 @@ from metadata.generated.schema.type.tagLabel import (
TagLabel,
TagSource,
)
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.api.step import Step
from metadata.ingestion.api.steps import Processor
from metadata.ingestion.models.table_metadata import ColumnTag
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.pii.algorithms.classifiers import ColumnClassifier, PIISensitiveClassifier
from metadata.pii.algorithms.tags import PIISensitivityTag
from metadata.pii.algorithms.utils import get_top_classes, normalize_scores
from metadata.pii.base_processor import AutoClassificationProcessor
from metadata.pii.constants import PII
from metadata.pii.scanners.column_name_scanner import ColumnNameScanner
from metadata.pii.scanners.ner_scanner import NERScanner
from metadata.sampler.models import SamplerResponse
from metadata.utils import fqn
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
class PIIProcessor(Processor):
class PIIProcessor(AutoClassificationProcessor):
"""
A scanner that uses Spacy NER for entity recognition
An AutoClassificationProcessor that uses a PIISensitive classifier to tag columns.
"""
def __init__(
@ -56,50 +47,21 @@ class PIIProcessor(Processor):
config: OpenMetadataWorkflowConfig,
metadata: OpenMetadata,
):
super().__init__()
self.config = config
self.metadata = metadata
super().__init__(config, metadata)
self._classifier: ColumnClassifier[PIISensitivityTag] = PIISensitiveClassifier()
# Init and type the source config
self.source_config: DatabaseServiceAutoClassificationPipeline = cast(
DatabaseServiceAutoClassificationPipeline,
self.config.source.sourceConfig.config,
) # Used to satisfy type checked
self._ner_scanner = None
self.name_scanner = ColumnNameScanner()
self.confidence_threshold = self.source_config.confidence
@property
def name(self) -> str:
return "Auto Classification Processor"
@property
def ner_scanner(self) -> NERScanner:
"""Load the NER Scanner only if called"""
if self._ner_scanner is None:
self._ner_scanner = NERScanner()
return self._ner_scanner
@classmethod
def create(
cls,
config_dict: dict,
metadata: OpenMetadata,
pipeline_name: Optional[str] = None,
) -> "Step":
config = parse_workflow_config_gracefully(config_dict)
return cls(config=config, metadata=metadata)
def close(self) -> None:
"""Nothing to close"""
self.confidence_threshold = self.source_config.confidence / 100
self._tolerance = 0.01
@staticmethod
def build_column_tag(tag_fqn: str, column_fqn: str) -> ColumnTag:
"""
Build the tag and run the PATCH
"""
def build_tag_label(tag: PIISensitivityTag) -> TagLabel:
tag_fqn = fqn.build(
metadata=None,
entity_type=Tag,
classification_name=PII,
tag_name=tag.value,
)
tag_label = TagLabel(
tagFQN=tag_fqn,
source=TagSource.Classification,
@ -107,85 +69,27 @@ class PIIProcessor(Processor):
labelType=LabelType.Generated,
)
return ColumnTag(column_fqn=column_fqn, tag_label=tag_label)
return tag_label
def process_column(
self,
idx: int,
column: Column,
table_data: Optional[TableData],
confidence_threshold: float,
) -> Optional[List[ColumnTag]]:
def create_column_tag_labels(
self, column: Column, sample_data: Sequence[Any]
) -> Sequence[TagLabel]:
"""
Tag a column with PII if we find it using our scanners
Create tags for the column based on the sample data.
"""
# If the column we are about to process already has PII tags return empty
for tag in column.tags or []:
if PII in tag.tagFQN.root:
return []
# First, check if the column we are about to process
# already has PII tags or not
column_has_pii_tag = any((PII in tag.tagFQN.root for tag in column.tags or []))
# Get the tags and confidence
scores = self._classifier.predict_scores(
sample_data, column_name=column.name.root, column_data_type=column.dataType
)
# If it has PII tags, we skip the processing
# for the column
if column_has_pii_tag is True:
return None
scores = normalize_scores(scores, tol=self._tolerance)
# We'll scan first by sample data to prioritize the NER scanner
# If we find nothing, we'll check the column name
tag_and_confidence = (
self.ner_scanner.scan([row[idx] for row in table_data.rows])
if table_data
else None
) or self.name_scanner.scan(column.name.root)
if (
tag_and_confidence
and tag_and_confidence.tag_fqn
and tag_and_confidence.confidence >= confidence_threshold / 100
):
# We support returning +1 tags for a single column in _run
return [
self.build_column_tag(
tag_fqn=tag_and_confidence.tag_fqn,
column_fqn=column.fullyQualifiedName.root,
)
]
return None
def _run(
self,
record: SamplerResponse,
) -> Either[SamplerResponse]:
"""
Main entrypoint for the scanner.
Adds PII tagging based on the column names
and TableData
"""
# We don't always need to process
if not self.source_config.enableAutoClassification:
return Either(right=record)
column_tags = []
for idx, column in enumerate(record.table.columns):
try:
col_tags = self.process_column(
idx=idx,
column=column,
table_data=record.sample_data.data,
confidence_threshold=self.confidence_threshold,
)
if col_tags:
column_tags.extend(col_tags)
except Exception as err:
self.status.failed(
StackTraceError(
name=record.table.fullyQualifiedName.root,
error=f"Error computing PII tags for [{column}] - [{err}]",
stackTrace=traceback.format_exc(),
)
)
record.column_tags = column_tags
return Either(right=record)
# winner is at most 1 tag
winner = get_top_classes(scores, 1, self.confidence_threshold)
tag_labels = [self.build_tag_label(tag) for tag in winner]
return tag_labels

View File

@ -307,7 +307,6 @@ class PiiProcessorTest(TestCase):
)
updated_record: ProfilerResponse = self.pii_processor.run(record)
for expected, updated in zip(EXPECTED_COLUMN_TAGS, updated_record.column_tags):
self.assertEqual(expected.column_fqn, updated.column_fqn)
self.assertEqual(expected.tag_label.tagFQN, updated.tag_label.tagFQN)

View File

@ -0,0 +1,35 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from metadata.generated.schema.type.tagLabel import LabelType, State, TagSource
from metadata.pii.algorithms.tags import PIISensitivityTag
from metadata.pii.processor import PIIProcessor
def test_pii_processor_build_tag_label_for_pii_sensitive():
    """The SENSITIVE tag is turned into a suggested, generated `PII.Sensitive` label."""
    label = PIIProcessor.build_tag_label(PIISensitivityTag.SENSITIVE)

    assert label.tagFQN.root == "PII.Sensitive"
    assert label.source == TagSource.Classification
    assert label.state == State.Suggested
    assert label.labelType == LabelType.Generated
def test_pii_processor_build_tag_label_for_pii_nonsensitive():
    """The NONSENSITIVE tag is turned into a suggested, generated `PII.NonSensitive` label."""
    label = PIIProcessor.build_tag_label(PIISensitivityTag.NONSENSITIVE)

    assert label.tagFQN.root == "PII.NonSensitive"
    assert label.source == TagSource.Classification
    assert label.state == State.Suggested
    assert label.labelType == LabelType.Generated