MINOR - Better PII classification for JSON data (#17734)

* MINOR - Better PII classification for JSON data

* linting
This commit is contained in:
Pere Miquel Brull 2024-09-06 08:54:23 +02:00
parent c6a7aae09b
commit fa198f2942
6 changed files with 171 additions and 57 deletions

View File

@ -66,6 +66,7 @@ class PIIProcessor(Processor):
) # Used to satisfy type checked
self._ner_scanner = None
self.name_scanner = ColumnNameScanner()
self.confidence_threshold = self.source_config.confidence
@property
@ -128,7 +129,7 @@ class PIIProcessor(Processor):
return None
# Scan by column name. If no results there, check the sample data, if any
tag_and_confidence = ColumnNameScanner.scan(column.name.root) or (
tag_and_confidence = self.name_scanner.scan(column.name.root) or (
self.ner_scanner.scan([row[idx] for row in table_data.rows])
if table_data
else None

View File

@ -0,0 +1,23 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Basic Scanner ABC
"""
from abc import ABC, abstractmethod
from typing import Any
class BaseScanner(ABC):
    """Common contract shared by the PII scanners.

    The class cannot be instantiated directly: concrete scanners must
    subclass it and provide their own `scan` implementation.
    """

    @abstractmethod
    def scan(self, data: Any):
        """Inspect the data coming from a column.

        Args:
            data: whatever the concrete scanner knows how to analyze;
                the base class places no restriction on its type.
        """

View File

@ -17,17 +17,18 @@ from typing import Optional
from metadata.generated.schema.entity.classification.tag import Tag
from metadata.pii.constants import PII
from metadata.pii.models import TagAndConfidence, TagType
from metadata.pii.scanners.base import BaseScanner
from metadata.utils import fqn
class ColumnNameScanner:
class ColumnNameScanner(BaseScanner):
"""
Column Name Scanner to scan column name
"""
sensitive_regex = {
"PASSWORD": re.compile("^.*password.*$", re.IGNORECASE),
"SSN": re.compile("^.*(ssn|social).*$", re.IGNORECASE),
"US_SSN": re.compile("^.*(ssn|social).*$", re.IGNORECASE),
"CREDIT_CARD": re.compile("^.*(credit).*(card).*$", re.IGNORECASE),
"BANK_ACCOUNT": re.compile("^.*bank.*(acc|num).*$", re.IGNORECASE),
"EMAIL_ADDRESS": re.compile("^.*(email|e-mail|mail).*$", re.IGNORECASE),
@ -53,14 +54,13 @@ class ColumnNameScanner:
"PHONE_NUMBER": re.compile("^.*(phone).*$", re.IGNORECASE),
}
@classmethod
def scan(cls, column_name: str) -> Optional[TagAndConfidence]:
def scan(self, data: str) -> Optional[TagAndConfidence]:
"""
Check the column name against the regex patterns and prepare the
sensitive or non-sensitive tag
"""
for pii_type_pattern in cls.sensitive_regex.values():
if pii_type_pattern.match(column_name) is not None:
for pii_type_pattern in self.sensitive_regex.values():
if pii_type_pattern.match(data) is not None:
return TagAndConfidence(
tag_fqn=fqn.build(
metadata=None,
@ -71,8 +71,8 @@ class ColumnNameScanner:
confidence=1,
)
for pii_type_pattern in cls.non_sensitive_regex.values():
if pii_type_pattern.match(column_name) is not None:
for pii_type_pattern in self.non_sensitive_regex.values():
if pii_type_pattern.match(data) is not None:
return TagAndConfidence(
tag_fqn=fqn.build(
metadata=None,

View File

@ -13,9 +13,10 @@ NER Scanner based on Presidio.
Supported Entities https://microsoft.github.io/presidio/supported_entities/
"""
import json
import traceback
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple, Union
from pydantic import BaseModel
@ -23,6 +24,7 @@ from metadata.generated.schema.entity.classification.tag import Tag
from metadata.pii.constants import PII, SPACY_EN_MODEL
from metadata.pii.models import TagAndConfidence
from metadata.pii.ner import NEREntity
from metadata.pii.scanners.base import BaseScanner
from metadata.utils import fqn
from metadata.utils.logger import pii_logger
@ -39,7 +41,7 @@ class StringAnalysis(BaseModel):
# pylint: disable=import-outside-toplevel
class NERScanner:
class NERScanner(BaseScanner):
"""
Based on https://microsoft.github.io/presidio/
"""
@ -74,7 +76,7 @@ class NERScanner:
)
return top_entity, entities_score[top_entity].score
def scan(self, sample_data_rows: List[Any]) -> Optional[TagAndConfidence]:
def scan(self, data: List[Any]) -> Optional[TagAndConfidence]:
"""
Scan the column's sample data rows and look for PII.
@ -95,24 +97,17 @@ class NERScanner:
be thought as the "score" times "weighted down appearances".
4. Once we have the "top" `Entity` from that column, we assign the PII label accordingly from `NEREntity`.
"""
logger.debug("Processing '%s'", sample_data_rows)
logger.debug("Processing '%s'", data)
# Initialize an empty dict for the given row list
entities_score: Dict[str, StringAnalysis] = defaultdict(
lambda: StringAnalysis(score=0, appearances=0)
)
str_sample_data_rows = [str(row) for row in sample_data_rows if row is not None]
str_sample_data_rows = [str(row) for row in data if row is not None]
for row in str_sample_data_rows:
try:
results = self.analyzer.analyze(row, language="en")
for result in results:
entities_score[result.entity_type] = StringAnalysis(
score=result.score
if result.score > entities_score[result.entity_type].score
else entities_score[result.entity_type].score,
appearances=entities_score[result.entity_type].appearances + 1,
)
self.process_data(row=row, entities_score=entities_score)
except Exception as exc:
logger.warning(f"Unknown error while processing {row} - {exc}")
logger.debug(traceback.format_exc())
@ -133,3 +128,38 @@ class NERScanner:
)
return None
def process_data(self, row: str, entities_score: Dict[str, StringAnalysis]) -> None:
    """Scan a sample data row, unpacking it first when it holds JSON.

    If `row` is not a JSON object/array it is scanned as a plain string.
    Otherwise every leaf value of the parsed document is scanned on its
    own, however deeply it is nested. Dictionary keys are not scanned.

    Args:
        row: string representation of a single sample value.
        entities_score: per-entity accumulator, updated in place through
            `scan_value`.
    """
    is_json, value = self.is_json_data(row)
    if not is_json:
        self.scan_value(value=row, entities_score=entities_score)
        return

    # Walk the parsed structure directly. Turning nested containers back
    # into text with `str()` yields the Python repr (single quotes,
    # `True`/`None`), which is generally not valid JSON, so nested objects
    # would never be unpacked and would be scanned as one single blob.
    # Visit order is irrelevant: `scan_value` only keeps the highest score
    # and an appearance count per entity.
    pending = [value]
    while pending:
        node = pending.pop()
        if isinstance(node, dict):
            pending.extend(node.values())
        elif isinstance(node, list):
            pending.extend(node)
        elif isinstance(node, str):
            # A string leaf may itself contain an encoded JSON document
            self.process_data(row=node, entities_score=entities_score)
        elif node is not None:
            # Numbers and booleans are scanned through their text form.
            # JSON `null` carries no data, same as the `None` rows that
            # `scan` filters out before calling this method.
            self.scan_value(value=str(node), entities_score=entities_score)
