* Add PIICategoryTags and some utilities on top of them.

* Fix static-check

* Add test for fqn representation

* Add NEREntityGeneralTags.json from Collate

* Add test to check PIICategoryTags agree with the ones used by OM server

* Add LabelExtractor

* Fix style

* Add ignore superfluous-parens for pylint

* Add comment as per PR review

* Fix not-updated PII-IT

* Remove duplicated IT test for PII

---------

Co-authored-by: Pere Menal <pere.menal@getcollate.io>
Co-authored-by: Sriharsha Chintalapani <harshach@users.noreply.github.com>
This commit is contained in:
Pere Menal-Ferrer 2025-06-10 01:05:35 +02:00 committed by GitHub
parent cb57656ae9
commit 1e86f9870f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 730 additions and 812 deletions

View File

@ -259,7 +259,6 @@ ignore = [
"src/metadata/ingestion/*",
"src/metadata/mixins/*",
"src/metadata/parsers/*",
"src/metadata/pii/scanners/*",
"src/metadata/pii/*processor.py",
"src/metadata/profiler/*",
"src/metadata/sampler/*",

View File

@ -12,10 +12,8 @@
Classifier for PII detection and sensitivity tagging.
"""
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import (
Any,
DefaultDict,
Dict,
Generic,
Hashable,
@ -47,12 +45,12 @@ from metadata.pii.algorithms.presidio_utils import (
build_analyzer_engine,
set_presidio_logger_level,
)
from metadata.pii.algorithms.tags import PIISensitivityTag, PIITag
from metadata.pii.algorithms.tags import PIITag
T = TypeVar("T", bound=Hashable)
class ColumnClassifier(ABC, Generic[T]):
class ColumnLabeler(ABC, Generic[T]):
"""
Base class for column classifiers.
This class defines the interface for classifiers that predict the class
@ -77,7 +75,7 @@ class ColumnClassifier(ABC, Generic[T]):
@final
class HeuristicPIIClassifier(ColumnClassifier[PIITag]):
class HeuristicPIILabeler(ColumnLabeler[PIITag]):
"""
Heuristic PII Column Classifier
"""
@ -140,45 +138,12 @@ class HeuristicPIIClassifier(ColumnClassifier[PIITag]):
if tag in column_name_matches:
final_score += self._column_name_contribution
# Apply the score cutoff
if final_score >= self._score_cutoff:
final_results[tag] = final_score
if final_score < self._score_cutoff:
continue
final_results[tag] = final_score
# Make sure all scores are capped at 1.0
for tag in final_results:
final_results[tag] = min(final_results[tag], 1.0)
return final_results
class PIISensitiveClassifier(ColumnClassifier[PIISensitivityTag]):
    """
    Sensitivity classifier layered on top of a PII column classifier.

    The wrapped classifier scores a column per PIITag; those scores are then
    averaged per sensitivity level. When no classifier is supplied, a
    HeuristicPIIClassifier is used.
    """

    def __init__(self, classifier: Optional[ColumnClassifier[PIITag]] = None):
        self.classifier: ColumnClassifier[PIITag] = (
            classifier if classifier else HeuristicPIIClassifier()
        )

    def predict_scores(
        self,
        sample_data: Sequence[Any],
        column_name: Optional[str] = None,
        column_data_type: Optional[DataType] = None,
    ) -> Mapping[PIISensitivityTag, float]:
        """
        Score the column per sensitivity level.

        Each PIITag score produced by the wrapped classifier is accumulated
        under the tag's sensitivity, and the sum is divided by the number of
        contributing tags so the result is the mean score per level.
        """
        tag_scores = self.classifier.predict_scores(
            sample_data, column_name, column_data_type
        )

        totals: DefaultDict[PIISensitivityTag, float] = defaultdict(float)
        hits: DefaultDict[PIISensitivityTag, int] = defaultdict(int)

        for pii_tag, tag_score in tag_scores.items():
            # Map the PIITag onto its sensitivity level
            level = pii_tag.sensitivity()
            totals[level] += tag_score
            hits[level] += 1

        # Turn the accumulated sums into averages
        for level in totals:
            contributing = hits[level]
            if contributing > 0:
                totals[level] /= contributing

        return totals

View File

@ -22,7 +22,7 @@ from presidio_analyzer import AnalyzerEngine
from metadata.generated.schema.entity.data.table import DataType
from metadata.pii.algorithms.presidio_patches import PresidioRecognizerResultPatcher
from metadata.pii.algorithms.tags import PIITag
from metadata.pii.scanners.ner_scanner import SUPPORTED_LANG
from metadata.pii.constants import SUPPORTED_LANG
from metadata.utils.logger import pii_logger
logger = pii_logger()

View File

@ -0,0 +1,110 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Label extractor interface and implementations.
"""
from abc import ABC, abstractmethod
from typing import Generic, Mapping, Set, TypeVar, final
from metadata.pii.algorithms.scoring_ops import scores_cleanup, scores_to_probabilities
T = TypeVar("T")
class LabelExtractor(ABC, Generic[T]):
    """
    Abstract base class for extracting labels from a mapping of label scores.

    The goal is to abstract the logic of how labels are extracted
    from the scores, allowing different strategies to be implemented
    depending on the underlying algorithm or use-case.
    """

    @abstractmethod
    def extract_labels(self, scores: Mapping[T, float]) -> Set[T]:
        """
        Extract labels from the given scores mapping.

        Args:
            scores (Mapping[T, float]): A mapping from labels to scores or probabilities.

        Returns:
            Set[T]: A set of labels extracted from the scores.
        """
@final
class ProbabilisticLabelExtractor(LabelExtractor[T], Generic[T]):
    """
    Extracts the most probable label(s) from a set of raw class scores using score filtering
    and probability normalization.

    This extractor treats the input scores as representing a multiclass classification scenario,
    where only one or a few mutually exclusive labels are expected to be true. It filters out
    low-confidence scores, normalizes the remaining ones into a probability distribution, and
    returns the top-k labels that meet a minimum probability threshold.

    After normalization, scores are interpreted as probabilities, that is, each label's
    value represents its relative likelihood among the remaining candidates.

    Args:
        k (int): The number of top labels to consider based on normalized probability.
        score_threshold (float): Minimum raw score required to keep a label before normalization.
        prob_threshold (float): Minimum normalized probability required for a label to be returned.

    Raises:
        ValueError: If a threshold is outside [0, 1] or `k` is smaller than 1.

    Notes:
        - If only one label remains after score filtering, it will have a probability of 1.0
          and will always be returned if `k >= 1`.
        - When multiple labels remain, their probabilities may be lower, and some or all
          may fall below the `prob_threshold`.
        - This approach implicitly encodes a confidence mechanism: a label must be
          both strong enough in raw score and relatively dominant in probability to be selected.
    """

    def __init__(
        self,
        k: int,
        score_threshold: float,
        prob_threshold: float,
    ) -> None:
        # Validation order matters: the first violated constraint is the one reported.
        if not (0 <= score_threshold <= 1):  # pylint: disable=superfluous-parens
            raise ValueError("score_threshold must be between 0 and 1")
        if not (0 <= prob_threshold <= 1):  # pylint: disable=superfluous-parens
            raise ValueError("prob_threshold must be between 0 and 1")
        if k < 1:
            raise ValueError("k must be at least 1")

        self._k = k
        self._score_threshold = score_threshold
        self._prob_threshold = prob_threshold

    def extract_labels(self, scores: Mapping[T, float]) -> Set[T]:
        """
        Filter the raw scores, normalize them and keep the top-k labels whose
        probability reaches the configured probability threshold.
        """
        cleaned = scores_cleanup(
            scores, min_score=self._score_threshold, max_score=1.0
        )
        probabilities = scores_to_probabilities(cleaned)
        if probabilities is None:
            # Nothing survived the filtering (or the total mass is negligible)
            return set()

        ranked = sorted(
            probabilities.items(), key=lambda item: item[1], reverse=True
        )

        selected: Set[T] = set()
        for label, probability in ranked[: self._k]:
            if probability >= self._prob_threshold:
                selected.add(label)
        return selected

View File

@ -0,0 +1,74 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utility functions for PII algorithms
"""
from collections import defaultdict
from typing import Callable, DefaultDict, Mapping, Optional, TypeVar
T = TypeVar("T")
S = TypeVar("S")
# Scores transformations
def scores_cleanup(
    scores: Mapping[T, float], min_score: float = 0.01, max_score: float = 1.0
) -> Mapping[T, float]:
    """
    Return a cleaned copy of `scores`.

    Entries whose score is below `min_score` are dropped; scores above
    `max_score` are clipped down to `max_score`.

    Raises:
        ValueError: If `min_score` is greater than `max_score`.
    """
    if min_score > max_score:
        raise ValueError(
            f"Minimum score {min_score} cannot be greater than maximum score {max_score}."
        )

    cleaned = {}
    for label, value in scores.items():
        if value >= min_score:
            cleaned[label] = min(value, max_score)
    return cleaned
def scores_group_by(
    scores: Mapping[T, float], key_fn: Callable[[T], S]
) -> Mapping[S, float]:
    """
    Group the scores by a key function.

    The key function is applied once to each key in `scores`, and the scores
    are averaged for each group, thus keeping the result within the same
    range as the original scores.

    Args:
        scores: Mapping from labels to scores.
        key_fn: Function mapping each label to the group it belongs to.

    Returns:
        A plain mapping from group to the mean score of its members. Groups
        without members are absent, and looking one up does not create it.
    """
    totals: DefaultDict[S, float] = defaultdict(float)
    counts: DefaultDict[S, int] = defaultdict(int)

    # Accumulate the sum and the number of members of each group
    for key, score in scores.items():
        # Resolve the group once: key_fn may be costly or may raise
        group = key_fn(key)
        totals[group] += score
        counts[group] += 1

    # Average per group; build a regular dict so that callers probing for a
    # missing group do not silently insert a 0.0 entry.
    return {group: total / counts[group] for group, total in totals.items()}
def scores_to_probabilities(
    scores: Mapping[T, float], tolerance: float = 0.001
) -> Optional[Mapping[T, float]]:
    """
    Normalize the scores so that they add up to 1.

    Returns None when the sum of the scores is below `tolerance`, since
    dividing by such a total would not yield meaningful probabilities.
    """
    total = sum(scores.values())
    if total < tolerance:
        return None

    probabilities = {}
    for key, score in scores.items():
        probabilities[key] = score / total
    return probabilities

View File

@ -16,15 +16,68 @@ import enum
from typing import List
class PIIClassificationName(enum.Enum):
    """
    Classification name for PII related tags:
    - PII: the sensitivity classification (Sensitive or NonSensitive).
    - General: the PII category classification (e.g., PERSON, EMAIL, etc.).
    """

    PII = "PII"
    GENERAL = "General"
class PIISensitivityTag(enum.Enum):
    """
    Sensitivity level assigned to PII data.
    """

    SENSITIVE = "Sensitive"
    NONSENSITIVE = "NonSensitive"

    @classmethod
    def pii_classification_name(cls) -> PIIClassificationName:
        """Return the classification these tags belong to (PII)."""
        return PIIClassificationName.PII
@enum.unique
class PIICategoryTag(enum.Enum):
    """
    PII Category Tags.

    These tags are used to categorize the PII tags into broader categories,
    for instance, to show the PII tags in the UI.
    """

    # NOTE(review): the values are presumably the tag names registered on the
    # server under the "General" classification; confirm before renaming any.
    PASSWORD = "Password"
    BANK_NUMBER = "BankNumber"
    PERSON = "Person"
    BIRTH_DATE = "BirthDate"
    GENDER = "Gender"
    NRP = "NRP"
    ADDRESS = "Address"
    CREDIT_CARD = "CreditCardNumber"
    CRYPTO = "Crypto"
    DATE_TIME = "DateTime"
    EMAIL_ADDRESS = "Email"
    IBAN_CODE = "IBANCode"
    IP_ADDRESS = "IPAddress"
    LOCATION = "Location"
    PHONE_NUMBER = "PhoneNumber"
    MEDICAL_LICENSE = "MedicalLicense"
    URL = "URL"
    DRIVER_LICENSE = "DriverLicense"
    NATIONAL_ID = "NationalID"
    PASSPORT = "Passport"
    VAT_CODE = "VATCode"

    @classmethod
    def pii_classification_name(cls) -> PIIClassificationName:
        """Return the classification these tags belong to (General)."""
        return PIIClassificationName.GENERAL
@enum.unique
class PIITag(enum.Enum):
"""
PII Tags (borrowed from Presidio https://microsoft.github.io/presidio/supported_entities/).
The values of these tags are valid Presidio entity names, changing them
will break the integration with Presidio.
A better name for this enum would have been `PresidioPII`.
"""
# Global
@ -91,21 +144,3 @@ class PIITag(enum.Enum):
Get all the values of the enum as a set of strings.
"""
return [tag.value for tag in cls]
def sensitivity(self) -> PIISensitivityTag:
    """
    Return the sensitivity level of this PII tag.

    Tags listed in DEFAULT_NON_PII_SENSITIVE are non-sensitive; every other
    tag is sensitive. The mapping is opinionated and may change in the future
    according to users' needs.
    """
    return (
        PIISensitivityTag.NONSENSITIVE
        if self in DEFAULT_NON_PII_SENSITIVE
        else PIISensitivityTag.SENSITIVE
    )
# PII tags reported as NonSensitive by PIITag.sensitivity(); any tag that is
# not listed here is treated as Sensitive.
DEFAULT_NON_PII_SENSITIVE = (
    PIITag.DATE_TIME,
    PIITag.NRP,
    PIITag.LOCATION,
    PIITag.PHONE_NUMBER,
    PIITag.URL,
)

View File

@ -0,0 +1,137 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utility operation on top of the tags defined in PII algorithms.
"""
from typing import Collection, Dict, Optional, Set
from metadata.pii.algorithms.tags import PIICategoryTag, PIISensitivityTag, PIITag
def categorize_pii_tag(pii_tag: PIITag) -> PIICategoryTag:
    """
    Map a PII tag onto the broader category that contains it.

    The lookup walks `_CATEGORY_MAP` (defined further down in this module for
    readability) in declaration order and returns the first category whose
    tag set contains `pii_tag`.

    Raises:
        ValueError: If no category contains the tag. Unit tests are expected
            to guarantee every PII tag is categorized, so this should not
            happen in practice.
    """
    matches = (
        category for category, members in _CATEGORY_MAP.items() if pii_tag in members
    )
    category = next(matches, None)
    if category is None:
        raise ValueError(f"PII tag does not belong to any category: {pii_tag}")
    return category
def get_sensitivity_for_pii_category(
    pii_category_tag: PIICategoryTag,
) -> PIISensitivityTag:
    """
    Return the sensitivity level of a PIICategoryTag.

    The mapping is opinionated and can be changed according to users' needs:
    the categories listed below are non-sensitive, all others are sensitive.
    """
    non_sensitive_categories = (
        PIICategoryTag.GENDER,
        PIICategoryTag.NRP,
        PIICategoryTag.DATE_TIME,
        PIICategoryTag.LOCATION,
        PIICategoryTag.PHONE_NUMBER,
        PIICategoryTag.URL,
    )
    return (
        PIISensitivityTag.NONSENSITIVE
        if pii_category_tag in non_sensitive_categories
        else PIISensitivityTag.SENSITIVE
    )
def resolve_sensitivity(
    sensitivities: Collection[PIISensitivityTag],
) -> Optional[PIISensitivityTag]:
    """
    Collapse a collection of sensitivity tags into a single one.

    The most restrictive level wins: SENSITIVE if it is present at all,
    otherwise NONSENSITIVE. An empty collection resolves to None.
    """
    if not sensitivities:
        return None
    return (
        PIISensitivityTag.SENSITIVE
        if PIISensitivityTag.SENSITIVE in sensitivities
        else PIISensitivityTag.NONSENSITIVE
    )
def get_sensitivity_for_pii(pii_tag: PIITag) -> PIISensitivityTag:
    """
    Return the sensitivity level of a PIITag.

    The tag is first mapped to its category, and the category's sensitivity
    is returned. The mapping is opinionated and can be changed in the future
    according to users' needs.
    """
    return get_sensitivity_for_pii_category(categorize_pii_tag(pii_tag))
# Parent child aliases
_P = PIICategoryTag
_C = PIITag

# Defines which PIITag's (Presidio entities) each PIICategoryTag contains.
#
# The sets are kept disjoint: categorize_pii_tag returns the first category
# containing a tag, so a tag listed under several categories would be resolved
# by declaration order rather than by meaning. In particular:
#   - UK_NHS (a health-service number) belongs to NATIONAL_ID only, not to
#     DRIVER_LICENSE.
#   - IT_PASSPORT belongs to PASSPORT, like the other passport tags.
#   - AU_ABN / AU_ACN / AU_TFN stay under NATIONAL_ID, which is where they
#     were already resolved.
# Categories with an empty set have no Presidio entity mapped to them.
_CATEGORY_MAP: Dict[PIICategoryTag, Set[PIITag]] = {
    _P.PASSWORD: set(),
    _P.BANK_NUMBER: {_C.US_BANK_NUMBER},
    _P.CREDIT_CARD: {_C.CREDIT_CARD},
    _P.PERSON: {_C.PERSON},
    _P.GENDER: set(),
    _P.NRP: {_C.NRP},
    _P.ADDRESS: set(),
    _P.CRYPTO: {_C.CRYPTO},
    _P.DATE_TIME: {_C.DATE_TIME},
    _P.EMAIL_ADDRESS: {_C.EMAIL_ADDRESS},
    _P.IBAN_CODE: {_C.IBAN_CODE},
    _P.IP_ADDRESS: {_C.IP_ADDRESS},
    _P.LOCATION: {_C.LOCATION},
    _P.PHONE_NUMBER: {_C.PHONE_NUMBER},
    _P.MEDICAL_LICENSE: {_C.MEDICAL_LICENSE},
    _P.URL: {_C.URL},
    _P.DRIVER_LICENSE: {
        _C.US_DRIVER_LICENSE,
        _C.IT_DRIVER_LICENSE,
    },
    _P.NATIONAL_ID: {
        _C.US_ITIN,
        _C.US_SSN,
        _C.UK_NHS,
        _C.ES_NIF,
        _C.ES_NIE,
        _C.IT_FISCAL_CODE,
        _C.IT_IDENTITY_CARD,
        _C.PL_PESEL,
        _C.SG_NRIC_FIN,
        _C.SG_UEN,
        _C.AU_ABN,
        _C.AU_ACN,
        _C.AU_TFN,
        _C.AU_MEDICARE,
        _C.IN_PAN,
        _C.IN_AADHAAR,
        _C.IN_VEHICLE_REGISTRATION,
        _C.IN_VOTER,
        _C.FI_PERSONAL_IDENTITY_CODE,
    },
    _P.PASSPORT: {
        _C.US_PASSPORT,
        _C.IT_PASSPORT,
        _C.IN_PASSPORT,
    },
    _P.VAT_CODE: {
        _C.IT_VAT_CODE,
    },
}

View File

@ -1,38 +0,0 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utility functions for PII algorithms
"""
from typing import Mapping, Sequence, TypeVar
T = TypeVar("T")
def normalize_scores(scores: Mapping[T, float], tol: float = 0.01) -> Mapping[T, float]:
    """
    Rescale the scores so they sum to 1, discarding any score that does not
    exceed the tolerance. Scores must be positive.

    If the remaining scores sum to zero, they are returned unscaled.
    """
    kept = {}
    for key, score in scores.items():
        if score > tol:
            kept[key] = score

    total = sum(kept.values())
    if total == 0:
        return kept

    normalized = {}
    for key, score in kept.items():
        normalized[key] = score / total
    return normalized
def get_top_classes(scores: Mapping[T, float], n: int, threshold: float) -> Sequence[T]:
    """
    Return up to n classes whose score is at least the threshold,
    ordered from the highest score to the lowest.
    """
    ranked = sorted(scores.items(), key=lambda pair: pair[1], reverse=True)

    qualifying = []
    for key, score in ranked:
        if score >= threshold:
            qualifying.append(key)
    return qualifying[:n]

View File

@ -112,7 +112,6 @@ class AutoClassificationProcessor(Processor, ABC):
)
column_tags.append(column_tag)
except Exception as err:
# TODO: Shouldn't we return a Left here?
self.status.failed(
StackTraceError(
name=record.table.fullyQualifiedName.root,

View File

@ -12,8 +12,6 @@
PII constants
"""
PII = "PII"
# Constants for Presidio
PRESIDIO_LOGGER = "presidio-analyzer"
SPACY_EN_MODEL = "en_core_web_md"

View File

@ -1,26 +0,0 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
PII processing models
"""
from enum import Enum
from pydantic import BaseModel
class TagType(Enum):
    """Sensitivity tag names used when labelling PII columns."""

    SENSITIVE = "Sensitive"
    NONSENSITIVE = "NonSensitive"
class TagAndConfidence(BaseModel):
    """Result of a scan: the fully qualified name of a tag and its confidence."""

    # Fully qualified name of the tag
    tag_fqn: str
    # Confidence attached to the tag by the scanner that produced it
    confidence: float

View File

@ -1,85 +0,0 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
NER Scanner based on Presidio.
Supported Entities https://microsoft.github.io/presidio/supported_entities/
"""
from enum import Enum
from metadata.pii.models import TagType
from metadata.utils.logger import pii_logger
logger = pii_logger()
class NEREntity(Enum):
    """
    PII Entities supported by Presidio https://microsoft.github.io/presidio/supported_entities/

    Each entity name is mapped to its sensitivity ("Sensitive" / "NonSensitive").

    Note that, because only two distinct values are used, Python's Enum turns
    every member after the first one with a given value into an alias of that
    first member. Lookups by name (e.g. via `__members__`) and `.value` work
    as expected, but iterating the enum or reading `.name` only exposes the
    two canonical members.
    """

    # Global
    CREDIT_CARD = TagType.SENSITIVE.value
    CRYPTO = TagType.SENSITIVE.value
    DATE_TIME = TagType.NONSENSITIVE.value
    EMAIL_ADDRESS = TagType.SENSITIVE.value
    IBAN_CODE = TagType.SENSITIVE.value
    IP_ADDRESS = TagType.SENSITIVE.value
    NRP = TagType.NONSENSITIVE.value
    LOCATION = TagType.NONSENSITIVE.value
    PERSON = TagType.SENSITIVE.value
    PHONE_NUMBER = TagType.NONSENSITIVE.value
    MEDICAL_LICENSE = TagType.SENSITIVE.value
    URL = TagType.NONSENSITIVE.value

    # USA
    US_BANK_NUMBER = TagType.SENSITIVE.value
    US_DRIVER_LICENSE = TagType.SENSITIVE.value
    US_ITIN = TagType.SENSITIVE.value
    US_PASSPORT = TagType.SENSITIVE.value
    US_SSN = TagType.SENSITIVE.value

    # UK
    UK_NHS = TagType.SENSITIVE.value

    # Spain
    ES_NIF = TagType.SENSITIVE.value
    ES_NIE = TagType.SENSITIVE.value

    # Italy
    IT_FISCAL_CODE = TagType.SENSITIVE.value
    IT_DRIVER_LICENSE = TagType.SENSITIVE.value
    IT_VAT_CODE = TagType.SENSITIVE.value
    IT_PASSPORT = TagType.SENSITIVE.value
    IT_IDENTITY_CARD = TagType.SENSITIVE.value

    # Poland
    PL_PESEL = TagType.SENSITIVE.value

    # Singapore
    SG_NRIC_FIN = TagType.SENSITIVE.value
    SG_UEN = TagType.SENSITIVE.value

    # Australia
    AU_ABN = TagType.SENSITIVE.value
    AU_ACN = TagType.SENSITIVE.value
    AU_TFN = TagType.SENSITIVE.value
    AU_MEDICARE = TagType.SENSITIVE.value

    # India
    IN_PAN = TagType.SENSITIVE.value
    IN_AADHAAR = TagType.SENSITIVE.value
    IN_VEHICLE_REGISTRATION = TagType.SENSITIVE.value
    IN_VOTER = TagType.SENSITIVE.value
    IN_PASSPORT = TagType.SENSITIVE.value

    # Finland
    FI_PERSONAL_IDENTITY_CODE = TagType.SENSITIVE.value

View File

@ -12,7 +12,7 @@
"""
Processor util to fetch pii sensitive columns
"""
from typing import Any, Sequence
from typing import Any, Sequence, TypeVar, Union
from metadata.generated.schema.entity.classification.tag import Tag
from metadata.generated.schema.entity.data.table import Column
@ -26,13 +26,28 @@ from metadata.generated.schema.type.tagLabel import (
TagSource,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.pii.algorithms.tags import PIISensitivityTag
from metadata.pii.algorithms.utils import get_top_classes, normalize_scores
from metadata.pii.algorithms.label_extractors import (
LabelExtractor,
ProbabilisticLabelExtractor,
)
from metadata.pii.algorithms.scoring_ops import scores_group_by
from metadata.pii.algorithms.tags import (
PIICategoryTag,
PIIClassificationName,
PIISensitivityTag,
PIITag,
)
from metadata.pii.algorithms.tags_ops import (
categorize_pii_tag,
get_sensitivity_for_pii_category,
resolve_sensitivity,
)
from metadata.pii.base_processor import AutoClassificationProcessor
from metadata.pii.constants import PII
from metadata.utils import fqn
from metadata.utils.logger import profiler_logger
T = TypeVar("T")
logger = profiler_logger()
@ -48,33 +63,23 @@ class PIIProcessor(AutoClassificationProcessor):
):
super().__init__(config, metadata)
from metadata.pii.algorithms.classifiers import ( # pylint: disable=import-outside-toplevel
ColumnClassifier,
PIISensitiveClassifier,
from metadata.pii.algorithms.column_labelers import ( # pylint: disable=import-outside-toplevel
ColumnLabeler,
HeuristicPIILabeler,
)
self._classifier: ColumnClassifier[PIISensitivityTag] = PIISensitiveClassifier()
prob_threshold = self.source_config.confidence / 100
score_threshold = 0.1 # This is the minimum score to consider a tag
self.confidence_threshold = self.source_config.confidence / 100
self._tolerance = 0.01
@staticmethod
def build_tag_label(tag: PIISensitivityTag) -> TagLabel:
tag_fqn = fqn.build(
metadata=None,
entity_type=Tag,
classification_name=PII,
tag_name=tag.value,
self._pii_category_extractor: LabelExtractor[
PIICategoryTag
] = ProbabilisticLabelExtractor(
score_threshold=score_threshold,
prob_threshold=prob_threshold,
k=1, # k=1 means we return only the top category
)
tag_label = TagLabel(
tagFQN=tag_fqn,
source=TagSource.Classification,
state=State.Suggested,
labelType=LabelType.Generated,
)
return tag_label
self._pii_labeler: ColumnLabeler[PIITag] = HeuristicPIILabeler()
def create_column_tag_labels(
self, column: Column, sample_data: Sequence[Any]
@ -84,17 +89,55 @@ class PIIProcessor(AutoClassificationProcessor):
"""
# If the column we are about to process already has PII tags return empty
for tag in column.tags or []:
if PII in tag.tagFQN.root:
if PIIClassificationName.PII.value in tag.tagFQN.root:
return []
# Get the tags and confidence
scores = self._classifier.predict_scores(
pii_tag_scores = self._pii_labeler.predict_scores(
sample_data, column_name=column.name.root, column_data_type=column.dataType
)
scores = normalize_scores(scores, tol=self._tolerance)
pii_category_scores = scores_group_by(pii_tag_scores, categorize_pii_tag)
# We allow more than one category to be assigned, this might be useful
# for documents that contain multiple PII types.
# Whether, we want to return one or multiple labels is controlled
# by the LabelExtractor; to modify this behavior, please change the
# LabelExtractor used, and not the implementation of this method.
pii_categories = self._pii_category_extractor.extract_labels(
pii_category_scores
)
tag_labels = [get_tag_label(pii_category) for pii_category in pii_categories]
# Determine the sensitivity of the PII categories, if any
pii_sensitivity = resolve_sensitivity(
{get_sensitivity_for_pii_category(pc) for pc in pii_categories}
)
if pii_sensitivity:
tag_labels.append(get_tag_label(pii_sensitivity))
# winner is at most 1 tag
winner = get_top_classes(scores, 1, self.confidence_threshold)
tag_labels = [self.build_tag_label(tag) for tag in winner]
return tag_labels
def get_tag_label(tag: Union[PIICategoryTag, PIISensitivityTag]) -> TagLabel:
    """
    Build the suggested, generated TagLabel for a PII category or sensitivity tag.

    The tag FQN is composed from the classification the tag belongs to and
    the tag's own value.

    Raises:
        ValueError: If the FQN cannot be built.
    """
    classification = tag.pii_classification_name().value
    tag_fqn = fqn.build(
        None,
        entity_type=Tag,
        classification_name=classification,
        tag_name=tag.value,
    )

    # Unit tests should prevent this, but if it ever happens we prefer a
    # loud failure over silently propagating a missing FQN.
    if tag_fqn is None:
        raise ValueError(f"Failed to build FQN for tag: {tag}")

    return TagLabel(
        tagFQN=tag_fqn,
        source=TagSource.Classification,
        state=State.Suggested,
        labelType=LabelType.Generated,
    )

View File

@ -1,23 +0,0 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Basic Scanner ABC
"""
from abc import ABC, abstractmethod
from typing import Any
class BaseScanner(ABC):
    """Basic scanner abstract class; concrete scanners implement `scan`."""

    @abstractmethod
    def scan(self, data: Any):
        """Scan the given data from a column"""

View File

@ -1,84 +0,0 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Regex scanner for column names
"""
import re
from typing import Optional
from metadata.generated.schema.entity.classification.tag import Tag
from metadata.pii.constants import PII
from metadata.pii.models import TagAndConfidence, TagType
from metadata.pii.scanners.base import BaseScanner
from metadata.utils import fqn
class ColumnNameScanner(BaseScanner):
    """Scanner that classifies a column by matching its name against regex patterns"""

    # Column-name patterns that yield the Sensitive tag
    sensitive_regex = {
        "PASSWORD": re.compile("^.*password.*$", re.IGNORECASE),
        "US_SSN": re.compile("^.*(ssn|social).*$", re.IGNORECASE),
        "CREDIT_CARD": re.compile("^.*(credit).*(card).*$", re.IGNORECASE),
        "BANK_ACCOUNT": re.compile("^.*bank.*(acc|num).*$", re.IGNORECASE),
        "EMAIL_ADDRESS": re.compile("^(email|e-mail|mail)(.*address)?$", re.IGNORECASE),
        "USER_NAME": re.compile("^.*(user|client|person).*(name).*$", re.IGNORECASE),
        "PERSON": re.compile(
            "^.*(firstname|lastname|fullname|maidenname|nickname|name_suffix).*$",
            re.IGNORECASE,
        ),
    }

    # Column-name patterns that yield the NonSensitive tag
    non_sensitive_regex = {
        "BIRTH_DATE": re.compile(
            "^.*(date_of_birth|dateofbirth|dob|"
            "birthday|date_of_death|dateofdeath).*$",
            re.IGNORECASE,
        ),
        "GENDER": re.compile("^.*(gender).*$", re.IGNORECASE),
        "NATIONALITY": re.compile("^.*(nationality).*$", re.IGNORECASE),
        "ADDRESS": re.compile(
            "^.*(address|city|state|county|country|"
            "zipcode|zip|postal|zone|borough).*$",
            re.IGNORECASE,
        ),
        "PHONE_NUMBER": re.compile("^.*(phone).*$", re.IGNORECASE),
    }

    def scan(self, data: str) -> Optional[TagAndConfidence]:
        """
        Match the column name against the regex patterns and build the
        corresponding tag.

        Sensitive patterns are tried first, then the non-sensitive ones; the
        first group with a matching pattern decides the tag, always with
        confidence 1. Returns None when nothing matches.
        """
        pattern_groups = (
            (self.sensitive_regex, TagType.SENSITIVE),
            (self.non_sensitive_regex, TagType.NONSENSITIVE),
        )
        for patterns, tag_type in pattern_groups:
            matched = any(
                pattern.match(data) is not None for pattern in patterns.values()
            )
            if matched:
                return TagAndConfidence(
                    tag_fqn=fqn.build(
                        metadata=None,
                        entity_type=Tag,
                        classification_name=PII,
                        tag_name=tag_type.value,
                    ),
                    confidence=1,
                )
        return None

View File

@ -1,185 +0,0 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
NER Scanner based on Presidio.
Supported Entities https://microsoft.github.io/presidio/supported_entities/
"""
import json
import logging
import traceback
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple, Union
from pydantic import BaseModel, ConfigDict
from metadata.generated.schema.entity.classification.tag import Tag
from metadata.pii.constants import PII, SPACY_EN_MODEL
from metadata.pii.models import TagAndConfidence
from metadata.pii.ner import NEREntity
from metadata.pii.scanners.base import BaseScanner
from metadata.utils import fqn
from metadata.utils.logger import METADATA_LOGGER, pii_logger
logger = pii_logger()
SUPPORTED_LANG = "en"
PRESIDIO_LOGGER = "presidio-analyzer"
class StringAnalysis(BaseModel):
    """
    Used to store results from the sample data scans for each NER Entity
    """

    # Score recorded for the entity (the scanner documents it as the max score seen)
    score: float
    # Number of times the entity appeared in the scanned sample rows
    appearances: int
class NLPEngineModel(BaseModel):
    """Required to pass the nlp_engine as {"lang_code": "en", "model_name": "en_core_web_lg"}"""

    # Clear pydantic's protected namespaces so the `model_name` field below
    # does not clash with the reserved "model_" prefix.
    model_config = ConfigDict(protected_namespaces=())
    lang_code: str
    model_name: str
# pylint: disable=import-outside-toplevel
class NERScanner(BaseScanner):
"""Based on https://microsoft.github.io/presidio/"""
def __init__(self):
    """
    Prepare the Presidio analyzer backed by the spaCy English model.

    The heavy dependencies are imported lazily here. If the spaCy model is
    not installed, it is downloaded and loaded again.
    """
    import spacy
    from presidio_analyzer import AnalyzerEngine
    from presidio_analyzer.nlp_engine.spacy_nlp_engine import SpacyNlpEngine

    try:
        spacy.load(SPACY_EN_MODEL)
    except OSError:
        # spaCy raises OSError when the model cannot be found locally
        logger.warning("Downloading en_core_web_md language model for the spaCy")
        from spacy.cli import download

        download(SPACY_EN_MODEL)
        spacy.load(SPACY_EN_MODEL)

    nlp_engine_model = NLPEngineModel(
        lang_code=SUPPORTED_LANG, model_name=SPACY_EN_MODEL
    )

    # Set the presidio logger to talk less about internal entities unless we are debugging
    logging.getLogger(PRESIDIO_LOGGER).setLevel(
        logging.INFO
        if logging.getLogger(METADATA_LOGGER).level == logging.DEBUG
        else logging.ERROR
    )

    self.analyzer = AnalyzerEngine(
        nlp_engine=SpacyNlpEngine(models=[nlp_engine_model.model_dump()])
    )
@staticmethod
def get_highest_score_label(
    entities_score: Dict[str, StringAnalysis]
) -> Tuple[str, float]:
    """
    Pick the entity with the highest weighted score.

    Entities are ranked by "score * appearances * 0.8"; the winner is
    returned together with its plain (unweighted) score.
    """

    def weighted(type_: str) -> float:
        analysis = entities_score[type_]
        return analysis.score * analysis.appearances * 0.8

    best_entity = max(entities_score, key=weighted)
    return best_entity, entities_score[best_entity].score
def scan(self, data: List[Any]) -> Optional[TagAndConfidence]:
"""
Scan the column's sample data rows and look for PII.
How this works:
1. We create a list of strings [s1, s2, ..., sn] with each sample data row for a column
2. Then, for each s_i:
a. Run the analyzer, which will return a list of possible recognized Entities and confidence score
For example, the result of analyzing `123456789` gives us
[
type: DATE_TIME, start: 0, end: 9, score: 0.85,
type: US_BANK_NUMBER, start: 0, end: 9, score: 0.05,
type: US_PASSPORT, start: 0, end: 9, score: 0.05,
type: US_DRIVER_LICENSE, start: 0, end: 9, score: 0.01
]
b. Each time an `Entity` appears (e.g., DATE_TIME), we store its max score and the number of appearances
3. After gathering all the results for each row, get the `Entity` with maximum overall score
and number of appearances. This gets computed as "score * appearances * 0.8", which can
be thought as the "score" times "weighted down appearances".
4. Once we have the "top" `Entity` from that column, we assign the PII label accordingly from `NEREntity`.
"""
logger.debug("Processing '%s'", data)
# Initialize an empty dict for the given row list
entities_score: Dict[str, StringAnalysis] = defaultdict(
lambda: StringAnalysis(score=0, appearances=0)
)
str_sample_data_rows = [str(row) for row in data if row is not None]
for row in str_sample_data_rows:
try:
self.process_data(row=row, entities_score=entities_score)
except Exception as exc:
logger.warning(f"Unknown error while processing {row} - {exc}")
logger.debug(traceback.format_exc())
if entities_score:
label, score = self.get_highest_score_label(entities_score)
tag_type = NEREntity.__members__.get(label)
if not tag_type:
return None
return TagAndConfidence(
tag_fqn=fqn.build(
metadata=None,
entity_type=Tag,
classification_name=PII,
tag_name=tag_type.value,
),
confidence=score,
)
return None
def process_data(self, row: str, entities_score: Dict[str, StringAnalysis]) -> None:
"""Process the Sample Data rows, checking if they are of JSON format as well"""
# first, check if the data is JSON or we can work with strings
is_json, value = self.is_json_data(row)
if is_json and isinstance(value, dict):
for val in value.values():
self.process_data(row=str(val), entities_score=entities_score)
elif is_json and isinstance(value, list):
for val in value:
self.process_data(row=str(val), entities_score=entities_score)
else:
self.scan_value(value=row, entities_score=entities_score)
@staticmethod
def is_json_data(value: str) -> Tuple[bool, Union[dict, list, None]]:
"""Check if the value is a JSON object that we need to process differently than strings"""
try:
res = json.loads(value)
if isinstance(res, (dict, list)):
return True, res
return False, None
except json.JSONDecodeError:
return False, None
def scan_value(self, value: str, entities_score: Dict[str, StringAnalysis]):
"""Scan the value for PII"""
results = self.analyzer.analyze(value, language="en")
for result in results:
entities_score[result.entity_type] = StringAnalysis(
score=result.score
if result.score > entities_score[result.entity_type].score
else entities_score[result.entity_type].score,
appearances=entities_score[result.entity_type].appearances + 1,
)

View File

@ -0,0 +1,5 @@
from pathlib import Path
# Directory that contains this file
TESTS_ROOT_DIR = Path(__file__).parent
# Parent directory of the tests root
INGESTION_ROOT_DIR = TESTS_ROOT_DIR.parent
# Parent directory of the ingestion root
REPO_ROOT_DIR = INGESTION_ROOT_DIR.parent

View File

@ -113,6 +113,15 @@ table_data = TableData(
EXPECTED_COLUMN_TAGS = [
ColumnTag(
column_fqn="test-service-table-patch.test-db.test-schema.customers.first_name",
tag_label=TagLabel(
tagFQN=TagFQN("General.Person"),
source="Classification",
labelType="Automated",
state="Suggested",
),
),
ColumnTag(
column_fqn="test-service-table-patch.test-db.test-schema.customers.first_name",
tag_label=TagLabel(
@ -122,6 +131,15 @@ EXPECTED_COLUMN_TAGS = [
state="Suggested",
),
),
ColumnTag(
column_fqn="test-service-table-patch.test-db.test-schema.customers.first_order",
tag_label=TagLabel(
tagFQN=TagFQN("General.DateTime"),
source="Classification",
labelType="Automated",
state="Suggested",
),
),
ColumnTag(
column_fqn="test-service-table-patch.test-db.test-schema.customers.first_order",
tag_label=TagLabel(
@ -131,6 +149,15 @@ EXPECTED_COLUMN_TAGS = [
state="Suggested",
),
),
ColumnTag(
column_fqn="test-service-table-patch.test-db.test-schema.customers.random",
tag_label=TagLabel(
tagFQN=TagFQN("General.Email"),
source="Classification",
labelType="Automated",
state="Suggested",
),
),
ColumnTag(
column_fqn="test-service-table-patch.test-db.test-schema.customers.random",
tag_label=TagLabel(

View File

@ -11,7 +11,7 @@
import inspect
from typing import Iterable, Tuple
from metadata.pii.algorithms.classifiers import ColumnClassifier, HeuristicPIIClassifier
from metadata.pii.algorithms.column_labelers import ColumnLabeler, HeuristicPIILabeler
from metadata.pii.algorithms.tags import PIITag
from .data import pii_samples
@ -27,7 +27,7 @@ def get_sample_data() -> Iterable[Tuple[str, LabeledData]]:
yield name, obj
def run_test_on_pii_classifier(pii_classifier: ColumnClassifier[PIITag]) -> str:
def run_test_on_pii_classifier(pii_classifier: ColumnLabeler[PIITag]) -> str:
"""Apply the classifier to the data and check the results"""
tested_datasets = 0
@ -49,6 +49,6 @@ def run_test_on_pii_classifier(pii_classifier: ColumnClassifier[PIITag]) -> str:
def test_pii_heuristic_classifier(pii_test_logger):
"""Test the PII heuristic classifier"""
heuristic_classifier = HeuristicPIIClassifier()
heuristic_classifier = HeuristicPIILabeler()
results = run_test_on_pii_classifier(heuristic_classifier)
pii_test_logger.info(results)

View File

@ -0,0 +1,45 @@
from metadata.pii.algorithms.label_extractors import ProbabilisticLabelExtractor
def test_extract_labels_single_dominant_label():
    """A single dominant label is the only one extracted."""
    label_scores = {"LabelA": 0.9, "LabelB": 0.1}
    label_extractor = ProbabilisticLabelExtractor(
        k=1, prob_threshold=0.8, score_threshold=0.2
    )

    # Expected flow: only LabelA (0.9) survives the score filter, it
    # normalizes to 1.0 and therefore passes the probability threshold.
    extracted = label_extractor.extract_labels(label_scores)

    assert extracted == {"LabelA"}
def test_extract_labels_top_label_below_probability_threshold():
    """No label is extracted when even the top one misses prob_threshold."""
    label_scores = {"LabelA": 0.5, "LabelB": 0.4}
    label_extractor = ProbabilisticLabelExtractor(
        k=1, prob_threshold=0.9, score_threshold=0.1
    )

    # Normalized: A ≈ 0.56, B ≈ 0.44, so neither reaches prob_threshold=0.9
    extracted = label_extractor.extract_labels(label_scores)

    assert extracted == set()
def test_extract_labels_equal_scores_all_pass():
    """Two equally scored labels are both extracted when k allows it."""
    label_scores = {"LabelA": 0.4, "LabelB": 0.4}
    label_extractor = ProbabilisticLabelExtractor(
        k=2, prob_threshold=0.3, score_threshold=0.1
    )

    # Normalized: each = 0.5, so both are >= prob_threshold
    extracted = label_extractor.extract_labels(label_scores)

    assert extracted == {"LabelA", "LabelB"}

View File

@ -13,7 +13,7 @@ from metadata.pii.algorithms.presidio_utils import (
set_presidio_logger_level,
)
from metadata.pii.algorithms.tags import PIITag
from metadata.pii.scanners.ner_scanner import SUPPORTED_LANG
from metadata.pii.constants import SUPPORTED_LANG
def test_analyzer_supports_all_expected_pii_entities():

View File

@ -0,0 +1,22 @@
import json
from metadata.pii.algorithms.tags import PIICategoryTag
from .... import REPO_ROOT_DIR
def test_pii_categories_agree_with_openmetadata_ner_entities() -> None:
    """
    Test that the PII categories agree with the OpenMetadata service.

    The tag names declared under `createTags` in the service's
    `NEREntityGeneralTags.json` must be exactly the values of `PIICategoryTag`.
    """
    path = (
        REPO_ROOT_DIR
        / "openmetadata-service/src/main/resources/json/data/tags/NEREntityGeneralTags.json"
    )

    # Read with an explicit encoding so the test does not depend on the
    # platform's default locale encoding.
    with open(path, "r", encoding="utf-8") as file:
        data = json.load(file)

    tag_labels = {create_tag["name"] for create_tag in data["createTags"]}
    pii_category_tag = {pii_cat_tag.value for pii_cat_tag in PIICategoryTag}

    assert (
        pii_category_tag == tag_labels
    ), f"PII Category Tags {pii_category_tag} do not match OpenMetadata NEREntityGeneralTags {tag_labels}"

View File

@ -0,0 +1,39 @@
import pytest
from metadata.pii.algorithms.tags import PIISensitivityTag, PIITag
from metadata.pii.algorithms.tags_ops import categorize_pii_tag, resolve_sensitivity
def test_each_pii_tag_is_mapped_to_a_pii_tag_category():
    """
    Test that each PII tag is mapped to a PII tag category.

    `categorize_pii_tag` is expected to signal an unmapped tag with a
    `ValueError`; that is turned into a test failure naming the tag.
    """
    for tag in PIITag:
        try:
            _ = categorize_pii_tag(tag)
        except ValueError as exc:
            # Chain explicitly so the original error stays the failure's cause
            raise AssertionError(
                f"PII tag {tag} is not mapped to a category."
            ) from exc
@pytest.mark.parametrize(
    ("input_tags", "expected"),
    [
        # No tags: nothing to resolve
        ([], None),
        # Single tag: returned as is
        ([PIISensitivityTag.NONSENSITIVE], PIISensitivityTag.NONSENSITIVE),
        ([PIISensitivityTag.SENSITIVE], PIISensitivityTag.SENSITIVE),
        # Pairs: SENSITIVE wins whenever it is present
        (
            [PIISensitivityTag.NONSENSITIVE, PIISensitivityTag.NONSENSITIVE],
            PIISensitivityTag.NONSENSITIVE,
        ),
        (
            [PIISensitivityTag.NONSENSITIVE, PIISensitivityTag.SENSITIVE],
            PIISensitivityTag.SENSITIVE,
        ),
        (
            [PIISensitivityTag.SENSITIVE, PIISensitivityTag.SENSITIVE],
            PIISensitivityTag.SENSITIVE,
        ),
    ],
)
def test_resolve_sensitivity(input_tags, expected):
    """Check the sensitivity resolved from a list of sensitivity tags."""
    resolved = resolve_sensitivity(input_tags)
    assert resolved == expected

View File

@ -1,66 +0,0 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test Column Name Scanner
"""
import pytest
from metadata.pii.models import TagAndConfidence
from metadata.pii.scanners.column_name_scanner import ColumnNameScanner
# Result the tests below expect from the scanner for a sensitive column name
EXPECTED_SENSITIVE = TagAndConfidence(
    tag_fqn="PII.Sensitive",
    confidence=1,
)
@pytest.fixture
def scanner() -> ColumnNameScanner:
    """Provide a fresh ColumnNameScanner to each test"""
    name_scanner = ColumnNameScanner()
    return name_scanner
def test_column_names_none(scanner):
    """Column names that must not be tagged by the scanner"""
    untagged_names = (
        "access_channel",
        "status_reason",
        # Credit Card
        "credit",
        "user_credits",
        # Users
        "id",
        "user_id",
        # Mails
        "email_verified",
    )
    for column_name in untagged_names:
        assert scanner.scan(column_name) is None
def test_column_names_sensitive(scanner):
    """Column names that must be tagged as PII.Sensitive with confidence 1"""
    sensitive_names = (
        # Bank
        "bank_account",
        # Credit Card
        "credit_card",
        "credit_card_number",
        "personal_credit_card",
        # Users
        "user_name",
        "user_first_name",
        "user_last_name",
        "client_name",
        "person_first_name",
        "client_last_name",
        "email",
        "email_address",
        "ssn",
    )
    for column_name in sensitive_names:
        assert scanner.scan(column_name) == EXPECTED_SENSITIVE

View File

@ -1,165 +0,0 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test NER Scanner
"""
from typing import Any
import pytest
from metadata.pii.scanners.ner_scanner import NERScanner, StringAnalysis
@pytest.fixture
def scanner() -> NERScanner:
    """Provide a fresh NERScanner to each test"""
    ner_scanner = NERScanner()
    return ner_scanner
def test_scanner_none(scanner):
    """Sample rows without PII must not produce any tag"""
    assert scanner.scan(list(range(100))) is None

    # Scan the sentence word by word. The receiver and the separator of
    # `split` used to be swapped (`" ".split(sentence)`), which evaluates to
    # `[" "]`, so the sentence itself was never scanned.
    # NOTE(review): confirm the analyzer reports no entity for these words.
    lorem_words = (
        "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Nam consequat quam sagittis convallis cursus."
    ).split(" ")
    assert scanner.scan(lorem_words) is None
def test_scanner_sensitive(scanner):
    """Email addresses in the rows lead to the PII.Sensitive tag"""
    only_emails = [
        "geraldc@gmail.com",
        "saratimithi@godesign.com",
        "heroldsean@google.com",
    ]
    assert scanner.scan(only_emails).tag_fqn == "PII.Sensitive"

    # A single email among harmless rows is enough
    mixed_rows = ["im ok", "saratimithi@godesign.com", "not sensitive"]
    assert scanner.scan(mixed_rows).tag_fqn == "PII.Sensitive"
def test_scanner_nonsensitive(scanner):
    """Location-like rows lead to the PII.NonSensitive tag"""
    places = ["Washington", "Alaska", "Netherfield Lea Street"]
    found = scanner.scan(places)
    assert found.tag_fqn == "PII.NonSensitive"
def test_get_highest_score_label(scanner):
    """Validate that even with score clashes, we only get one result back"""
    distinct_scores = {
        "PII.Sensitive": StringAnalysis(score=0.9, appearances=1),
        "PII.NonSensitive": StringAnalysis(score=0.8, appearances=1),
    }
    top = scanner.get_highest_score_label(distinct_scores)
    assert top == ("PII.Sensitive", 0.9)

    # Same score and appearances for both labels: only one comes back
    tied_scores = {
        "PII.Sensitive": StringAnalysis(score=1.0, appearances=1),
        "PII.NonSensitive": StringAnalysis(score=1.0, appearances=1),
    }
    top = scanner.get_highest_score_label(tied_scores)
    assert top == ("PII.Sensitive", 1.0)
@pytest.mark.parametrize(
    "data,expected",
    [
        # Plain strings and JSON scalars are not treated as JSON data
        ("potato", (False, None)),
        ("1", (False, None)),
        # Objects and arrays are flagged and returned decoded
        ('{"key": "value"}', (True, {"key": "value"})),
        (
            '{"key": "value", "key2": "value2"}',
            (True, {"key": "value", "key2": "value2"}),
        ),
        ('["potato"]', (True, ["potato"])),
    ],
)
def test_is_json_data(scanner, data: Any, expected: tuple):
    """
    Assert we are flagging JSON data correctly.

    `expected` is the full `(is_json, decoded_value)` tuple returned by
    `is_json_data`, not just the boolean flag.
    """
    assert scanner.is_json_data(data) == expected
def test_scanner_with_json(scanner):
    """Test the scanner with JSON data"""
    rows_with_email = [
        '{"email": "johndoe@example.com", "address": {"street": "123 Main St"}}',
        '{"email": "potato", "age": 30, "preferences": {"newsletter": true, "notifications": "email"}}',
    ]
    assert scanner.scan(rows_with_email).tag_fqn == "PII.Sensitive"

    # Same shape, but none of the values is PII
    rows_without_pii = [
        '{"email": "foo", "address": {"street": "bar"}}',
        '{"email": "potato", "age": 30, "preferences": {"newsletter": true, "notifications": "email"}}',
    ]
    assert scanner.scan(rows_without_pii) is None
def test_scanner_with_lists(scanner):
    """Test the scanner with list data"""
    assert scanner.scan(["foo", "bar", "biz"]) is None

    one_email = ["foo", "bar", "johndoe@example.com"]
    assert scanner.scan(one_email).tag_fqn == "PII.Sensitive"

    # Lists nested inside JSON objects
    json_rows = [
        '{"emails": ["johndoe@example.com", "lima@example.com"]}',
        '{"emails": ["foo", "bar", "biz"]}',
    ]
    assert scanner.scan(json_rows).tag_fqn == "PII.Sensitive"
def test_scan_entities(scanner):
    """
    We can properly validate certain entities.
    > NOTE: These lists are randomly generated and not valid IDs for any actual use
    """
    pan_numbers = ["AFZPK7190K", "BLQSM2938L", "CWRTJ5821M", "DZXNV9045A", "EHYKG6752P"]
    ssn_numbers = [
        "123-45-6789",
        "987-65-4321",
        "543-21-0987",
        "678-90-1234",
        "876-54-3210",
    ]
    nif_numbers = ["12345678A", "87654321B", "23456789C", "98765432D", "34567890E"]

    # Every sample column is expected to be tagged as sensitive
    for sample_rows in (pan_numbers, ssn_numbers, nif_numbers):
        assert scanner.scan(sample_rows).tag_fqn == "PII.Sensitive"

View File

@ -1,35 +0,0 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from metadata.generated.schema.type.tagLabel import LabelType, State, TagSource
from metadata.pii.algorithms.tags import PIISensitivityTag
from metadata.pii.processor import PIIProcessor
def test_pii_processor_build_tag_label_for_pii_sensitive():
    """The SENSITIVE tag becomes a suggested, generated `PII.Sensitive` label"""
    label = PIIProcessor.build_tag_label(PIISensitivityTag.SENSITIVE)
    observed = (label.tagFQN.root, label.source, label.state, label.labelType)
    assert observed == (
        "PII.Sensitive",
        TagSource.Classification,
        State.Suggested,
        LabelType.Generated,
    )
def test_pii_processor_build_tag_label_for_pii_nonsensitive():
    """The NONSENSITIVE tag becomes a suggested, generated `PII.NonSensitive` label"""
    label = PIIProcessor.build_tag_label(PIISensitivityTag.NONSENSITIVE)
    observed = (label.tagFQN.root, label.source, label.state, label.labelType)
    assert observed == (
        "PII.NonSensitive",
        TagSource.Classification,
        State.Suggested,
        LabelType.Generated,
    )

View File

@ -0,0 +1,33 @@
from metadata.generated.schema.type.tagLabel import LabelType, TagSource
from metadata.pii.algorithms.tags import PIICategoryTag, PIISensitivityTag
from metadata.pii.processor import get_tag_label
def test_get_general_tag_label_from_pii_tag_category():
    """
    Test that the general tag FQN from a tag category never fails.

    Every `PIICategoryTag` must produce a generated classification label
    whose FQN is `General.<tag value>`.
    """
    for tag in PIICategoryTag:
        # Only the call under test is guarded: a failing assertion below must
        # surface as is, not be mixed up with the ValueError handling.
        try:
            tag_label = get_tag_label(tag)
        except ValueError as exc:
            raise AssertionError(
                f"Failed to get general tag FQN for tag {tag}."
            ) from exc

        assert tag_label.tagFQN.root == f"General.{tag.value}"
        assert tag_label.source == TagSource.Classification
        assert tag_label.labelType == LabelType.Generated
def test_get_general_tag_label_from_pii_sensitivity():
    """
    Test that the tag label from a PII sensitivity never fails.

    Every `PIISensitivityTag` must produce a generated classification label
    whose FQN is `PII.<tag value>`.
    """
    for tag in PIISensitivityTag:
        # Only the call under test is guarded: a failing assertion below must
        # surface as is, not be mixed up with the ValueError handling.
        try:
            tag_label = get_tag_label(tag)
        except ValueError as exc:
            raise AssertionError(
                f"Failed to get general tag FQN for sensitivity {tag}."
            ) from exc

        assert tag_label.tagFQN.root == f"PII.{tag.value}"
        assert tag_label.source == TagSource.Classification
        assert tag_label.labelType == LabelType.Generated

View File

@ -0,0 +1,94 @@
{
"createClassification": {
"name": "General",
"description": "Category describing generic data types, such as `DateTime`, `Location`, or `BankNumber`",
"provider": "system",
"mutuallyExclusive": "false"
},
"createTags": [
{
"name": "DateTime",
"description": "Absolute or relative dates or periods or times smaller than a day."
},
{
"name": "Password",
"description": "Field holding password information."
},
{
"name": "BirthDate",
"description": "Person's birth date."
},
{
"name": "Gender",
"description": "Person's gender."
},
{
"name": "Location",
"description": "Name of politically or geographically defined location (cities, provinces, countries, international regions, bodies of water, mountains)."
},
{
"name": "Address",
"description": "Address of a Person."
},
{
"name": "MedicalLicense",
"description": "Common medical license numbers."
},
{
"name": "URL",
"description": "A URL (Uniform Resource Locator), unique identifier used to locate a resource on the Internet."
},
{
"name": "IBANCode",
"description": "The International Bank Account Number (IBAN) is an internationally agreed system of identifying bank accounts across national borders to facilitate the communication and processing of cross border transactions with a reduced risk of transcription errors."
},
{
"name": "BankNumber",
"description": "Bank Account Number."
},
{
"name": "CreditCardNumber",
"description": "Credit card number of the user."
},
{
"name": "Crypto",
"description": "Crypto Wallet Number."
},
{
"name": "DriverLicense",
"description": "Person's driver's license image or number."
},
{
"name": "Email",
"description": "Email address."
},
{
"name": "IPAddress",
"description": "An Internet Protocol (IP) address (either IPv4 or IPv6)."
},
{
"name": "NRP",
"description": "A person's nationality, religious or political group."
},
{
"name": "Person",
"description": "A full person name, which can include first names, middle names or initials, and last names."
},
{
"name": "PhoneNumber",
"description": "A telephone number."
},
{
"name": "VATCode",
"description": "VAT code or identification number. See [VAT Identification Number](https://en.wikipedia.org/wiki/VAT_identification_number)."
},
{
"name": "NationalID",
"description": "Region specific identifiers, such as [ABN](https://en.wikipedia.org/wiki/Australian_Business_Number), [NIF](https://es.wikipedia.org/wiki/N%C3%BAmero_de_identificaci%C3%B3n_fiscal) or [NHS](https://en.wikipedia.org/wiki/National_Health_Service)."
},
{
"name": "Passport",
"description": "National passport numbers, such as US or IT passports."
}
]
}