From 5d2dfa712ae24356265aabe6319daa7d33c86e5c Mon Sep 17 00:00:00 2001
From: Pere Menal-Ferrer
Date: Mon, 19 May 2025 19:52:17 +0200
Subject: [PATCH] feature/pii-processor-improvement (#21248)
* Add PII Tag and Sensitivity Level enums.
* Add feature-extraction for PII classification tasks
* Add faker as test dependency
* Add unit tests for presidio tag extractor
* Add PIISensitivityTags enum and update sensitivity mapping logic
* Add Presidio utility functions for PII analysis
* Extend column name regexes for PII
* Add tests for PAN, NIF, SSN entities
* Fix version of faker to prevent flaky tests. Fix failing tests.
* Add Generated to State enum
* Integrate PIISensitive classifier to PIIProcessor
---
.pre-commit-config.yaml | 2 +-
ingestion/pyproject.toml | 2 +-
.../metadata/pii/algorithms/classifiers.py | 7 +
.../metadata/pii/algorithms/preprocessing.py | 4 +-
.../src/metadata/pii/algorithms/utils.py | 38 ++++
ingestion/src/metadata/pii/base_processor.py | 125 +++++++++++++
ingestion/src/metadata/pii/processor.py | 176 ++++--------------
.../orm_profiler/test_pii_processor.py | 1 -
.../tests/unit/pii/test_pii_sensitive.py | 35 ++++
9 files changed, 250 insertions(+), 140 deletions(-)
create mode 100644 ingestion/src/metadata/pii/algorithms/utils.py
create mode 100644 ingestion/src/metadata/pii/base_processor.py
create mode 100644 ingestion/tests/unit/pii/test_pii_sensitive.py
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 9b68634eb0d..3bc4bcc9b35 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -7,7 +7,7 @@ repos:
- id: check-json
exclude: vscode
- repo: https://github.com/hadialqattan/pycln
- rev: v2.4.0
+ rev: v2.5.0
hooks:
- id: pycln
files: ^(ingestion|openmetadata-airflow-apis)/
diff --git a/ingestion/pyproject.toml b/ingestion/pyproject.toml
index d3db292e3af..a6424f1c53b 100644
--- a/ingestion/pyproject.toml
+++ b/ingestion/pyproject.toml
@@ -260,7 +260,7 @@ ignore = [
"src/metadata/mixins/*",
"src/metadata/parsers/*",
"src/metadata/pii/scanners/*",
- "src/metadata/pii/processor.py",
+ "src/metadata/pii/*processor.py",
"src/metadata/profiler/*",
"src/metadata/sampler/*",
"src/metadata/readers/*",
diff --git a/ingestion/src/metadata/pii/algorithms/classifiers.py b/ingestion/src/metadata/pii/algorithms/classifiers.py
index 483297dfb82..35de6fee3ae 100644
--- a/ingestion/src/metadata/pii/algorithms/classifiers.py
+++ b/ingestion/src/metadata/pii/algorithms/classifiers.py
@@ -164,10 +164,17 @@ class PIISensitiveClassifier(ColumnClassifier[PIISensitivityTag]):
sample_data, column_name, column_data_type
)
results: DefaultDict[PIISensitivityTag, float] = defaultdict(float)
+ counts: DefaultDict[PIISensitivityTag, int] = defaultdict(int)
for tag, score in pii_tags.items():
# Convert PIITag to PIISensitivityTag
pii_sensitivity = tag.sensitivity()
results[pii_sensitivity] += score
+ counts[pii_sensitivity] += 1
+
+ # Normalize the scores
+ for tag in results:
+ if counts[tag] > 0:
+ results[tag] /= counts[tag]
return results
diff --git a/ingestion/src/metadata/pii/algorithms/preprocessing.py b/ingestion/src/metadata/pii/algorithms/preprocessing.py
index ab1938dee87..8f7080193bd 100644
--- a/ingestion/src/metadata/pii/algorithms/preprocessing.py
+++ b/ingestion/src/metadata/pii/algorithms/preprocessing.py
@@ -11,6 +11,7 @@
"""
Preprocessing functions for the classification tasks.
"""
+import datetime
import json
from typing import Any, List, Mapping, Optional, Sequence
@@ -27,7 +28,8 @@ def convert_to_str(value: Any) -> Optional[str]:
"""
if isinstance(value, str):
return value
- if isinstance(value, (int, float)):
+ if isinstance(value, (int, float, datetime.datetime, datetime.date)):
+ # Values we want to convert to string out of the box
return str(value)
if isinstance(value, bytes):
return value.decode("utf-8", errors="ignore")
diff --git a/ingestion/src/metadata/pii/algorithms/utils.py b/ingestion/src/metadata/pii/algorithms/utils.py
new file mode 100644
index 00000000000..cf482336c9d
--- /dev/null
+++ b/ingestion/src/metadata/pii/algorithms/utils.py
@@ -0,0 +1,38 @@
+# Copyright 2025 Collate
+# Licensed under the Collate Community License, Version 1.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Utility functions for PII algorithms
+"""
+from typing import Mapping, Sequence, TypeVar
+
+T = TypeVar("T")
+
+
+def normalize_scores(scores: Mapping[T, float], tol: float = 0.01) -> Mapping[T, float]:
+ """
+ Keep only the scores strictly greater than `tol` and rescale them to sum to 1.
+ Scores must be positive; if the kept scores total 0 they are returned unscaled.
+ """
+ scores = {key: score for key, score in scores.items() if score > tol}
+ total = sum(scores.values())
+ if total == 0:
+ return scores
+ return {key: score / total for key, score in scores.items()}
+
+
+def get_top_classes(scores: Mapping[T, float], n: int, threshold: float) -> Sequence[T]:
+ """
+ Get at most n classes (keys) of the scores mapping whose score is at or above the threshold.
+ The classes are sorted in descending order of their scores.
+ """
+ sorted_scores = sorted(scores.items(), key=lambda x: x[1], reverse=True)
+ top_classes = [key for key, score in sorted_scores if score >= threshold]
+ return top_classes[:n]
diff --git a/ingestion/src/metadata/pii/base_processor.py b/ingestion/src/metadata/pii/base_processor.py
new file mode 100644
index 00000000000..0d94178b0e2
--- /dev/null
+++ b/ingestion/src/metadata/pii/base_processor.py
@@ -0,0 +1,125 @@
+# Copyright 2025 Collate
+# Licensed under the Collate Community License, Version 1.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Base class for the Auto Classification Processor.
+"""
+import traceback
+from abc import ABC, abstractmethod
+from typing import Any, Optional, Sequence, Type, TypeVar, cast, final
+
+from metadata.generated.schema.entity.data.table import Column
+from metadata.generated.schema.entity.services.ingestionPipelines.status import (
+ StackTraceError,
+)
+from metadata.generated.schema.metadataIngestion.databaseServiceAutoClassificationPipeline import (
+ DatabaseServiceAutoClassificationPipeline,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+ OpenMetadataWorkflowConfig,
+)
+from metadata.generated.schema.type.tagLabel import TagLabel
+from metadata.ingestion.api.models import Either
+from metadata.ingestion.api.parser import parse_workflow_config_gracefully
+from metadata.ingestion.api.steps import Processor
+from metadata.ingestion.models.table_metadata import ColumnTag
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.sampler.models import SamplerResponse
+
+C = TypeVar("C", bound="AutoClassificationProcessor")
+
+
+class AutoClassificationProcessor(Processor, ABC):
+ """
+ Abstract class for the Auto Classification Processor.
+
+ Implementations should only provide the logic for creating tags based on sample data,
+ and rely on the running part to be handled by the base class.
+ """
+
+ # Some methods are marked as final to prevent overriding in subclasses, which
+ # ensures the workflow always runs the same way and leaves implementers with
+ # the responsibility of *only* implementing the logic for creating tags.
+ def __init__(
+ self,
+ config: OpenMetadataWorkflowConfig,
+ metadata: OpenMetadata,
+ ):
+ super().__init__()
+ self.config = config
+ self.metadata = metadata
+
+ # Init and type the source config
+ self.source_config: DatabaseServiceAutoClassificationPipeline = cast(
+ DatabaseServiceAutoClassificationPipeline,
+ self.config.source.sourceConfig.config,
+ ) # Used to satisfy the type checker
+
+ @abstractmethod
+ def create_column_tag_labels(
+ self, column: Column, sample_data: Sequence[Any]
+ ) -> Sequence[TagLabel]:
+ """
+ Return the tag labels to apply to the column, based on its sample data.
+ """
+
+ @property
+ def name(self) -> str:
+ return "Auto Classification Processor"
+
+ def close(self) -> None:
+ """Nothing to close"""
+
+ @classmethod
+ @final
+ def create(
+ cls: Type[C],
+ config_dict: dict,
+ metadata: OpenMetadata,
+ pipeline_name: Optional[str] = None,
+ ) -> C:
+ config = parse_workflow_config_gracefully(config_dict)
+ return cls(config=config, metadata=metadata)
+
+ @final
+ def _run(self, record: SamplerResponse) -> Either[SamplerResponse]:
+ """
+ Main entrypoint: build column tags from sample data, recording per-column failures.
+ """
+
+ # Skip tagging entirely when auto classification is disabled
+ if not self.source_config.enableAutoClassification:
+ return Either(right=record, left=None)
+
+ column_tags = []
+
+ for idx, column in enumerate(record.table.columns):
+ try:
+ tags = self.create_column_tag_labels(
+ column=column,
+ sample_data=[row[idx] for row in record.sample_data.data.rows],
+ )
+ for tag in tags:
+ column_tag = ColumnTag(
+ column_fqn=column.fullyQualifiedName.root, tag_label=tag
+ )
+ column_tags.append(column_tag)
+ except Exception as err:
+ # TODO: Shouldn't we return a Left here?
+ self.status.failed(
+ StackTraceError(
+ name=record.table.fullyQualifiedName.root,
+ error=f"Error in Processor {self.name} computing tags for [{column}] - [{err}]",
+ stackTrace=traceback.format_exc(),
+ )
+ )
+
+ record.column_tags = column_tags
+ return Either(right=record, left=None)
diff --git a/ingestion/src/metadata/pii/processor.py b/ingestion/src/metadata/pii/processor.py
index 3cc396496ec..bd7b3d8a5cd 100644
--- a/ingestion/src/metadata/pii/processor.py
+++ b/ingestion/src/metadata/pii/processor.py
@@ -12,16 +12,10 @@
"""
Processor util to fetch pii sensitive columns
"""
-import traceback
-from typing import List, Optional, cast
+from typing import Any, Sequence
-from metadata.generated.schema.entity.data.table import Column, TableData
-from metadata.generated.schema.entity.services.ingestionPipelines.status import (
- StackTraceError,
-)
-from metadata.generated.schema.metadataIngestion.databaseServiceAutoClassificationPipeline import (
- DatabaseServiceAutoClassificationPipeline,
-)
+from metadata.generated.schema.entity.classification.tag import Tag
+from metadata.generated.schema.entity.data.table import Column
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
@@ -31,24 +25,21 @@ from metadata.generated.schema.type.tagLabel import (
TagLabel,
TagSource,
)
-from metadata.ingestion.api.models import Either
-from metadata.ingestion.api.parser import parse_workflow_config_gracefully
-from metadata.ingestion.api.step import Step
-from metadata.ingestion.api.steps import Processor
-from metadata.ingestion.models.table_metadata import ColumnTag
from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.pii.algorithms.classifiers import ColumnClassifier, PIISensitiveClassifier
+from metadata.pii.algorithms.tags import PIISensitivityTag
+from metadata.pii.algorithms.utils import get_top_classes, normalize_scores
+from metadata.pii.base_processor import AutoClassificationProcessor
from metadata.pii.constants import PII
-from metadata.pii.scanners.column_name_scanner import ColumnNameScanner
-from metadata.pii.scanners.ner_scanner import NERScanner
-from metadata.sampler.models import SamplerResponse
+from metadata.utils import fqn
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
-class PIIProcessor(Processor):
+class PIIProcessor(AutoClassificationProcessor):
"""
- A scanner that uses Spacy NER for entity recognition
+ An AutoClassificationProcessor that uses a PIISensitive classifier to tag columns.
"""
def __init__(
@@ -56,50 +47,21 @@ class PIIProcessor(Processor):
config: OpenMetadataWorkflowConfig,
metadata: OpenMetadata,
):
- super().__init__()
- self.config = config
- self.metadata = metadata
+ super().__init__(config, metadata)
+ self._classifier: ColumnClassifier[PIISensitivityTag] = PIISensitiveClassifier()
- # Init and type the source config
- self.source_config: DatabaseServiceAutoClassificationPipeline = cast(
- DatabaseServiceAutoClassificationPipeline,
- self.config.source.sourceConfig.config,
- ) # Used to satisfy type checked
-
- self._ner_scanner = None
- self.name_scanner = ColumnNameScanner()
- self.confidence_threshold = self.source_config.confidence
-
- @property
- def name(self) -> str:
- return "Auto Classification Processor"
-
- @property
- def ner_scanner(self) -> NERScanner:
- """Load the NER Scanner only if called"""
- if self._ner_scanner is None:
- self._ner_scanner = NERScanner()
-
- return self._ner_scanner
-
- @classmethod
- def create(
- cls,
- config_dict: dict,
- metadata: OpenMetadata,
- pipeline_name: Optional[str] = None,
- ) -> "Step":
- config = parse_workflow_config_gracefully(config_dict)
- return cls(config=config, metadata=metadata)
-
- def close(self) -> None:
- """Nothing to close"""
+ self.confidence_threshold = self.source_config.confidence / 100
+ self._tolerance = 0.01
@staticmethod
- def build_column_tag(tag_fqn: str, column_fqn: str) -> ColumnTag:
- """
- Build the tag and run the PATCH
- """
+ def build_tag_label(tag: PIISensitivityTag) -> TagLabel:
+ tag_fqn = fqn.build(
+ metadata=None,
+ entity_type=Tag,
+ classification_name=PII,
+ tag_name=tag.value,
+ )
+
tag_label = TagLabel(
tagFQN=tag_fqn,
source=TagSource.Classification,
@@ -107,85 +69,27 @@ class PIIProcessor(Processor):
labelType=LabelType.Generated,
)
- return ColumnTag(column_fqn=column_fqn, tag_label=tag_label)
+ return tag_label
- def process_column(
- self,
- idx: int,
- column: Column,
- table_data: Optional[TableData],
- confidence_threshold: float,
- ) -> Optional[List[ColumnTag]]:
+ def create_column_tag_labels(
+ self, column: Column, sample_data: Sequence[Any]
+ ) -> Sequence[TagLabel]:
"""
- Tag a column with PII if we find it using our scanners
+ Create tags for the column based on the sample data.
"""
+ # If the column we are about to process already has PII tags return empty
+ for tag in column.tags or []:
+ if PII in tag.tagFQN.root:
+ return []
- # First, check if the column we are about to process
- # already has PII tags or not
- column_has_pii_tag = any((PII in tag.tagFQN.root for tag in column.tags or []))
+ # Get the tags and confidence
+ scores = self._classifier.predict_scores(
+ sample_data, column_name=column.name.root, column_data_type=column.dataType
+ )
- # If it has PII tags, we skip the processing
- # for the column
- if column_has_pii_tag is True:
- return None
+ scores = normalize_scores(scores, tol=self._tolerance)
- # We'll scan first by sample data to prioritize the NER scanner
- # If we find nothing, we'll check the column name
- tag_and_confidence = (
- self.ner_scanner.scan([row[idx] for row in table_data.rows])
- if table_data
- else None
- ) or self.name_scanner.scan(column.name.root)
-
- if (
- tag_and_confidence
- and tag_and_confidence.tag_fqn
- and tag_and_confidence.confidence >= confidence_threshold / 100
- ):
- # We support returning +1 tags for a single column in _run
- return [
- self.build_column_tag(
- tag_fqn=tag_and_confidence.tag_fqn,
- column_fqn=column.fullyQualifiedName.root,
- )
- ]
-
- return None
-
- def _run(
- self,
- record: SamplerResponse,
- ) -> Either[SamplerResponse]:
- """
- Main entrypoint for the scanner.
-
- Adds PII tagging based on the column names
- and TableData
- """
-
- # We don't always need to process
- if not self.source_config.enableAutoClassification:
- return Either(right=record)
-
- column_tags = []
- for idx, column in enumerate(record.table.columns):
- try:
- col_tags = self.process_column(
- idx=idx,
- column=column,
- table_data=record.sample_data.data,
- confidence_threshold=self.confidence_threshold,
- )
- if col_tags:
- column_tags.extend(col_tags)
- except Exception as err:
- self.status.failed(
- StackTraceError(
- name=record.table.fullyQualifiedName.root,
- error=f"Error computing PII tags for [{column}] - [{err}]",
- stackTrace=traceback.format_exc(),
- )
- )
-
- record.column_tags = column_tags
- return Either(right=record)
+ # winner is at most 1 tag
+ winner = get_top_classes(scores, 1, self.confidence_threshold)
+ tag_labels = [self.build_tag_label(tag) for tag in winner]
+ return tag_labels
diff --git a/ingestion/tests/integration/orm_profiler/test_pii_processor.py b/ingestion/tests/integration/orm_profiler/test_pii_processor.py
index 1c480766855..76c03c37f01 100644
--- a/ingestion/tests/integration/orm_profiler/test_pii_processor.py
+++ b/ingestion/tests/integration/orm_profiler/test_pii_processor.py
@@ -307,7 +307,6 @@ class PiiProcessorTest(TestCase):
)
updated_record: ProfilerResponse = self.pii_processor.run(record)
-
for expected, updated in zip(EXPECTED_COLUMN_TAGS, updated_record.column_tags):
self.assertEqual(expected.column_fqn, updated.column_fqn)
self.assertEqual(expected.tag_label.tagFQN, updated.tag_label.tagFQN)
diff --git a/ingestion/tests/unit/pii/test_pii_sensitive.py b/ingestion/tests/unit/pii/test_pii_sensitive.py
new file mode 100644
index 00000000000..dd532f4e93b
--- /dev/null
+++ b/ingestion/tests/unit/pii/test_pii_sensitive.py
@@ -0,0 +1,35 @@
+# Copyright 2025 Collate
+# Licensed under the Collate Community License, Version 1.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from metadata.generated.schema.type.tagLabel import LabelType, State, TagSource
+from metadata.pii.algorithms.tags import PIISensitivityTag
+from metadata.pii.processor import PIIProcessor
+
+
+def test_pii_processor_build_tag_label_for_pii_sensitive():
+
+ tag = PIISensitivityTag.SENSITIVE
+ tag_label = PIIProcessor.build_tag_label(tag)
+
+ assert tag_label.tagFQN.root == "PII.Sensitive"
+ assert tag_label.source == TagSource.Classification
+ assert tag_label.state == State.Suggested
+ assert tag_label.labelType == LabelType.Generated
+
+
+def test_pii_processor_build_tag_label_for_pii_nonsensitive():
+ tag = PIISensitivityTag.NONSENSITIVE
+ tag_label = PIIProcessor.build_tag_label(tag)
+
+ assert tag_label.tagFQN.root == "PII.NonSensitive"
+ assert tag_label.source == TagSource.Classification
+ assert tag_label.state == State.Suggested
+ assert tag_label.labelType == LabelType.Generated