@staticmethod
def is_json_data(value: str) -> Tuple[bool, Union[dict, list, None]]:
"""Check if the value is a JSON object that we need to process differently than strings"""
try:
res = json.loads(value)
if isinstance(res, (dict, list)):
return True, res
return False, None
except json.JSONDecodeError:
return False, None
def scan_value(self, value: str, entities_score: Dict[str, StringAnalysis]):
    """Run the analyzer on a single string and fold the findings into `entities_score`.

    For every entity found, the stored entry keeps the highest score seen
    so far and its appearance counter is increased by one.

    Args:
        value: text to analyze.
        entities_score: per-entity accumulator, updated in place.
    """
    for finding in self.analyzer.analyze(value, language="en"):
        label = finding.entity_type
        previous = entities_score[label]
        if finding.score > previous.score:
            best_score = finding.score
        else:
            best_score = previous.score
        entities_score[label] = StringAnalysis(
            score=best_score,
            appearances=previous.appearances + 1,
        )

View File

@ -11,7 +11,7 @@
"""
Test Column Name Scanner
"""
from unittest import TestCase
import pytest
from metadata.pii.models import TagAndConfidence
from metadata.pii.scanners.column_name_scanner import ColumnNameScanner
@ -22,44 +22,41 @@ EXPECTED_SENSITIVE = TagAndConfidence(
)
class ColumnNameScannerTest(TestCase):
"""
Validate various typical column names
"""
@pytest.fixture
def scanner() -> ColumnNameScanner:
"""Return the scanner"""
return ColumnNameScanner()
def test_column_names_none(self):
self.assertIsNone(ColumnNameScanner.scan("access_channel"))
self.assertIsNone(ColumnNameScanner.scan("status_reason"))
def test_column_names_none(scanner):
assert scanner.scan("access_channel") is None
assert scanner.scan("status_reason") is None
# Credit Card
self.assertIsNone(ColumnNameScanner.scan("credit"))
self.assertIsNone(ColumnNameScanner.scan("user_credits"))
assert scanner.scan("credit") is None
assert scanner.scan("user_credits") is None
# Users
self.assertIsNone(ColumnNameScanner.scan("id"))
self.assertIsNone(ColumnNameScanner.scan("user_id"))
assert scanner.scan("id") is None
assert scanner.scan("user_id") is None
def test_column_names_sensitive(self):
def test_column_names_sensitive(scanner):
# Bank
self.assertEqual(ColumnNameScanner.scan("bank_account"), EXPECTED_SENSITIVE)
assert scanner.scan("bank_account") == EXPECTED_SENSITIVE
# Credit Card
self.assertEqual(ColumnNameScanner.scan("credit_card"), EXPECTED_SENSITIVE)
self.assertEqual(
ColumnNameScanner.scan("credit_card_number"), EXPECTED_SENSITIVE
)
self.assertEqual(
ColumnNameScanner.scan("personal_credit_card"), EXPECTED_SENSITIVE
)
assert scanner.scan("credit_card") == EXPECTED_SENSITIVE
assert scanner.scan("credit_card_number") == EXPECTED_SENSITIVE
assert scanner.scan("personal_credit_card") == EXPECTED_SENSITIVE
# Users
self.assertEqual(ColumnNameScanner.scan("user_name"), EXPECTED_SENSITIVE)
self.assertEqual(ColumnNameScanner.scan("user_first_name"), EXPECTED_SENSITIVE)
self.assertEqual(ColumnNameScanner.scan("user_last_name"), EXPECTED_SENSITIVE)
self.assertEqual(ColumnNameScanner.scan("client_name"), EXPECTED_SENSITIVE)
self.assertEqual(
ColumnNameScanner.scan("person_first_name"), EXPECTED_SENSITIVE
)
self.assertEqual(ColumnNameScanner.scan("client_last_name"), EXPECTED_SENSITIVE)
assert scanner.scan("user_name") == EXPECTED_SENSITIVE
assert scanner.scan("user_first_name") == EXPECTED_SENSITIVE
assert scanner.scan("user_last_name") == EXPECTED_SENSITIVE
assert scanner.scan("client_name") == EXPECTED_SENSITIVE
assert scanner.scan("person_first_name") == EXPECTED_SENSITIVE
assert scanner.scan("client_last_name") == EXPECTED_SENSITIVE
self.assertEqual(ColumnNameScanner.scan("email"), EXPECTED_SENSITIVE)
assert scanner.scan("email") == EXPECTED_SENSITIVE
assert scanner.scan("ssn") == EXPECTED_SENSITIVE

