From 4e7b1381b1008cd1f73fe6b25d9b88401966a032 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Mon, 16 Sep 2024 07:05:30 +0200 Subject: [PATCH] GEN-1410 - Improve PII Logging information (#17835) --- .../src/metadata/pii/scanners/ner_scanner.py | 29 +++++++++++++++---- ingestion/src/metadata/workflow/base.py | 1 + ingestion/src/metadata/workflow/profiler.py | 17 +++++++++-- 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/ingestion/src/metadata/pii/scanners/ner_scanner.py b/ingestion/src/metadata/pii/scanners/ner_scanner.py index bce19613787..e6280b3f048 100644 --- a/ingestion/src/metadata/pii/scanners/ner_scanner.py +++ b/ingestion/src/metadata/pii/scanners/ner_scanner.py @@ -14,6 +14,7 @@ NER Scanner based on Presidio. Supported Entities https://microsoft.github.io/presidio/supported_entities/ """ import json +import logging import traceback from collections import defaultdict from typing import Any, Dict, List, Optional, Tuple, Union @@ -26,9 +27,11 @@ from metadata.pii.models import TagAndConfidence from metadata.pii.ner import NEREntity from metadata.pii.scanners.base import BaseScanner from metadata.utils import fqn -from metadata.utils.logger import pii_logger +from metadata.utils.logger import METADATA_LOGGER, pii_logger logger = pii_logger() +SUPPORTED_LANG = "en" +PRESIDIO_LOGGER = "presidio-analyzer" class StringAnalysis(BaseModel): @@ -40,11 +43,16 @@ class StringAnalysis(BaseModel): appearances: int +class NLPEngineModel(BaseModel): + """Required to pass the nlp_engine as {"lang_code": "en", "model_name": "en_core_web_lg"}""" + + lang_code: str + model_name: str + + # pylint: disable=import-outside-toplevel class NERScanner(BaseScanner): - """ - Based on https://microsoft.github.io/presidio/ - """ + """Based on https://microsoft.github.io/presidio/""" def __init__(self): import spacy @@ -60,8 +68,19 @@ class NERScanner(BaseScanner): download(SPACY_EN_MODEL) spacy.load(SPACY_EN_MODEL) + nlp_engine_model = NLPEngineModel( + lang_code=SUPPORTED_LANG, model_name=SPACY_EN_MODEL + ) + + # Set the presidio logger to talk less about internal entities unless we are debugging + logging.getLogger(PRESIDIO_LOGGER).setLevel( + logging.INFO + if logging.getLogger(METADATA_LOGGER).level == logging.DEBUG + else logging.ERROR + ) + self.analyzer = AnalyzerEngine( - nlp_engine=SpacyNlpEngine(models={"en": SPACY_EN_MODEL}) + nlp_engine=SpacyNlpEngine(models=[nlp_engine_model.model_dump()]) ) @staticmethod diff --git a/ingestion/src/metadata/workflow/base.py b/ingestion/src/metadata/workflow/base.py index 573f970de2d..87bca9565a2 100644 --- a/ingestion/src/metadata/workflow/base.py +++ b/ingestion/src/metadata/workflow/base.py @@ -289,6 +289,7 @@ class BaseWorkflow(ABC, WorkflowStatusMixin): for step in self.workflow_steps(): logger.info( f"{step.name}: Processed {len(step.status.records)} records," + f" updated {len(step.status.updated_records)} records," f" filtered {len(step.status.filtered)} records," f" found {len(step.status.failures)} errors" ) diff --git a/ingestion/src/metadata/workflow/profiler.py b/ingestion/src/metadata/workflow/profiler.py index c6987fb0ef1..4354b8c96bc 100644 --- a/ingestion/src/metadata/workflow/profiler.py +++ b/ingestion/src/metadata/workflow/profiler.py @@ -11,6 +11,11 @@ """ Workflow definition for the profiler """ +from typing import cast + +from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import ( + DatabaseServiceProfilerPipeline, +) from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, ) @@ -58,9 +63,17 @@ class ProfilerWorkflow(IngestionWorkflow): self.source = source_class.create(self.config.model_dump(), self.metadata) profiler_processor = self._get_profiler_processor() - pii_processor = self._get_pii_processor() sink = self._get_sink() - self.steps = (profiler_processor, pii_processor, sink) + + # Only instantiate the PII Processor on demand + source_config: DatabaseServiceProfilerPipeline = cast( + DatabaseServiceProfilerPipeline, self.config.source.sourceConfig.config + ) + if source_config.processPiiSensitive: + pii_processor = self._get_pii_processor() + self.steps = (profiler_processor, pii_processor, sink) + else: + self.steps = (profiler_processor, sink) def test_connection(self): service_config = self.config.source.serviceConnection.root.config