GEN-1410 - Improve PII Logging information (#17835)

This commit is contained in:
Pere Miquel Brull 2024-09-16 07:05:30 +02:00
parent 43d8c13709
commit 4e7b1381b1
3 changed files with 40 additions and 7 deletions

View File

@@ -14,6 +14,7 @@ NER Scanner based on Presidio.
Supported Entities https://microsoft.github.io/presidio/supported_entities/ Supported Entities https://microsoft.github.io/presidio/supported_entities/
""" """
import json import json
import logging
import traceback import traceback
from collections import defaultdict from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple, Union from typing import Any, Dict, List, Optional, Tuple, Union
@@ -26,9 +27,11 @@ from metadata.pii.models import TagAndConfidence
from metadata.pii.ner import NEREntity from metadata.pii.ner import NEREntity
from metadata.pii.scanners.base import BaseScanner from metadata.pii.scanners.base import BaseScanner
from metadata.utils import fqn from metadata.utils import fqn
from metadata.utils.logger import pii_logger from metadata.utils.logger import METADATA_LOGGER, pii_logger
logger = pii_logger() logger = pii_logger()
SUPPORTED_LANG = "en"
PRESIDIO_LOGGER = "presidio-analyzer"
class StringAnalysis(BaseModel): class StringAnalysis(BaseModel):
@@ -40,11 +43,16 @@ class StringAnalysis(BaseModel):
appearances: int appearances: int
class NLPEngineModel(BaseModel):
"""Required to pass the nlp_engine as {"lang_code": "en", "model_name": "en_core_web_lg"}"""
lang_code: str
model_name: str
# pylint: disable=import-outside-toplevel # pylint: disable=import-outside-toplevel
class NERScanner(BaseScanner): class NERScanner(BaseScanner):
""" """Based on https://microsoft.github.io/presidio/"""
Based on https://microsoft.github.io/presidio/
"""
def __init__(self): def __init__(self):
import spacy import spacy
@@ -60,8 +68,19 @@ class NERScanner(BaseScanner):
download(SPACY_EN_MODEL) download(SPACY_EN_MODEL)
spacy.load(SPACY_EN_MODEL) spacy.load(SPACY_EN_MODEL)
nlp_engine_model = NLPEngineModel(
lang_code=SUPPORTED_LANG, model_name=SPACY_EN_MODEL
)
# Set the presidio logger to talk less about internal entities unless we are debugging
logging.getLogger(PRESIDIO_LOGGER).setLevel(
logging.INFO
if logging.getLogger(METADATA_LOGGER).level == logging.DEBUG
else logging.ERROR
)
self.analyzer = AnalyzerEngine( self.analyzer = AnalyzerEngine(
nlp_engine=SpacyNlpEngine(models={"en": SPACY_EN_MODEL}) nlp_engine=SpacyNlpEngine(models=[nlp_engine_model.model_dump()])
) )
@staticmethod @staticmethod

View File

@@ -289,6 +289,7 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
for step in self.workflow_steps(): for step in self.workflow_steps():
logger.info( logger.info(
f"{step.name}: Processed {len(step.status.records)} records," f"{step.name}: Processed {len(step.status.records)} records,"
f" updated {len(step.status.updated_records)} records,"
f" filtered {len(step.status.filtered)} records," f" filtered {len(step.status.filtered)} records,"
f" found {len(step.status.failures)} errors" f" found {len(step.status.failures)} errors"
) )

View File

@@ -11,6 +11,11 @@
""" """
Workflow definition for the profiler Workflow definition for the profiler
""" """
from typing import cast
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig, OpenMetadataWorkflowConfig,
) )
@@ -58,9 +63,17 @@ class ProfilerWorkflow(IngestionWorkflow):
self.source = source_class.create(self.config.model_dump(), self.metadata) self.source = source_class.create(self.config.model_dump(), self.metadata)
profiler_processor = self._get_profiler_processor() profiler_processor = self._get_profiler_processor()
pii_processor = self._get_pii_processor()
sink = self._get_sink() sink = self._get_sink()
self.steps = (profiler_processor, pii_processor, sink)
# Only instantiate the PII Processor on demand
source_config: DatabaseServiceProfilerPipeline = cast(
DatabaseServiceProfilerPipeline, self.config.source.sourceConfig.config
)
if source_config.processPiiSensitive:
pii_processor = self._get_pii_processor()
self.steps = (profiler_processor, pii_processor, sink)
else:
self.steps = (profiler_processor, sink)
def test_connection(self): def test_connection(self):
service_config = self.config.source.serviceConnection.root.config service_config = self.config.source.serviceConnection.root.config