View File

@ -11,6 +11,7 @@
"""
Test Column Name Scanner
"""
from typing import Any
import pytest
@ -78,3 +79,65 @@ def test_get_highest_score_label(scanner):
"PII.NonSensitive": StringAnalysis(score=1.0, appearances=1),
}
) == ("PII.Sensitive", 1.0)
@pytest.mark.parametrize(
    "data,expected",
    [
        # Plain text and JSON scalars are not unpacked
        ("potato", (False, None)),
        ("1", (False, None)),
        ("", (False, None)),
        # Objects and arrays are returned already parsed
        ('{"key": "value"}', (True, {"key": "value"})),
        (
            '{"key": "value", "key2": "value2"}',
            (True, {"key": "value", "key2": "value2"}),
        ),
        ('["potato"]', (True, ["potato"])),
        # Surrounding whitespace is valid JSON
        (' {"key": "value"} ', (True, {"key": "value"})),
    ],
)
def test_is_json_data(scanner, data: str, expected: tuple):
    """Assert we are flagging JSON data correctly.

    `expected` is the full `(is_json, parsed_value)` tuple returned by
    `is_json_data`, not just the boolean flag.
    """
    assert scanner.is_json_data(data) == expected
def test_scanner_with_json(scanner):
    """Check the scanner against rows that contain JSON documents"""
    # One of the rows carries a real e-mail address
    rows_with_pii = [
        '{"email": "johndoe@example.com", "address": {"street": "123 Main St"}}',
        '{"email": "potato", "age": 30, "preferences": {"newsletter": true, "notifications": "email"}}',
    ]
    assert scanner.scan(rows_with_pii).tag_fqn == "PII.Sensitive"

    # Same shape, but no value looks like PII
    rows_without_pii = [
        '{"email": "foo", "address": {"street": "bar"}}',
        '{"email": "potato", "age": 30, "preferences": {"newsletter": true, "notifications": "email"}}',
    ]
    assert scanner.scan(rows_without_pii) is None
def test_scanner_with_lists(scanner):
    """Check the scanner against plain rows and JSON rows that hold lists"""
    # Plain string rows, without and with an e-mail address
    assert scanner.scan(["foo", "bar", "biz"]) is None

    plain_rows_with_email = ["foo", "bar", "johndoe@example.com"]
    assert scanner.scan(plain_rows_with_email).tag_fqn == "PII.Sensitive"

    # JSON rows whose values are lists
    json_rows_with_emails = [
        '{"emails": ["johndoe@example.com", "lima@example.com"]}',
        '{"emails": ["foo", "bar", "biz"]}',
    ]
    assert scanner.scan(json_rows_with_emails).tag_fqn == "PII.Sensitive"