diff --git a/ingestion/operators/docker/main.py b/ingestion/operators/docker/main.py
index 8a685e59ee2..7a16ce72534 100644
--- a/ingestion/operators/docker/main.py
+++ b/ingestion/operators/docker/main.py
@@ -21,9 +21,9 @@ from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipel
     PipelineType,
 )
 from metadata.generated.schema.metadataIngestion.workflow import LogLevels
-from metadata.profiler.api.workflow import ProfilerWorkflow
 from metadata.utils.logger import set_loggers_level
 from metadata.workflow.metadata import MetadataWorkflow
+from metadata.workflow.profiler import ProfilerWorkflow
 from metadata.workflow.usage import UsageWorkflow
 from metadata.workflow.workflow_output_handler import print_status
diff --git a/ingestion/src/metadata/cli/profile.py b/ingestion/src/metadata/cli/profile.py
index f2d76f2968a..0ca17da5bbe 100644
--- a/ingestion/src/metadata/cli/profile.py
+++ b/ingestion/src/metadata/cli/profile.py
@@ -17,12 +17,12 @@ import sys
 import traceback
 
 from metadata.config.common import load_config_file
-from metadata.profiler.api.workflow import ProfilerWorkflow
 from metadata.utils.logger import cli_logger
+from metadata.workflow.profiler import ProfilerWorkflow
 from metadata.workflow.workflow_output_handler import (
     WorkflowType,
     print_init_error,
-    print_profiler_status,
+    print_status,
 )
 
 logger = cli_logger()
@@ -48,5 +48,5 @@ def run_profiler(config_path: str) -> None:
         workflow.execute()
         workflow.stop()
-        print_profiler_status(workflow)
+        print_status(workflow)
         workflow.raise_from_status()
diff --git a/ingestion/src/metadata/ingestion/api/step.py b/ingestion/src/metadata/ingestion/api/step.py
index 11061760a21..7502831bd99 100644
--- a/ingestion/src/metadata/ingestion/api/step.py
+++ b/ingestion/src/metadata/ingestion/api/step.py
@@ -61,17 +61,17 @@ class ReturnStep(Step, ABC):
     """Steps that run by returning a single unit"""
 
     @abstractmethod
-    def _run(self, *args, **kwargs) -> Either:
+    def _run(self, record: Entity) -> Either:
         """
         Main entrypoint to execute the step
         """
 
-    def run(self, *args, **kwargs) -> Optional[Entity]:
+    def run(self, record: Entity) -> Optional[Entity]:
         """
         Run the step and handle the status and exceptions
         """
         try:
-            result: Either = self._run(*args, **kwargs)
+            result: Either = self._run(record)
             if result.left is not None:
                 self.status.failed(result.left)
                 return None
@@ -98,7 +98,7 @@ class StageStep(Step, ABC):
     """Steps that run by returning a single unit"""
 
     @abstractmethod
-    def _run(self, *args, **kwargs) -> Iterable[Either[str]]:
+    def _run(self, record: Entity) -> Iterable[Either[str]]:
         """
         Main entrypoint to execute the step.
 
@@ -110,12 +110,12 @@
         pick up the file components.
         """
 
-    def run(self, *args, **kwargs) -> None:
+    def run(self, record: Entity) -> None:
         """
         Run the step and handle the status and exceptions.
         """
         try:
-            for result in self._run(*args, **kwargs):
+            for result in self._run(record):
                 if result.left is not None:
                     self.status.failed(result.left)
@@ -138,7 +138,7 @@ class IterStep(Step, ABC):
     """Steps that are run as Iterables"""
 
     @abstractmethod
-    def _iter(self, *args, **kwargs) -> Iterable[Either]:
+    def _iter(self) -> Iterable[Either]:
         """Main entrypoint to run through the Iterator"""
 
     def run(self) -> Iterable[Optional[Entity]]:
diff --git a/ingestion/src/metadata/ingestion/api/steps.py b/ingestion/src/metadata/ingestion/api/steps.py
index 5dba735dbf1..9c0023f28b6 100644
--- a/ingestion/src/metadata/ingestion/api/steps.py
+++ b/ingestion/src/metadata/ingestion/api/steps.py
@@ -52,9 +52,7 @@ class Sink(ReturnStep, ABC):
     """All Sinks must inherit this base class."""
 
     @abstractmethod
-    def _run(  # pylint: disable=arguments-differ
-        self, record: Entity, *_, **__
-    ) -> Either:
+    def _run(self, record: Entity) -> Either:
         """
         Send the data somewhere, e.g., the OM API
         """
@@ -64,7 +62,7 @@ class Processor(ReturnStep, ABC):
     """All Processor must inherit this base class"""
 
     @abstractmethod
-    def _run(self, *args, **kwargs) -> Either:
+    def _run(self, record: Entity) -> Either:
         """
         Post process a given entity and return it or a new one
diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py
index ebe52059e27..81298132bbe 100644
--- a/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py
+++ b/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py
@@ -138,7 +138,7 @@ class OMetaPatchMixin(OMetaPatchMixinBase):
         except Exception as exc:
             logger.debug(traceback.format_exc())
             logger.error(
-                f"Error trying to PATCH description for {entity.__class__.__name__} [{source.id}]: {exc}"
+                f"Error trying to PATCH {entity.__name__} [{source.id.__root__}]: {exc}"
             )
 
         return None
diff --git a/ingestion/src/metadata/ingestion/processor/query_parser.py b/ingestion/src/metadata/ingestion/processor/query_parser.py
index 819a48c8e19..acad802a47f 100644
--- a/ingestion/src/metadata/ingestion/processor/query_parser.py
+++ b/ingestion/src/metadata/ingestion/processor/query_parser.py
@@ -89,15 +89,13 @@ class QueryParserProcessor(Processor):
         connection_type = kwargs.pop("connection_type", "")
         return cls(config, metadata_config, connection_type)
 
-    def _run(  # pylint: disable=arguments-differ
-        self, queries: TableQueries
-    ) -> Optional[Either[QueryParserData]]:
-        if queries and queries.queries:
+    def _run(self, record: TableQueries) -> Optional[Either[QueryParserData]]:
+        if record and record.queries:
             data = []
-            for record in queries.queries:
+            for table_query in record.queries:
                 try:
                     parsed_sql = parse_sql_statement(
-                        record,
+                        table_query,
                         ConnectionTypeDialectMapper.dialect_of(self.connection_type),
                     )
                     if parsed_sql:
@@ -106,7 +104,7 @@ class QueryParserProcessor(Processor):
                     return Either(
                         left=StackTraceError(
                             name="Query",
-                            error=f"Error processing query [{record.query}]: {exc}",
+                            error=f"Error processing query [{table_query.query}]: {exc}",
                             stack_trace=traceback.format_exc(),
                         )
                     )
diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py
index 09d855fc913..ccd57cfbc9b 100644
--- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py
+++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py
@@ -66,6 +66,7 @@ from metadata.ingestion.ometa.client import APIError
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.source.dashboard.dashboard_service import DashboardUsage
 from metadata.ingestion.source.database.database_service import DataModelLink
+from metadata.profiler.api.models import ProfilerResponse
 from metadata.utils.helpers import calculate_execution_time
 from metadata.utils.logger import get_log_name, ingestion_logger
 
@@ -403,6 +404,60 @@ class MetadataRestSink(Sink):
             logger.debug(f"No sample data to PUT for {get_log_name(record.entity)}")
         return Either(right=record.entity)
 
+    @_run_dispatch.register
+    def write_profiler_response(self, record: ProfilerResponse) -> Either[Table]:
+        """Clean up the "`" character in column names and ingest"""
+        column_profile = record.profile.columnProfile
+        for column in column_profile:
+            column.name = column.name.replace("`", "")
+
+        record.profile.columnProfile = column_profile
+
+        table = self.metadata.ingest_profile_data(
+            table=record.table,
+            profile_request=record.profile,
+        )
+        logger.debug(
+            f"Successfully ingested profile metrics for {record.table.fullyQualifiedName.__root__}"
+        )
+
+        if record.sample_data:
+            table_data = self.metadata.ingest_table_sample_data(
+                table=record.table, sample_data=record.sample_data
+            )
+            if not table_data:
+                self.status.failed(
+                    StackTraceError(
+                        name=table.fullyQualifiedName.__root__,
+                        error="Error trying to ingest sample data for table",
+                    )
+                )
+            else:
+                logger.debug(
+                    f"Successfully ingested sample data for {record.table.fullyQualifiedName.__root__}"
+                )
+
+        for column_tag_response in record.column_tags or []:
+            patched = self.metadata.patch_column_tag(
+                table=record.table,
+                column_fqn=column_tag_response.column_fqn,
+                tag_label=column_tag_response.tag_label,
+            )
+            if not patched:
+                self.status.failed(
+                    StackTraceError(
+                        name=table.fullyQualifiedName.__root__,
+                        error="Error patching tags for table",
+                    )
+                )
+            else:
+                logger.debug(
+                    f"Successfully patched tag {column_tag_response.tag_label} for"
+                    f" {record.table.fullyQualifiedName.__root__}.{column_tag_response.column_fqn}"
+                )
+
+        return Either(right=table)
+
     def close(self):
         """
         We don't have anything to close since we are using the given metadata client
diff --git a/ingestion/src/metadata/ingestion/stage/table_usage.py b/ingestion/src/metadata/ingestion/stage/table_usage.py
index 645d6a74f9a..94b6108525d 100644
--- a/ingestion/src/metadata/ingestion/stage/table_usage.py
+++ b/ingestion/src/metadata/ingestion/stage/table_usage.py
@@ -109,9 +109,7 @@ class TableUsageStage(Stage):
             )
         ]
 
-    def _run(  # pylint: disable=arguments-differ
-        self, record: QueryParserData, *_, **__
-    ) -> Iterable[Either[str]]:
+    def _run(self, record: QueryParserData) -> Iterable[Either[str]]:
         """
         Process the parsed data and store it in a file
         """
diff --git a/ingestion/src/metadata/pii/ner.py b/ingestion/src/metadata/pii/ner.py
new file mode 100644
index 00000000000..6e8cd3ee8a8
--- /dev/null
+++ b/ingestion/src/metadata/pii/ner.py
@@ -0,0 +1,67 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+NER Scanner based on Presidio.
+
+Supported Entities https://microsoft.github.io/presidio/supported_entities/
+"""
+from enum import Enum
+
+from metadata.pii.models import TagType
+from metadata.utils.logger import pii_logger
+
+logger = pii_logger()
+
+
+class NEREntity(Enum):
+    """
+    PII Entities supported by Presidio https://microsoft.github.io/presidio/supported_entities/
+    """
+
+    # Global
+    CREDIT_CARD = TagType.SENSITIVE.value
+    CRYPTO = TagType.SENSITIVE.value
+    DATE_TIME = TagType.NONSENSITIVE.value
+    EMAIL_ADDRESS = TagType.SENSITIVE.value
+    IBAN_CODE = TagType.SENSITIVE.value
+    IP_ADDRESS = TagType.SENSITIVE.value
+    NRP = TagType.NONSENSITIVE.value
+    LOCATION = TagType.NONSENSITIVE.value
+    PERSON = TagType.SENSITIVE.value
+    PHONE_NUMBER = TagType.NONSENSITIVE.value
+    MEDICAL_LICENSE = TagType.SENSITIVE.value
+    URL = TagType.NONSENSITIVE.value
+
+    # USA
+    US_BANK_NUMBER = TagType.SENSITIVE.value
+    US_DRIVER_LICENSE = TagType.SENSITIVE.value
+    US_ITIN = TagType.SENSITIVE.value
+    US_PASSPORT = TagType.SENSITIVE.value
+    US_SSN = TagType.SENSITIVE.value
+
+    # UK
+    UK_NHS = TagType.SENSITIVE.value
+
+    # Spain
+    NIF = TagType.SENSITIVE.value
+
+    # Italy
+    IT_FISCAL_CODE = TagType.SENSITIVE.value
+    IT_DRIVER_LICENSE = TagType.SENSITIVE.value
+    IT_VAT_CODE = TagType.SENSITIVE.value
+    IT_PASSPORT = TagType.SENSITIVE.value
+    IT_IDENTITY_CARD = TagType.SENSITIVE.value
+
+    # Australia
+    AU_ABN = TagType.SENSITIVE.value
+    AU_ACN = TagType.SENSITIVE.value
+    AU_TFN = TagType.SENSITIVE.value
+    AU_MEDICARE = TagType.SENSITIVE.value
diff --git a/ingestion/src/metadata/pii/processor.py b/ingestion/src/metadata/pii/processor.py
index 152e3d19a23..c9459d22240 100644
--- a/ingestion/src/metadata/pii/processor.py
+++ b/ingestion/src/metadata/pii/processor.py
@@ -12,39 +12,77 @@
 """
 Processor util to fetch pii sensitive columns
 """
-from typing import Optional
+import traceback
+from typing import Optional, cast
 
 from metadata.generated.schema.entity.classification.tag import Tag
-from metadata.generated.schema.entity.data.table import Table, TableData
+from metadata.generated.schema.entity.data.table import Column, TableData
+from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
+    OpenMetadataConnection,
+)
+from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
+    DatabaseServiceProfilerPipeline,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    OpenMetadataWorkflowConfig,
+)
 from metadata.generated.schema.type.tagLabel import (
     LabelType,
     State,
     TagLabel,
     TagSource,
 )
-from metadata.ingestion.ometa.ometa_api import OpenMetadata
-from metadata.pii.column_name_scanner import ColumnNameScanner
+from metadata.ingestion.api.models import Either, StackTraceError
+from metadata.ingestion.api.parser import parse_workflow_config_gracefully
+from metadata.ingestion.api.step import Step
+from metadata.ingestion.api.steps import Processor
+from metadata.ingestion.ometa.client_utils import create_ometa_client
 from metadata.pii.constants import PII
-from metadata.pii.ner_scanner import NERScanner
+from metadata.pii.scanners.column_name_scanner import ColumnNameScanner
+from metadata.pii.scanners.ner_scanner import NERScanner
+from metadata.profiler.api.models import PatchColumnTagResponse, ProfilerResponse
 from metadata.utils import fqn
 from metadata.utils.logger import profiler_logger
 
 logger = profiler_logger()
 
 
-class PIIProcessor:
+class PIIProcessor(Processor):
     """
     A scanner that uses Spacy NER for entity recognition
     """
 
-    def __init__(self, metadata: OpenMetadata):
+    def __init__(
+        self,
+        config: OpenMetadataWorkflowConfig,
+        metadata_config: OpenMetadataConnection,
+    ):
+        super().__init__()
+        self.config = config
+        self.metadata_config = metadata_config
+        self.metadata = create_ometa_client(self.metadata_config)
+
+        # Init and type the source config
+        self.source_config: DatabaseServiceProfilerPipeline = cast(
+            DatabaseServiceProfilerPipeline, self.config.source.sourceConfig.config
+        )  # Used to satisfy the type checker
-        self.metadata = metadata
         self.ner_scanner = NERScanner()
+        self.confidence_threshold = self.source_config.confidence
 
-    def patch_column_tag(
-        self, tag_type: str, table_entity: Table, column_fqn: str
-    ) -> None:
+    @classmethod
+    def create(
+        cls, config_dict: dict, metadata_config: OpenMetadataConnection
+    ) -> "Step":
+        config = parse_workflow_config_gracefully(config_dict)
+        return cls(config=config, metadata_config=metadata_config)
+
+    def close(self) -> None:
+        """Nothing to close"""
+
+    def build_column_tag(
+        self, tag_type: str, column_fqn: str
+    ) -> PatchColumnTagResponse:
         """
         Build the tag and run the PATCH
         """
@@ -60,54 +98,84 @@ class PIIProcessor:
             state=State.Suggested,
             labelType=LabelType.Automated,
         )
-        self.metadata.patch_column_tag(
-            table=table_entity,
-            column_fqn=column_fqn,
-            tag_label=tag_label,
+
+        return PatchColumnTagResponse(column_fqn=column_fqn, tag_label=tag_label)
+
+    def process_column(
+        self,
+        idx: int,
+        column: Column,
+        table_data: Optional[TableData],
+        confidence_threshold: float,
+    ) -> Optional[PatchColumnTagResponse]:
+        """
+        Tag a column with PII if we find it using our scanners
+        """
+
+        # First, check if the column we are about to process
+        # already has PII tags or not
+        column_has_pii_tag = any(
+            (PII in tag.tagFQN.__root__ for tag in column.tags or [])
         )
 
-    def process(
+        # If it has PII tags, we skip the processing
+        # for the column
+        if column_has_pii_tag is True:
+            return None
+
+        # Scan by column name. If no results there, check the sample data, if any
+        tag_and_confidence = ColumnNameScanner.scan(column.name.__root__) or (
+            self.ner_scanner.scan([row[idx] for row in table_data.rows])
+            if table_data
+            else None
+        )
+
+        if (
+            tag_and_confidence
+            and tag_and_confidence.tag
+            and tag_and_confidence.confidence >= confidence_threshold / 100
+        ):
+            return self.build_column_tag(
+                tag_type=tag_and_confidence.tag.value,
+                column_fqn=column.fullyQualifiedName.__root__,
+            )
+
+        return None
+
+    def _run(
         self,
-        table_data: Optional[TableData],
-        table_entity: Table,
-        confidence_threshold: float,
-    ):
+        record: ProfilerResponse,
+    ) -> Either[ProfilerResponse]:
         """
         Main entrypoint for the scanner.
 
         Adds PII tagging based on the column names
         and TableData
         """
-        for idx, column in enumerate(table_entity.columns):
+        # We don't always need to process
+        if not self.source_config.processPiiSensitive:
+            return Either(right=record)
+
+        column_tags = []
+        for idx, column in enumerate(record.table.columns):
             try:
-                # First, check if the column we are about to process
-                # already has PII tags or not
-                column_has_pii_tag = any(
-                    (PII in tag.tagFQN.__root__ for tag in column.tags or [])
+                col_tag = self.process_column(
+                    idx=idx,
+                    column=column,
+                    table_data=record.sample_data,
+                    confidence_threshold=self.confidence_threshold,
                 )
-
-                # If it has PII tags, we skip the processing
-                # for the column
-                if column_has_pii_tag is True:
-                    continue
-
-                # Scan by column name. If no results there, check the sample data, if any
-                tag_and_confidence = ColumnNameScanner.scan(column.name.__root__) or (
-                    self.ner_scanner.scan([row[idx] for row in table_data.rows])
-                    if table_data
-                    else None
-                )
-
-                if (
-                    tag_and_confidence
-                    and tag_and_confidence.tag
-                    and tag_and_confidence.confidence >= confidence_threshold / 100
-                ):
-                    self.patch_column_tag(
-                        tag_type=tag_and_confidence.tag.value,
-                        table_entity=table_entity,
-                        column_fqn=column.fullyQualifiedName.__root__,
-                    )
+                if col_tag:
+                    column_tags.append(col_tag)
             except Exception as err:
-                logger.warning(f"Error computing PII tags for [{column}] - [{err}]")
+                self.status.failed(
+                    StackTraceError(
+                        name=record.table.fullyQualifiedName.__root__,
+                        error=f"Error computing PII tags for [{column}] - [{err}]",
+                        stack_trace=traceback.format_exc(),
+                    )
+                )
+
+        record.column_tags = column_tags
+        return Either(right=record)
diff --git a/ingestion/src/metadata/pii/column_name_scanner.py b/ingestion/src/metadata/pii/scanners/column_name_scanner.py
similarity index 100%
rename from ingestion/src/metadata/pii/column_name_scanner.py
rename to ingestion/src/metadata/pii/scanners/column_name_scanner.py
diff --git a/ingestion/src/metadata/pii/ner_scanner.py b/ingestion/src/metadata/pii/scanners/ner_scanner.py
similarity index 77%
rename from ingestion/src/metadata/pii/ner_scanner.py
rename to ingestion/src/metadata/pii/scanners/ner_scanner.py
index 4d875d17022..674b6d2b5dd 100644
--- a/ingestion/src/metadata/pii/ner_scanner.py
+++ b/ingestion/src/metadata/pii/scanners/ner_scanner.py
@@ -15,64 +15,18 @@ Supported Entities https://microsoft.github.io/presidio/supported_entities/
 """
 import traceback
 from collections import defaultdict
-from enum import Enum
 from typing import Any, Dict, List, Optional, Tuple
 
 from pydantic import BaseModel
 
 from metadata.pii.constants import SPACY_EN_MODEL
 from metadata.pii.models import TagAndConfidence, TagType
+from metadata.pii.ner import NEREntity
 from metadata.utils.logger import pii_logger
 
 logger = pii_logger()
 
 
-class NEREntity(Enum):
-    """
-    PII Entities supported by Presidio https://microsoft.github.io/presidio/supported_entities/
-    """
-
-    # Global
-    CREDIT_CARD = TagType.SENSITIVE.value
-    CRYPTO = TagType.SENSITIVE.value
-    DATE_TIME = TagType.NONSENSITIVE.value
-    EMAIL_ADDRESS = TagType.SENSITIVE.value
-    IBAN_CODE = TagType.SENSITIVE.value
-    IP_ADDRESS = TagType.SENSITIVE.value
-    NRP = TagType.NONSENSITIVE.value
-    LOCATION = TagType.NONSENSITIVE.value
-    PERSON = TagType.SENSITIVE.value
-    PHONE_NUMBER = TagType.NONSENSITIVE.value
-    MEDICAL_LICENSE = TagType.SENSITIVE.value
-    URL = TagType.NONSENSITIVE.value
-
-    # USA
-    US_BANK_NUMBER = TagType.SENSITIVE.value
-    US_DRIVER_LICENSE = TagType.SENSITIVE.value
-    US_ITIN = TagType.SENSITIVE.value
-    US_PASSPORT = TagType.SENSITIVE.value
-    US_SSN = TagType.SENSITIVE.value
-
-    # UK
-    UK_NHS = TagType.SENSITIVE.value
-
-    # Spain
-    NIF = TagType.SENSITIVE.value
-
-    # Italy
-    IT_FISCAL_CODE = TagType.SENSITIVE.value
-    IT_DRIVER_LICENSE = TagType.SENSITIVE.value
-    IT_VAT_CODE = TagType.SENSITIVE.value
-    IT_PASSPORT = TagType.SENSITIVE.value
-    IT_IDENTITY_CARD = TagType.SENSITIVE.value
-
-    # Australia
-    AU_ABN = TagType.SENSITIVE.value
-    AU_ACN = TagType.SENSITIVE.value
-    AU_TFN = TagType.SENSITIVE.value
-    AU_MEDICARE = TagType.SENSITIVE.value
-
-
 class StringAnalysis(BaseModel):
     """
     Used to store results from the sample data scans for each NER Entity
diff --git a/ingestion/src/metadata/profiler/api/models.py b/ingestion/src/metadata/profiler/api/models.py
index 02bcc706247..6250fdb4d18 100644
--- a/ingestion/src/metadata/profiler/api/models.py
+++ b/ingestion/src/metadata/profiler/api/models.py
@@ -29,6 +29,7 @@ from metadata.generated.schema.entity.data.table import (
     TableData,
 )
 from metadata.generated.schema.type.basic import FullyQualifiedEntityName
+from metadata.generated.schema.type.tagLabel import TagLabel
 from metadata.profiler.processor.models import ProfilerDef
 
@@ -67,6 +68,13 @@ class ProfilerProcessorConfig(ConfigModel):
     tableConfig: Optional[List[TableConfig]] = None
 
 
+class PatchColumnTagResponse(ConfigModel):
+    """Used to patch a tag to a column"""
+
+    column_fqn: str
+    tag_label: TagLabel
+
+
 class ProfilerResponse(ConfigModel):
     """
     ORM Profiler processor response.
@@ -78,3 +86,4 @@ class ProfilerResponse(ConfigModel):
     table: Table
     profile: CreateTableProfileRequest
     sample_data: Optional[TableData] = None
+    column_tags: Optional[List[PatchColumnTagResponse]] = None
diff --git a/ingestion/src/metadata/profiler/api/workflow.py b/ingestion/src/metadata/profiler/api/workflow.py
deleted file mode 100644
index f7d8ff9924b..00000000000
--- a/ingestion/src/metadata/profiler/api/workflow.py
+++ /dev/null
@@ -1,459 +0,0 @@
-# Copyright 2021 Collate
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Workflow definition for the ORM Profiler.
-
-- How to specify the source
-- How to specify the entities to run
-- How to define metrics & tests
-"""
-import traceback
-from typing import Iterable, Optional, cast
-
-from pydantic import ValidationError
-
-from metadata.config.common import WorkflowExecutionError
-from metadata.generated.schema.entity.data.database import Database
-from metadata.generated.schema.entity.data.table import Table, TableType
-from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
-    DatalakeConnection,
-)
-from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
-    OpenMetadataConnection,
-)
-from metadata.generated.schema.entity.services.connections.serviceConnection import (
-    ServiceConnection,
-)
-from metadata.generated.schema.entity.services.databaseService import DatabaseService
-from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
-    PipelineState,
-)
-from metadata.generated.schema.entity.services.serviceType import ServiceType
-from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
-    DatabaseServiceProfilerPipeline,
-)
-from metadata.generated.schema.metadataIngestion.workflow import (
-    OpenMetadataWorkflowConfig,
-)
-from metadata.ingestion.api.models import StackTraceError
-from metadata.ingestion.api.parser import parse_workflow_config_gracefully
-from metadata.ingestion.api.status import Status
-from metadata.ingestion.api.steps import Sink
-from metadata.ingestion.models.custom_types import ServiceWithConnectionType
-from metadata.ingestion.ometa.client_utils import create_ometa_client
-from metadata.ingestion.ometa.ometa_api import OpenMetadata
-from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
-from metadata.profiler.api.models import ProfilerProcessorConfig, ProfilerResponse
-from metadata.profiler.processor.core import Profiler
-from metadata.profiler.source.base.profiler_source import ProfilerSource
-from metadata.profiler.source.profiler_source_factory import profiler_source_factory
-from metadata.timer.repeated_timer import RepeatedTimer
-from metadata.utils import fqn
-from metadata.utils.class_helper import (
-    get_service_class_from_service_type,
-    get_service_type_from_source_type,
-)
-from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
-from metadata.utils.importer import get_sink
-from metadata.utils.logger import profiler_logger
-from metadata.workflow.workflow_output_handler import (
-    get_ingestion_status_timer,
-    print_profiler_status,
-)
-from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin
-
-logger = profiler_logger()
-
-NON_SQA_DATABASE_CONNECTIONS = (DatalakeConnection,)
-SUCCESS_THRESHOLD_VALUE = 90
-REPORTS_INTERVAL_SECONDS = 60
-
-
-class ProfilerInterfaceInstantiationError(Exception):
-    """Raise when interface cannot be instantiated"""
-
-
-class ProfilerWorkflow(WorkflowStatusMixin):
-    """
-    Configure and run the ORM profiler
-    """
-
-    config: OpenMetadataWorkflowConfig
-    sink: Sink
-    metadata: OpenMetadata
-
-    def __init__(self, config: OpenMetadataWorkflowConfig):
-        self.profiler = None  # defined in `create_profiler()``
-        self.config = config
-        self._timer: Optional[RepeatedTimer] = None
-
-        self.metadata_config: OpenMetadataConnection = (
-            self.config.workflowConfig.openMetadataServerConfig
-        )
-        self.profiler_config = ProfilerProcessorConfig.parse_obj(
-            self.config.processor.dict().get("config")
-        )
-
-        self.metadata = create_ometa_client(self.metadata_config)
-        self._retrieve_service_connection_if_needed()
-        self.test_connection()
-        self.set_ingestion_pipeline_status(state=PipelineState.running)
-        # Init and type the source config
-        self.source_config: DatabaseServiceProfilerPipeline = cast(
-            DatabaseServiceProfilerPipeline, self.config.source.sourceConfig.config
-        )  # Used to satisfy type checked
-        self.source_status = Status()
-        self._profiler_interface_args = None
-        if self.config.sink:
-            self.sink = get_sink(
-                sink_type=self.config.sink.type,
-                sink_config=self.config.sink,
-                metadata_config=self.metadata_config,
-                from_="profiler",
-            )
-
-        if not self._validate_service_name():
-            raise ValueError(
-                f"Service name `{self.config.source.serviceName}` does not exist. "
-                "Make sure you have run the ingestion for the service specified in the profiler workflow. "
-                "If so, make sure the profiler service name matches the service name specified during ingestion "
-                "and that your ingestion token (settings > bots) is still valid."
-            )
-
-        logger.info(
-            f"Starting profiler for service {self.config.source.serviceName}"
-            f":{self.config.source.type.lower()}"
-        )
-
-    @classmethod
-    def create(cls, config_dict: dict) -> "ProfilerWorkflow":
-        """
-        Parse a JSON (dict) and create the workflow
-        """
-        try:
-            config = parse_workflow_config_gracefully(config_dict)
-            return cls(config)
-        except ValidationError as err:
-            logger.debug(traceback.format_exc())
-            logger.error(
-                f"Error trying to parse the Profiler Workflow configuration: {err}"
-            )
-            raise err
-
-    @property
-    def timer(self) -> RepeatedTimer:
-        """Status timer"""
-        if not self._timer:
-            self._timer = get_ingestion_status_timer(
-                interval=REPORTS_INTERVAL_SECONDS, logger=logger, workflow=self
-            )
-
-        return self._timer
-
-    def filter_databases(self, database: Database) -> Optional[Database]:
-        """Returns filtered database entities"""
-        if filter_by_database(
-            self.source_config.databaseFilterPattern,
-            database.name.__root__,
-        ):
-            self.source_status.filter(
-                database.name.__root__, "Database pattern not allowed"
-            )
-            return None
-        return database
-
-    def filter_entities(self, tables: Iterable[Table]) -> Iterable[Table]:
-        """
-        From a list of tables, apply the SQLSourceConfig
-        filter patterns.
-
-        We will update the status on the SQLSource Status.
-        """
-        for table in tables:
-            try:
-                if filter_by_schema(
-                    self.source_config.schemaFilterPattern,
-                    table.databaseSchema.name,  # type: ignore
-                ):
-                    self.source_status.filter(
-                        f"Schema pattern not allowed: {table.fullyQualifiedName.__root__}",  # type: ignore
-                        "Schema pattern not allowed",
-                    )
-                    continue
-                if filter_by_table(
-                    self.source_config.tableFilterPattern,
-                    table.name.__root__,
-                ):
-                    self.source_status.filter(
-                        f"Table pattern not allowed: {table.fullyQualifiedName.__root__}",  # type: ignore
-                        "Table pattern not allowed",
-                    )
-                    continue
-                if (
-                    table.tableType == TableType.View
-                    and not self.source_config.includeViews
-                ):
-                    self.source_status.filter(
-                        table.fullyQualifiedName.__root__,
-                        "View filtered out",
-                    )
-                    continue
-                yield table
-            except Exception as exc:
-                self.source_status.failed(
-                    StackTraceError(
-                        name=table.fullyQualifiedName.__root__,
-                        error=f"Unexpected error filtering entities for table [{table}]: {exc}",
-                        stack_trace=traceback.format_exc(),
-                    )
-                )
-
-    def get_database_entities(self):
-        """List all databases in service"""
-
-        databases = [
-            self.filter_databases(database)
-            for database in self.metadata.list_all_entities(
-                entity=Database,
-                params={"service": self.config.source.serviceName},
-            )
-            if self.filter_databases(database)
-        ]
-
-        if not databases:
-            raise ValueError(
-                "databaseFilterPattern returned 0 result. At least 1 database must be returned by the filter pattern."
-                f"\n\t- includes: {self.source_config.databaseFilterPattern.includes if self.source_config.databaseFilterPattern else None}"  # pylint: disable=line-too-long
-                f"\n\t- excludes: {self.source_config.databaseFilterPattern.excludes if self.source_config.databaseFilterPattern else None}"  # pylint: disable=line-too-long
-            )
-
-        return databases
-
-    def get_table_entities(self, database):
-        """
-        List and filter OpenMetadata tables based on the
-        source configuration.
-
-        The listing will be based on the entities from the
-        informed service name in the source configuration.
-
-        Note that users can specify `table_filter_pattern` to
-        either be `includes` or `excludes`. This means
-        that we will either what is specified in `includes`
-        or we will use everything but the tables excluded.
-
-        Same with `schema_filter_pattern`.
-        """
-        tables = self.metadata.list_all_entities(
-            entity=Table,
-            fields=[
-                "tableProfilerConfig",
-            ],
-            params={
-                "service": self.config.source.serviceName,
-                "database": fqn.build(
-                    self.metadata,
-                    entity_type=Database,
-                    service_name=self.config.source.serviceName,
-                    database_name=database.name.__root__,
-                ),
-            },  # type: ignore
-        )
-
-        yield from self.filter_entities(tables)
-
-    def run_profiler(
-        self, entity: Table, profiler_source: ProfilerSource
-    ) -> Optional[ProfilerResponse]:
-        """
-        Main logic for the profiler workflow
-        """
-        profiler_runner: Profiler = profiler_source.get_profiler_runner(
-            entity, self.profiler_config
-        )
-
-        try:
-            profile: ProfilerResponse = profiler_runner.process(
-                self.source_config.generateSampleData,
-                self.source_config.processPiiSensitive,
-            )
-        except Exception as exc:
-            self.source_status.failed(
-                StackTraceError(
-                    name=entity.fullyQualifiedName.__root__,
-                    error=f"Unexpected exception processing entity {entity.fullyQualifiedName.__root__}: {exc}",
-                    stack_trace=traceback.format_exc(),
-                )
-            )
-            self.source_status.fail_all(
-                profiler_source.interface.processor_status.failures
-            )
-            self.source_status.records.extend(
-                profiler_source.interface.processor_status.records
-            )
-        else:
-            # at this point we know we have an interface variable since we the `try` block above didn't raise
-            self.source_status.fail_all(profiler_source.interface.processor_status.failures)  # type: ignore
-            self.source_status.records.extend(
-                profiler_source.interface.processor_status.records  # type: ignore
-            )
-            return profile
-        finally:
-            profiler_runner.close()
-
-        return None
-
-    def execute(self):
-        """
-        Run the profiling and tests
-        """
-        self.timer.trigger()
-
-        try:
-            for database in self.get_database_entities():
-                profiler_source = profiler_source_factory.create(
-                    self.config.source.type.lower(),
-                    self.config,
-                    database,
-                    self.metadata,
-                )
-                for entity in self.get_table_entities(database=database):
-                    profile = self.run_profiler(entity, profiler_source)
-                    if hasattr(self, "sink") and profile:
-                        self.sink.write_record(profile)
-            # At the end of the `execute`, update the associated Ingestion Pipeline status as success
-            self.update_ingestion_status_at_end()
-
-        # Any unhandled exception breaking the workflow should update the status
-        except Exception as err:
-            self.set_ingestion_pipeline_status(PipelineState.failed)
-            raise err
-        # Force resource closing. Required for killing the threading
-        finally:
-            self.stop()
-
-    def print_status(self) -> None:
-        """
-        Print the workflow results with click
-        """
-        print_profiler_status(self)
-
-    def result_status(self) -> int:
-        """
-        Returns 1 if status is failed, 0 otherwise.
-        """
-        if self.source_status.failures or (
-            hasattr(self, "sink") and self.sink.get_status().failures
-        ):
-            return 1
-        return 0
-
-    def _get_source_success(self):
-        """Compue the success rate of the source"""
-        return self.source_status.calculate_success()
-
-    def update_ingestion_status_at_end(self):
-        """
-        Once the execute method is done, update the status
-        as OK or KO depending on the success rate.
-        """
-        pipeline_state = PipelineState.success
-        if SUCCESS_THRESHOLD_VALUE <= self._get_source_success() < 100:
-            pipeline_state = PipelineState.partialSuccess
-        self.set_ingestion_pipeline_status(pipeline_state)
-
-    def _raise_from_status_internal(self, raise_warnings=False):
-        """
-        Check source, processor and sink status and raise if needed
-
-        Our profiler source will never log any failure, only filters,
-        as we are just picking up data from OM.
-        """
-
-        if self._get_source_success() < SUCCESS_THRESHOLD_VALUE:
-            raise WorkflowExecutionError(
-                "Processor reported errors", self.source_status
-            )
-        if hasattr(self, "sink") and self.sink.get_status().failures:
-            raise WorkflowExecutionError("Sink reported errors", self.sink.get_status())
-
-        if raise_warnings:
-            if self.source_status.warnings:
-                raise WorkflowExecutionError(
-                    "Source reported warnings", self.source_status
-                )
-            if hasattr(self, "sink") and self.sink.get_status().warnings:
-                raise WorkflowExecutionError(
-                    "Sink reported warnings", self.sink.get_status()
-                )
-
-    def _validate_service_name(self):
-        """Validate service name exists in OpenMetadata"""
-        return self.metadata.get_by_name(
-            entity=DatabaseService, fqn=self.config.source.serviceName
-        )
-
-    def stop(self):
-        """
-        Close all connections
-        """
-        self.metadata.close()
-        self.timer.stop()
-
-    def _retrieve_service_connection_if_needed(self) -> None:
-        """
-        We override the current `serviceConnection` source config object if source workflow service already exists
-        in OM. When it is configured, we retrieve the service connection from the secrets' manager. Otherwise, we get it
-        from the service object itself through the default `SecretsManager`.
-
-        :return:
-        """
-        service_type: ServiceType = get_service_type_from_source_type(
-            self.config.source.type
-        )
-        if (
-            not self.config.source.serviceConnection
-            and not self.metadata.config.forceEntityOverwriting
-        ):
-            service_name = self.config.source.serviceName
-            try:
-                service: ServiceWithConnectionType = cast(
-                    ServiceWithConnectionType,
-                    self.metadata.get_by_name(
-                        get_service_class_from_service_type(service_type),
-                        service_name,
-                    ),
-                )
-                if not service:
-                    raise ConnectionError(
-                        f"Could not retrieve service with name `{service_name}`. "
-                        "Typically caused by the `serviceName` does not exists in OpenMetadata "
-                        "or the JWT Token is invalid."
-                    )
-                if service:
-                    self.config.source.serviceConnection = ServiceConnection(
-                        __root__=service.connection
-                    )
-            except ConnectionError as exc:
-                raise exc
-            except Exception as exc:
-                logger.debug(traceback.format_exc())
-                logger.error(
-                    f"Error getting service connection for service name [{service_name}]"
-                    f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}"
-                )
-
-    def test_connection(self):
-        service_config = self.config.source.serviceConnection.__root__.config
-        self.engine = get_connection(service_config)
-
-        test_connection_fn = get_test_connection_fn(service_config)
-        test_connection_fn(self.metadata, self.engine, service_config)
diff --git a/ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py b/ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py
index 5c1e8939701..9e4d9fb9b27 100644
--- a/ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py
+++ b/ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py
@@ -271,13 +271,13 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin):
             name = f"{column if column is not None else table}"
             error = f"{name} metric_type.value: {exc}"
             logger.error(error)
-            self.processor_status.failed_profiler(error, traceback.format_exc())
+            self.status.failed_profiler(error, traceback.format_exc())
             row = None
         if column is not None:
             column = column.name
-            self.processor_status.scanned(f"{table.name.__root__}.{column}")
+            self.status.scanned(f"{table.name.__root__}.{column}")
         else:
-            self.processor_status.scanned(table.name.__root__)
+            self.status.scanned(table.name.__root__)
         return row, column, metric_type.value
 
     def fetch_sample_data(self, table) -> TableData:
diff --git a/ingestion/src/metadata/profiler/interface/profiler_interface.py b/ingestion/src/metadata/profiler/interface/profiler_interface.py
index 5befd078bcc..fc050a04d0e 100644
--- a/ingestion/src/metadata/profiler/interface/profiler_interface.py
+++ b/ingestion/src/metadata/profiler/interface/profiler_interface.py
@@ -64,13 +64,13 @@ class ProfilerInterface(ABC):
         self.source_config = source_config
         self.service_connection_config = service_connection_config
         self.connection = get_connection(self.service_connection_config)
-        self.processor_status = ProfilerProcessorStatus()
+        self.status = ProfilerProcessorStatus()
         try:
             fqn = self.table_entity.fullyQualifiedName
         except AttributeError:
-            self.processor_status.entity = None
+            self.status.entity = None
         else:
-            self.processor_status.entity = fqn.__root__ if fqn else None
+            self.status.entity = fqn.__root__ if fqn else None
         self.profile_sample_config = profile_sample_config
         self.profile_query = sample_query
         self.partition_details = (
diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py
index 18730ef5f0a..761266abe67 100644
--- a/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py
+++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py
@@ -423,14 +423,14 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
         except Exception as exc:
             error = f"{column if column is not None else runner.table.__tablename__} metric_type.value: {exc}"
             logger.error(error)
-            self.processor_status.failed_profiler(error, traceback.format_exc())
+            self.status.failed_profiler(error, traceback.format_exc())
             row = None
         if column is not None:
             column = column.name
-            self.processor_status.scanned(f"{table.__tablename__}.{column}")
+            self.status.scanned(f"{table.__tablename__}.{column}")
         else:
-            self.processor_status.scanned(table.__tablename__)
+            self.status.scanned(table.__tablename__)
 
         return row, column, metric_type.value
diff --git a/ingestion/src/metadata/profiler/processor/core.py b/ingestion/src/metadata/profiler/processor/core.py
index 8a8354c0242..87a6386b4a0 100644
--- a/ingestion/src/metadata/profiler/processor/core.py
+++ b/ingestion/src/metadata/profiler/processor/core.py
@@ -33,7 +33,6 @@ from metadata.generated.schema.entity.data.table import (
     TableData,
     TableProfile,
 )
-from metadata.pii.processor import PIIProcessor
 from metadata.profiler.api.models import ProfilerResponse
 from metadata.profiler.interface.profiler_interface import ProfilerInterface
 from metadata.profiler.metrics.core import (
@@ -445,7 +444,6 @@ class Profiler(Generic[TMetric]):
     def process(
         self,
         generate_sample_data: Optional[bool],
-        process_pii_sensitive: Optional[bool],
     ) -> ProfilerResponse:
         """
         Given a table, we will prepare the profiler for
@@ -462,11 +460,6 @@ class Profiler(Generic[TMetric]):
         else:
             sample_data = None
 
-        # If we also have sample data, we'll use the NER Scanner,
-        # otherwise we'll stick to the ColumnNameScanner
-        if process_pii_sensitive:
-            self.process_pii_sensitive(sample_data)
-
         profile = self.get_profile()
         self._check_profile_and_handle(profile)
 
@@ -495,28 +488,6 @@ class Profiler(Generic[TMetric]):
                 logger.warning(f"Error fetching sample data: {err}")
             return None
 
-    def process_pii_sensitive(self, sample_data: TableData) -> None:
-        """Read sample data to find pii sensitive columns and tag them
-        as PII sensitive data
-
-        Args:
-            sample_data (TableData): sample data
-        """
-        try:
-            pii_processor = PIIProcessor(
-                metadata=self.profiler_interface.ometa_client  # type: ignore
-            )
-            pii_processor.process(
-                sample_data,
-                self.profiler_interface.table_entity,  # type: ignore
-                self.profiler_interface.source_config.confidence,
-            )
-        except Exception as exc:
-            logger.warning(
-                f"Unexpected error while processing sample data for auto pii tagging - {exc}"
-            )
-            logger.debug(traceback.format_exc())
-
     def get_profile(self) -> CreateTableProfileRequest:
         """
         After executing the profiler, get all results
diff --git a/ingestion/src/metadata/profiler/processor/processor.py b/ingestion/src/metadata/profiler/processor/processor.py
new file mode 100644
index 00000000000..f29af6ccf6d
--- /dev/null
+++ b/ingestion/src/metadata/profiler/processor/processor.py
@@ -0,0 +1,89 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
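+#
+# NOTE: this step takes over the per-table profiling that previously ran
+# inline in ProfilerWorkflow.run_profiler (removed above together with
+# profiler/api/workflow.py).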
+
+"""
+Profiler Processor Step
+"""
+import traceback
+from typing import cast
+
+from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
+    OpenMetadataConnection,
+)
+from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
+    DatabaseServiceProfilerPipeline,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    OpenMetadataWorkflowConfig,
+)
+from metadata.ingestion.api.models import Either, StackTraceError
+from metadata.ingestion.api.parser import parse_workflow_config_gracefully
+from metadata.ingestion.api.step import Step
+from metadata.ingestion.api.steps import Processor
+from metadata.profiler.api.models import ProfilerProcessorConfig, ProfilerResponse
+from metadata.profiler.processor.core import Profiler
+from metadata.profiler.source.metadata import ProfilerSourceAndEntity
+
+
+class ProfilerProcessor(Processor):
+    """
+    This processor is in charge of getting the profiler source and entity coming from
+    the OpenMetadataSource and computing the metrics.
+    """
+
+    def __init__(self, config: OpenMetadataWorkflowConfig):
+
+        super().__init__()
+
+        self.config = config
+        self.profiler_config = ProfilerProcessorConfig.parse_obj(
+            self.config.processor.dict().get("config")
+        )
+        self.source_config: DatabaseServiceProfilerPipeline = cast(
+            DatabaseServiceProfilerPipeline, self.config.source.sourceConfig.config
+        )  # Used to satisfy the type checker
+
+    def _run(self, record: ProfilerSourceAndEntity) -> Either[ProfilerResponse]:
+
+        profiler_runner: Profiler = record.profiler_source.get_profiler_runner(
+            record.entity, self.profiler_config
+        )
+
+        try:
+            profile: ProfilerResponse = profiler_runner.process(
+                self.source_config.generateSampleData,
+            )
+        except Exception as exc:
+            self.status.failed(
+                StackTraceError(
+                    name=record.entity.fullyQualifiedName.__root__,
+                    error=f"Unexpected exception processing entity {record.entity.fullyQualifiedName.__root__}: {exc}",
+                    stack_trace=traceback.format_exc(),
+                )
+            )
+            self.status.failures.extend(
+                record.profiler_source.interface.status.failures
+            )
+        else:
+            # at this point we know we have an interface variable since the `try` block above didn't raise
+            self.status.failures.extend(record.profiler_source.interface.status.failures)  # type: ignore
+            return Either(right=profile)
+        finally:
+            profiler_runner.close()
+
+        return Either()
+
+    @classmethod
+    def create(cls, config_dict: dict, _: OpenMetadataConnection) -> "Step":
+        config = parse_workflow_config_gracefully(config_dict)
+        return cls(config=config)
+
+    def close(self) -> None:
+        """We are already closing the connections after each execution"""
diff --git a/ingestion/src/metadata/profiler/sink/README.md b/ingestion/src/metadata/profiler/sink/README.md
deleted file mode 100644
index 1ac4ce556d6..00000000000
--- a/ingestion/src/metadata/profiler/sink/README.md
+++ /dev/null
@@ -1,5 +0,0 @@
-# ORM Profiler Sink
-
-Host all the Sinks to push Profile data and validations to.
-
-We will provide OpenMetadata REST Sink by default.
diff --git a/ingestion/src/metadata/profiler/sink/file.py b/ingestion/src/metadata/profiler/sink/file.py
deleted file mode 100644
index ace3dd7fe25..00000000000
--- a/ingestion/src/metadata/profiler/sink/file.py
+++ /dev/null
@@ -1,71 +0,0 @@
-# Copyright 2021 Collate
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Profiler File Sink
-"""
-from pathlib import Path
-
-from metadata.config.common import ConfigModel
-from metadata.ingestion.api.common import Entity
-from metadata.ingestion.api.sink import Sink
-from metadata.profiler.api.models import ProfilerResponse
-from metadata.utils.logger import profiler_logger
-
-logger = profiler_logger()
-
-
-class FileSinkConfig(ConfigModel):
-    filename: str
-
-
-class FileSink(Sink[Entity]):
-    """
-    Helper sink to save profiler
-    results in a file for analysis
-    """
-
-    config: FileSinkConfig
-
-    def __init__(
-        self,
-        config: FileSinkConfig,
-    ):
-        super().__init__()
-        self.config = config
-
-        fpath = Path(self.config.filename)
-
-        # Build the path if it does not exist
-        if not fpath.parent.is_dir():
-            Path(self.config.filename).mkdir(parents=True, exist_ok=True)
-        # pylint: disable=consider-using-with
-        self.file = fpath.open("w", encoding="utf-8")
-        self.wrote_something = False
-
-    @classmethod
-    def create(cls, config_dict: dict, _):
-        config = FileSinkConfig.parse_obj(config_dict)
-        return cls(config)
-
-    def write_record(self, record: ProfilerResponse) -> None:
-        if self.wrote_something:
-            self.file.write("\n")
-
-        self.file.write(f"Profile for: {record.table.fullyQualifiedName.__root__}\n")
-        self.file.write(f"{record.profile.json()}\n")
-
-        self.wrote_something = True
-        self.status.records_written(record.table.fullyQualifiedName.__root__)
-
-    def close(self):
-        self.file.write("\n]")
-        self.file.close()
diff --git a/ingestion/src/metadata/profiler/sink/metadata_rest.py b/ingestion/src/metadata/profiler/sink/metadata_rest.py
deleted file mode 100644
index 0d1e60eebbf..00000000000
--- a/ingestion/src/metadata/profiler/sink/metadata_rest.py
+++ /dev/null
@@ -1,126 +0,0 @@
-# Copyright 2021 Collate
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-OpenMetadata REST Sink implementation for the ORM Profiler results
-"""
-import traceback
-from typing import Optional
-
-from metadata.config.common import ConfigModel
-from metadata.generated.schema.api.data.createTableProfile import (
-    CreateTableProfileRequest,
-)
-from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
-    OpenMetadataConnection,
-)
-from metadata.ingestion.api.common import Entity
-from metadata.ingestion.api.models import StackTraceError
-from metadata.ingestion.api.sink import Sink
-from metadata.ingestion.ometa.client import APIError
-from metadata.ingestion.ometa.ometa_api import OpenMetadata
-from metadata.profiler.api.models import ProfilerResponse
-from metadata.utils.logger import profiler_logger
-
-logger = profiler_logger()
-
-
-class MetadataRestSinkConfig(ConfigModel):
-    api_endpoint: Optional[str] = None
-
-
-class MetadataRestSink(Sink[Entity]):
-    """
-    Metadata Sink sending the profiler
-    and tests results to the OM API
-    """
-
-    config: MetadataRestSinkConfig
-
-    def __init__(
-        self,
-        config: MetadataRestSinkConfig,
-        metadata_config: OpenMetadataConnection,
-    ):
-        super().__init__()
-        self.config = config
-        self.metadata_config = metadata_config
-        self.wrote_something = False
-        self.metadata = OpenMetadata(self.metadata_config)
-
-    @classmethod
-    def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
-        config = MetadataRestSinkConfig.parse_obj(config_dict)
-        return cls(config, metadata_config)
-
-    @staticmethod
-    def clean_up_profile_columns(
-        profile: CreateTableProfileRequest,
-    ) -> CreateTableProfileRequest:
-        """clean up "`" character used for BQ array
-
-        Args:
-            profile (CreateTableProfileRequest): profiler request
-
-        Returns:
-            CreateTableProfileRequest: profiler request modified
-        """
-        column_profile = profile.columnProfile
-        for column in column_profile:
-            column.name = column.name.replace("`", "")
-
-        profile.columnProfile = column_profile
-        return profile
-
-    def close(self) -> None:
-        self.metadata.close()
-
-    def write_record(self, record: ProfilerResponse) -> None:
-        log = f"{type(record.table).__name__} [{record.table.name.__root__}]"
-        try:
-            self.metadata.ingest_profile_data(
-                table=record.table,
-                profile_request=self.clean_up_profile_columns(record.profile),
-            )
-            logger.debug(
-                f"Successfully ingested profile metrics for {record.table.fullyQualifiedName.__root__}"
-            )
-
-            if record.sample_data:
-                self.metadata.ingest_table_sample_data(
-                    table=record.table, sample_data=record.sample_data
-                )
-                logger.debug(
-                    f"Successfully ingested sample data for {record.table.fullyQualifiedName.__root__}"
-                )
-            self.status.records_written(
-                f"Table: {record.table.fullyQualifiedName.__root__}"
-            )
-
-        except APIError as err:
-            name = record.table.fullyQualifiedName.__root__
-            error = f"Failed to sink profiler & test data for {name}: {err}"
-            logger.debug(traceback.format_exc())
-            logger.warning(error)
-            self.status.failed(
-                StackTraceError(
-                    name=name, error=error, stack_trace=traceback.format_exc()
-                )
-            )
-        except Exception as exc:
-            error = f"Failed to ingest {log}: {exc}"
-            logger.debug(traceback.format_exc())
-            logger.warning(error)
-            self.status.failed(
-                StackTraceError(
-                    name=log, error=error, stack_trace=traceback.format_exc()
-                )
-            )
diff --git a/ingestion/src/metadata/profiler/source/metadata.py b/ingestion/src/metadata/profiler/source/metadata.py
new file mode 100644
index 00000000000..edb4655b384
--- /dev/null
+++ b/ingestion/src/metadata/profiler/source/metadata.py
@@ -0,0 +1,257 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+OpenMetadata source for the profiler
+"""
+import traceback
+from typing import Iterable, Optional, cast
+
+from pydantic import BaseModel
+
+from metadata.generated.schema.entity.data.database import Database
+from metadata.generated.schema.entity.data.table import Table, TableType
+from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
+    OpenMetadataConnection,
+)
+from metadata.generated.schema.entity.services.databaseService import DatabaseService
+from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
+    DatabaseServiceProfilerPipeline,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    OpenMetadataWorkflowConfig,
+)
+from metadata.ingestion.api.models import Either, StackTraceError
+from metadata.ingestion.api.parser import parse_workflow_config_gracefully
+from metadata.ingestion.api.step import Step
+from metadata.ingestion.api.steps import Source
+from metadata.ingestion.ometa.client_utils import create_ometa_client
+from metadata.profiler.source.base.profiler_source import ProfilerSource
+from metadata.profiler.source.profiler_source_factory import profiler_source_factory
+from metadata.utils import fqn
+from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
+from metadata.utils.logger import profiler_logger
+
+logger = profiler_logger()
+
+
+class ProfilerSourceAndEntity(BaseModel):
+    """Return class for the OpenMetadata Profiler Source"""
+
+    class Config:
+        arbitrary_types_allowed = True
+        extra = "forbid"
+
+    profiler_source: ProfilerSource
+    entity: Table
+
+
+class OpenMetadataSource(Source):
+    """
+    This source lists and filters the entities that need
+    to be processed by the profiler workflow.
+
+    Note that in order to manage the following steps we need
+    to test the connection against the Database Service Source.
+    We do this here as well.
+    """
+
+    def __init__(
+        self,
+        config: OpenMetadataWorkflowConfig,
+        metadata_config: OpenMetadataConnection,
+    ):
+
+        super().__init__()
+
+        self.config = config
+        self.metadata_config = metadata_config
+        self.metadata = create_ometa_client(self.metadata_config)
+        self.test_connection()
+
+        # Init and type the source config
+        self.source_config: DatabaseServiceProfilerPipeline = cast(
+            DatabaseServiceProfilerPipeline, self.config.source.sourceConfig.config
+        )  # Used to satisfy the type checker
+
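+        # Fail fast if the target service has not been ingested yet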
+ ) + + logger.info( + f"Starting profiler for service {self.config.source.serviceName}" + f":{self.config.source.type.lower()}" + ) + + def _validate_service_name(self): + """Validate service name exists in OpenMetadata""" + return self.metadata.get_by_name( + entity=DatabaseService, fqn=self.config.source.serviceName + ) + + def prepare(self): + """Nothing to prepare""" + + def test_connection(self) -> None: + """ + Our source is the ometa client. Validate the + health check before moving forward + """ + self.metadata.health_check() + + def _iter(self, *_, **__) -> Iterable[Either[ProfilerSourceAndEntity]]: + + for database in self.get_database_entities(): + try: + profiler_source = profiler_source_factory.create( + self.config.source.type.lower(), + self.config, + database, + self.metadata, + ) + for entity in self.get_table_entities(database=database): + yield Either( + right=ProfilerSourceAndEntity( + profiler_source=profiler_source, + entity=entity, + ) + ) + except Exception as exc: + yield Either( + left=StackTraceError( + name=database.fullyQualifiedName.__root__, + error=f"Error listing source and entities for database due to [{exc}]", + stack_trace=traceback.format_exc(), + ) + ) + + @classmethod + def create( + cls, config_dict: dict, metadata_config: OpenMetadataConnection + ) -> "Step": + config = parse_workflow_config_gracefully(config_dict) + return cls(config=config, metadata_config=metadata_config) + + def filter_databases(self, database: Database) -> Optional[Database]: + """Returns filtered database entities""" + if filter_by_database( + self.source_config.databaseFilterPattern, + database.name.__root__, + ): + self.status.filter(database.name.__root__, "Database pattern not allowed") + return None + return database + + def filter_entities(self, tables: Iterable[Table]) -> Iterable[Table]: + """ + From a list of tables, apply the SQLSourceConfig + filter patterns. + + We will update the status on the SQLSource Status. + """ + for table in tables: + try: + if filter_by_schema( + self.source_config.schemaFilterPattern, + table.databaseSchema.name, # type: ignore + ): + self.status.filter( + f"Schema pattern not allowed: {table.fullyQualifiedName.__root__}", + "Schema pattern not allowed", + ) + continue + if filter_by_table( + self.source_config.tableFilterPattern, + table.name.__root__, + ): + self.status.filter( + f"Table pattern not allowed: {table.fullyQualifiedName.__root__}", + "Table pattern not allowed", + ) + continue + if ( + table.tableType == TableType.View + and not self.source_config.includeViews + ): + self.status.filter( + table.fullyQualifiedName.__root__, + "View filtered out", + ) + continue + yield table + except Exception as exc: + self.status.failed( + StackTraceError( + name=table.fullyQualifiedName.__root__, + error=f"Unexpected error filtering entities for table [{table}]: {exc}", + stack_trace=traceback.format_exc(), + ) + ) + + def get_database_entities(self): + """List all databases in service""" + + databases = [ + self.filter_databases(database) + for database in self.metadata.list_all_entities( + entity=Database, + params={"service": self.config.source.serviceName}, + ) + if self.filter_databases(database) + ] + + if not databases: + raise ValueError( + "databaseFilterPattern returned 0 result. At least 1 database must be returned by the filter pattern." 
+ f"\n\t- includes: {self.source_config.databaseFilterPattern.includes if self.source_config.databaseFilterPattern else None}" # pylint: disable=line-too-long + f"\n\t- excludes: {self.source_config.databaseFilterPattern.excludes if self.source_config.databaseFilterPattern else None}" # pylint: disable=line-too-long + ) + + return databases + + def get_table_entities(self, database): + """ + List and filter OpenMetadata tables based on the + source configuration. + + The listing will be based on the entities from the + informed service name in the source configuration. + + Note that users can specify `table_filter_pattern` to + either be `includes` or `excludes`. This means + that we will either what is specified in `includes` + or we will use everything but the tables excluded. + + Same with `schema_filter_pattern`. + """ + tables = self.metadata.list_all_entities( + entity=Table, + fields=[ + "tableProfilerConfig", + ], + params={ + "service": self.config.source.serviceName, + "database": fqn.build( + self.metadata, + entity_type=Database, + service_name=self.config.source.serviceName, + database_name=database.name.__root__, + ), + }, # type: ignore + ) + + yield from self.filter_entities(tables) + + def close(self) -> None: + """Nothing to close""" diff --git a/ingestion/src/metadata/workflow/base.py b/ingestion/src/metadata/workflow/base.py index 9c57e21a340..b553c1150d7 100644 --- a/ingestion/src/metadata/workflow/base.py +++ b/ingestion/src/metadata/workflow/base.py @@ -100,7 +100,7 @@ class BaseWorkflow(ABC, WorkflowStatusMixin): self.set_ingestion_pipeline_status(state=PipelineState.running) - # Pick up the service connection from the API if neededweb_analytic_report_data_processor + # Pick up the service connection from the API if needed self._retrieve_service_connection_if_needed(self.service_type) # Informs the `source` and the rest of `steps` to execute diff --git a/ingestion/src/metadata/workflow/profiler.py b/ingestion/src/metadata/workflow/profiler.py new file mode 100644 index 00000000000..344384d8bc5 --- /dev/null +++ b/ingestion/src/metadata/workflow/profiler.py @@ -0,0 +1,72 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Workflow definition for the profiler +""" +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.ingestion.api.steps import Processor, Sink +from metadata.ingestion.source.connections import get_connection, get_test_connection_fn +from metadata.pii.processor import PIIProcessor +from metadata.profiler.processor.processor import ProfilerProcessor +from metadata.profiler.source.metadata import OpenMetadataSource +from metadata.utils.importer import import_sink_class +from metadata.utils.logger import profiler_logger +from metadata.workflow.base import BaseWorkflow + +logger = profiler_logger() + + +class ProfilerWorkflow(BaseWorkflow): + """ + Profiler ingestion workflow implementation + + We check the source connection test when initializing + this workflow. 
If it does not pass, the workflow raises an error right away. + """ + + def __init__(self, config: OpenMetadataWorkflowConfig): + super().__init__(config) + + self.test_connection() + + def set_steps(self): + self.source = OpenMetadataSource.create( + self.config.dict(), self.metadata_config + ) + + profiler_processor = self._get_profiler_processor() + pii_processor = self._get_pii_processor() + sink = self._get_sink() + self.steps = (profiler_processor, pii_processor, sink) + + def test_connection(self): + service_config = self.config.source.serviceConnection.__root__.config + conn = get_connection(service_config) + + test_connection_fn = get_test_connection_fn(service_config) + test_connection_fn(self.metadata, conn, service_config) + + def _get_sink(self) -> Sink: + sink_type = self.config.sink.type + sink_class = import_sink_class(sink_type=sink_type) + sink_config = self.config.sink.dict().get("config", {}) + sink: Sink = sink_class.create(sink_config, self.metadata_config) + logger.debug(f"Sink type: {self.config.sink.type}, {sink_class} configured") + + return sink + + def _get_profiler_processor(self) -> Processor: + return ProfilerProcessor.create(self.config.dict(), self.metadata_config) + + def _get_pii_processor(self) -> Processor: + return PIIProcessor.create(self.config.dict(), self.metadata_config) diff --git a/ingestion/src/metadata/workflow/workflow_output_handler.py b/ingestion/src/metadata/workflow/workflow_output_handler.py index f903e5103e6..7708575b5eb 100644 --- a/ingestion/src/metadata/workflow/workflow_output_handler.py +++ b/ingestion/src/metadata/workflow/workflow_output_handler.py @@ -230,41 +230,6 @@ def print_status(workflow: "BaseWorkflow") -> None: ) -def print_profiler_status(workflow) -> None: - """ - Print the profiler workflow results - """ - print_workflow_summary_legacy( - workflow, - source=True, - processor=True, - source_status=workflow.source_status, - ) - - if workflow.source_status.source_start_time: - log_ansi_encoded_string( - color=ANSI.BRIGHT_CYAN, - bold=True, - message="Workflow finished in time: " - f"{pretty_print_time_duration(time.time()-workflow.source_status.source_start_time)}", - ) - - if workflow.result_status() == 1: - log_ansi_encoded_string( - color=ANSI.BRIGHT_RED, bold=True, message=WORKFLOW_FAILURE_MESSAGE - ) - elif workflow.source_status.warnings or ( - hasattr(workflow, "sink") and workflow.sink.get_status().warnings - ): - log_ansi_encoded_string( - color=ANSI.YELLOW, bold=True, message=WORKFLOW_WARNING_MESSAGE - ) - else: - log_ansi_encoded_string( - color=ANSI.GREEN, bold=True, message=WORKFLOW_SUCCESS_MESSAGE - ) - - def print_test_suite_status(workflow) -> None: """ Print the test suite workflow results diff --git a/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py b/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py index 8b0d643213f..99e08b31f62 100644 --- a/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py +++ b/ingestion/tests/integration/orm_profiler/system/test_bigquery_system_metrics.py @@ -55,12 +55,12 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata OpenMetadataConnection, ) from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.profiler.api.workflow import ProfilerWorkflow from metadata.utils.time_utils import ( get_beginning_of_day_timestamp_mill, get_end_of_day_timestamp_mill, ) from metadata.workflow.metadata import MetadataWorkflow +from metadata.workflow.profiler import
ProfilerWorkflow from metadata.workflow.workflow_output_handler import print_status TESTS_ROOT_DIR = pathlib.Path(__file__).parent.parent.parent.parent @@ -166,7 +166,7 @@ class TestBigquerySystem(TestCase): profiler_workflow = ProfilerWorkflow.create(config) profiler_workflow.execute() profiler_workflow.raise_from_status() - profiler_workflow.print_status() + print_status(profiler_workflow) profiler_workflow.stop() # get latest profile metrics diff --git a/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py b/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py index 7303f08bc6f..95e743cc37f 100644 --- a/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py +++ b/ingestion/tests/integration/orm_profiler/system/test_redshift_system_metrics.py @@ -41,12 +41,12 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata OpenMetadataConnection, ) from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.profiler.api.workflow import ProfilerWorkflow from metadata.utils.time_utils import ( get_beginning_of_day_timestamp_mill, get_end_of_day_timestamp_mill, ) from metadata.workflow.metadata import MetadataWorkflow +from metadata.workflow.profiler import ProfilerWorkflow from metadata.workflow.workflow_output_handler import print_status TESTS_ROOT_DIR = pathlib.Path(__file__).parent.parent.parent.parent @@ -133,7 +133,7 @@ class TestRedshiftSystem(TestCase): profiler_workflow = ProfilerWorkflow.create(config) profiler_workflow.execute() profiler_workflow.raise_from_status() - profiler_workflow.print_status() + print_status(profiler_workflow) profiler_workflow.stop() # get latest profile metrics diff --git a/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py b/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py index e34848df900..278475156e6 100644 --- a/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py +++ b/ingestion/tests/integration/orm_profiler/system/test_snowflake_system_metrics.py @@ -60,12 +60,12 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata OpenMetadataConnection, ) from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.profiler.api.workflow import ProfilerWorkflow from metadata.utils.time_utils import ( get_beginning_of_day_timestamp_mill, get_end_of_day_timestamp_mill, ) from metadata.workflow.metadata import MetadataWorkflow +from metadata.workflow.profiler import ProfilerWorkflow from metadata.workflow.workflow_output_handler import print_status TESTS_ROOT_DIR = pathlib.Path(__file__).parent.parent.parent.parent @@ -157,7 +157,7 @@ class TestSnowflakeystem(TestCase): profiler_workflow = ProfilerWorkflow.create(config) profiler_workflow.execute() profiler_workflow.raise_from_status() - profiler_workflow.print_status() + print_status(profiler_workflow) profiler_workflow.stop() # get latest profile metrics diff --git a/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py b/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py index 956c39a4533..83a5257637c 100644 --- a/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py +++ b/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py @@ -34,12 +34,12 @@ from metadata.generated.schema.security.client.openMetadataJWTClientConfig impor OpenMetadataJWTClientConfig, ) from metadata.ingestion.ometa.ometa_api 
import OpenMetadata -from metadata.profiler.api.workflow import ProfilerWorkflow from metadata.utils.time_utils import ( get_beginning_of_day_timestamp_mill, get_end_of_day_timestamp_mill, ) from metadata.workflow.metadata import MetadataWorkflow +from metadata.workflow.profiler import ProfilerWorkflow from metadata.workflow.workflow_output_handler import print_status BUCKET_NAME = "MyBucket" diff --git a/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py b/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py index 558afaeb18f..7514fc48185 100644 --- a/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py +++ b/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py @@ -21,7 +21,6 @@ from copy import deepcopy from datetime import datetime, timedelta from unittest import TestCase -import pytest from sqlalchemy import Column, DateTime, Integer, String, create_engine from sqlalchemy.orm import declarative_base @@ -35,8 +34,8 @@ from metadata.generated.schema.security.client.openMetadataJWTClientConfig impor ) from metadata.ingestion.connections.session import create_and_bind_session from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.profiler.api.workflow import ProfilerWorkflow from metadata.workflow.metadata import MetadataWorkflow +from metadata.workflow.profiler import ProfilerWorkflow from metadata.workflow.workflow_output_handler import print_status logging.basicConfig(level=logging.WARN) @@ -303,7 +302,7 @@ class ProfilerWorkflowTest(TestCase): profiler_workflow = ProfilerWorkflow.create(workflow_config) profiler_workflow.execute() - profiler_workflow.print_status() + print_status(profiler_workflow) profiler_workflow.stop() table = self.metadata.get_by_name( @@ -349,7 +348,7 @@ class ProfilerWorkflowTest(TestCase): profiler_workflow = ProfilerWorkflow.create(workflow_config) profiler_workflow.execute() - profiler_workflow.print_status() + print_status(profiler_workflow) profiler_workflow.stop() table = self.metadata.get_by_name( @@ -388,7 +387,7 @@ class ProfilerWorkflowTest(TestCase): profiler_workflow = ProfilerWorkflow.create(workflow_config) profiler_workflow.execute() - profiler_workflow.print_status() + print_status(profiler_workflow) profiler_workflow.stop() table = self.metadata.get_by_name( @@ -436,7 +435,7 @@ class ProfilerWorkflowTest(TestCase): profiler_workflow = ProfilerWorkflow.create(workflow_config) profiler_workflow.execute() - profiler_workflow.print_status() + print_status(profiler_workflow) profiler_workflow.stop() table = self.metadata.get_by_name( @@ -476,7 +475,7 @@ class ProfilerWorkflowTest(TestCase): profiler_workflow = ProfilerWorkflow.create(workflow_config) profiler_workflow.execute() - profiler_workflow.print_status() + print_status(profiler_workflow) profiler_workflow.stop() table = self.metadata.get_by_name( @@ -523,7 +522,7 @@ class ProfilerWorkflowTest(TestCase): profiler_workflow = ProfilerWorkflow.create(workflow_config) profiler_workflow.execute() - profiler_workflow.print_status() + print_status(profiler_workflow) profiler_workflow.stop() table = self.metadata.get_by_name( @@ -563,7 +562,7 @@ class ProfilerWorkflowTest(TestCase): profiler_workflow = ProfilerWorkflow.create(workflow_config) profiler_workflow.execute() - profiler_workflow.print_status() + print_status(profiler_workflow) profiler_workflow.stop() table = self.metadata.get_by_name( diff --git a/ingestion/tests/integration/orm_profiler/test_pii_processor.py 
b/ingestion/tests/integration/orm_profiler/test_pii_processor.py index 5067e158594..48396eb0a82 100644 --- a/ingestion/tests/integration/orm_profiler/test_pii_processor.py +++ b/ingestion/tests/integration/orm_profiler/test_pii_processor.py @@ -20,6 +20,9 @@ from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) from metadata.generated.schema.api.data.createTable import CreateTableRequest +from metadata.generated.schema.api.data.createTableProfile import ( + CreateTableProfileRequest, +) from metadata.generated.schema.api.services.createDatabaseService import ( CreateDatabaseServiceRequest, ) @@ -29,6 +32,7 @@ from metadata.generated.schema.entity.data.table import ( DataType, Table, TableData, + TableProfile, ) from metadata.generated.schema.entity.services.connections.database.common.basicAuth import ( BasicAuth, @@ -44,12 +48,23 @@ from metadata.generated.schema.entity.services.databaseService import ( DatabaseService, DatabaseServiceType, ) +from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import ( + DatabaseServiceProfilerPipeline, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, + Source, + SourceConfig, + WorkflowConfig, +) from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( OpenMetadataJWTClientConfig, ) +from metadata.generated.schema.type.basic import Timestamp from metadata.generated.schema.type.tagLabel import TagFQN, TagLabel from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.pii.processor import PIIProcessor +from metadata.profiler.api.models import PatchColumnTagResponse, ProfilerResponse table_data = TableData( columns=[ @@ -100,158 +115,50 @@ table_data = TableData( ) -EXPECTED_TABLE_ENTITY = [ - Column( - name=ColumnName(__root__="customer_id"), - displayName=None, - dataType="INT", - arrayDataType=None, - dataLength=None, - precision=None, - scale=None, - dataTypeDisplay="int", - description=None, - fullyQualifiedName="test-service-table-patch.test-db.test-schema.customers.customer_id", - tags=[], - constraint=None, - ordinalPosition=None, - jsonSchema=None, - children=None, - customMetrics=None, - profile=None, +EXPECTED_COLUMN_TAGS = [ + PatchColumnTagResponse( + column_fqn="test-service-table-patch.test-db.test-schema.customers.first_name", + tag_label=TagLabel( + tagFQN=TagFQN(__root__="PII.Sensitive"), + description=( + "PII which if lost, compromised, or disclosed without authorization, could result in" + " substantial harm, embarrassment, inconvenience, or unfairness to an individual." + ), + source="Classification", + labelType="Automated", + state="Suggested", + href=None, + ), ), - Column( - name=ColumnName(__root__="first_name"), - displayName=None, - dataType="VARCHAR", - arrayDataType=None, - dataLength=20, - precision=None, - scale=None, - dataTypeDisplay="varchar", - description=None, - fullyQualifiedName="test-service-table-patch.test-db.test-schema.customers.first_name", - tags=[ - TagLabel( - tagFQN=TagFQN(__root__="PII.Sensitive"), - description=( + PatchColumnTagResponse( + column_fqn="test-service-table-patch.test-db.test-schema.customers.first_order", + tag_label=TagLabel( + tagFQN=TagFQN(__root__="PII.NonSensitive"), + description=( + "PII which is easily accessible from public sources and can include zip code, " + "race, gender, and date of birth." 
+ ), + source="Classification", + labelType="Automated", + state="Suggested", + href=None, + ), + ), + PatchColumnTagResponse( + column_fqn="test-service-table-patch.test-db.test-schema.customers.customer_email", + tag_label=TagLabel( + tagFQN=TagFQN(__root__="PII.Sensitive"), + description=( + ( "PII which if lost, compromised, or disclosed without authorization, could result in" " substantial harm, embarrassment, inconvenience, or unfairness to an individual." - ), - source="Classification", - labelType="Automated", - state="Suggested", - href=None, - ) - ], - constraint=None, - ordinalPosition=None, - jsonSchema=None, - children=None, - customMetrics=None, - profile=None, - ), - Column( - name=ColumnName(__root__="last_name"), - displayName=None, - dataType="VARCHAR", - arrayDataType=None, - dataLength=20, - precision=None, - scale=None, - dataTypeDisplay="varchar", - description=None, - fullyQualifiedName="test-service-table-patch.test-db.test-schema.customers.last_name", - tags=[], - constraint=None, - ordinalPosition=None, - jsonSchema=None, - children=None, - customMetrics=None, - profile=None, - ), - Column( - name=ColumnName(__root__="first_order"), - displayName=None, - dataType="DATE", - arrayDataType=None, - dataLength=None, - precision=None, - scale=None, - dataTypeDisplay="date", - description=None, - fullyQualifiedName="test-service-table-patch.test-db.test-schema.customers.first_order", - tags=[ - TagLabel( - tagFQN=TagFQN(__root__="PII.NonSensitive"), - description=( - "PII which is easily accessible from public sources and can include zip code, " - "race, gender, and date of birth." - ), - source="Classification", - labelType="Automated", - state="Suggested", - href=None, - ) - ], - constraint=None, - ordinalPosition=None, - jsonSchema=None, - children=None, - customMetrics=None, - profile=None, - ), - Column( - name=ColumnName(__root__="customer_email"), - displayName=None, - dataType="VARCHAR", - arrayDataType=None, - dataLength=20, - precision=None, - scale=None, - dataTypeDisplay="date", - description=None, - fullyQualifiedName="test-service-table-patch.test-db.test-schema.customers.customer_email", - tags=[ - TagLabel( - tagFQN=TagFQN(__root__="PII.Sensitive"), - description=( - ( - "PII which if lost, compromised, or disclosed without authorization, could result in" - " substantial harm, embarrassment, inconvenience, or unfairness to an individual." 
- ) - ), - source="Classification", - labelType="Automated", - state="Suggested", - href=None, - ) - ], - constraint=None, - ordinalPosition=None, - jsonSchema=None, - children=None, - customMetrics=None, - profile=None, - ), - Column( - name=ColumnName(__root__="number_of_orders"), - displayName=None, - dataType="BIGINT", - arrayDataType=None, - dataLength=None, - precision=None, - scale=None, - dataTypeDisplay="bigint", - description=None, - fullyQualifiedName="test-service-table-patch.test-db.test-schema.customers.number_of_orders", - tags=[], - constraint=None, - ordinalPosition=None, - jsonSchema=None, - children=None, - customMetrics=None, - profile=None, + ) + ), + source="Classification", + labelType="Automated", + state="Suggested", + href=None, + ), ), ] @@ -275,8 +182,91 @@ class PiiProcessorTest(TestCase): "5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" ), ) + + workflow_config = OpenMetadataWorkflowConfig( + source=Source( + type="mysql", + serviceName="test", + sourceConfig=SourceConfig( + config=DatabaseServiceProfilerPipeline( + confidence=85, + processPiiSensitive=True, + ) + ), + ), + workflowConfig=WorkflowConfig(openMetadataServerConfig=server_config), + ) + metadata = OpenMetadata(server_config) - ner_scanner_processor = PIIProcessor(metadata) + ner_scanner_processor = PIIProcessor( + config=workflow_config, metadata_config=server_config + ) + + @classmethod + def setUpClass(cls) -> None: + """ + Prepare ingredients + """ + service = CreateDatabaseServiceRequest( + name="test-service-table-patch", + serviceType=DatabaseServiceType.Mysql, + connection=DatabaseConnection( + config=MysqlConnection( + username="username", + authType=BasicAuth( + password="password", + ), + hostPort="http://localhost:1234", + ) + ), + ) + service_entity = cls.metadata.create_or_update(data=service) + + create_db = CreateDatabaseRequest( + name="test-db", + service=service_entity.fullyQualifiedName, + ) + + create_db_entity = cls.metadata.create_or_update(data=create_db) + + create_schema = CreateDatabaseSchemaRequest( + name="test-schema", + database=create_db_entity.fullyQualifiedName, + ) + + create_schema_entity = cls.metadata.create_or_update(data=create_schema) + + created_table = CreateTableRequest( + name="customers", + columns=[ + Column(name="customer_id", dataType=DataType.INT), + Column(name="first_name", dataType=DataType.VARCHAR, dataLength=20), + Column(name="last_name", dataType=DataType.VARCHAR, dataLength=20), + Column(name="first_order", dataType=DataType.DATE), + Column(name="customer_email", dataType=DataType.VARCHAR, dataLength=20), + Column(name="number_of_orders", dataType=DataType.BIGINT), + ], + databaseSchema=create_schema_entity.fullyQualifiedName, + ) + cls.table_entity = cls.metadata.create_or_update(data=created_table) + + @classmethod + def tearDownClass(cls) -> None: + """ + Clean up + """ + service_id = str( + cls.metadata.get_by_name( + entity=DatabaseService, fqn="test-service-table-patch" + ).id.__root__ + ) + + cls.metadata.delete( + entity=DatabaseService, + entity_id=service_id, + recursive=True, + hard_delete=True, + ) @classmethod def setUpClass(cls) -> None: @@ -349,15 +339,22 @@ class PiiProcessorTest(TestCase): test function for ner Scanner """ - self.ner_scanner_processor.process( - table_data=table_data, - table_entity=self.table_entity, - confidence_threshold=85, - ) - updated_table_entity = self.metadata.get_by_id( - entity=Table, entity_id=self.table_entity.id, fields=["tags"] + record = ProfilerResponse( + 
table=self.table_entity, + profile=CreateTableProfileRequest( + tableProfile=TableProfile( + timestamp=Timestamp( + __root__=int(datetime.datetime.now().timestamp() * 1000) + ) + ) + ), + sample_data=table_data, ) + + updated_record: ProfilerResponse = self.ner_scanner_processor.run(record) + for _, (expected, original) in enumerate( - zip(EXPECTED_TABLE_ENTITY, updated_table_entity.columns) + zip(EXPECTED_COLUMN_TAGS, updated_record.column_tags) ): - self.assertEqual(expected.tags, original.tags) + self.assertEqual(expected.column_fqn, original.column_fqn) + self.assertEqual(expected.tag_label.tagFQN, original.tag_label.tagFQN) diff --git a/ingestion/tests/unit/pii/test_column_name_scanner.py b/ingestion/tests/unit/pii/test_column_name_scanner.py index f870b2d5173..470076636d2 100644 --- a/ingestion/tests/unit/pii/test_column_name_scanner.py +++ b/ingestion/tests/unit/pii/test_column_name_scanner.py @@ -13,8 +13,8 @@ Test Column Name Scanner """ from unittest import TestCase -from metadata.pii.column_name_scanner import ColumnNameScanner from metadata.pii.models import TagAndConfidence, TagType +from metadata.pii.scanners.column_name_scanner import ColumnNameScanner EXPECTED_SENSITIVE = TagAndConfidence( tag=TagType.SENSITIVE, diff --git a/ingestion/tests/unit/pii/test_ner_scanner.py b/ingestion/tests/unit/pii/test_ner_scanner.py index 5e31cfba8c9..d9f491d42a8 100644 --- a/ingestion/tests/unit/pii/test_ner_scanner.py +++ b/ingestion/tests/unit/pii/test_ner_scanner.py @@ -14,7 +14,7 @@ Test Column Name Scanner from unittest import TestCase from metadata.pii.models import TagType -from metadata.pii.ner_scanner import NERScanner +from metadata.pii.scanners.ner_scanner import NERScanner class NERScannerTest(TestCase): diff --git a/ingestion/tests/unit/profiler/test_profiler_partitions.py b/ingestion/tests/unit/profiler/test_profiler_partitions.py index 84fd947cac1..1cdd69bb7d4 100644 --- a/ingestion/tests/unit/profiler/test_profiler_partitions.py +++ b/ingestion/tests/unit/profiler/test_profiler_partitions.py @@ -30,8 +30,8 @@ from metadata.generated.schema.entity.services.databaseService import ( DatabaseServiceType, ) from metadata.generated.schema.type.entityReference import EntityReference -from metadata.profiler.api.workflow import ProfilerWorkflow from metadata.profiler.interface.profiler_interface import ProfilerInterface +from metadata.workflow.profiler import ProfilerWorkflow """ Check Partitioned Table in Profiler Workflow @@ -117,7 +117,9 @@ MOCK_RANGE_PARTITIONING = RangePartitioning( class ProfilerPartitionUnitTest(TestCase): - @patch("metadata.profiler.api.workflow.ProfilerWorkflow._validate_service_name") + @patch( + "metadata.profiler.source.metadata.OpenMetadataSource._validate_service_name" + ) @patch("google.auth.default") @patch("sqlalchemy.engine.base.Engine.connect") @patch("sqlalchemy_bigquery._helpers.create_bigquery_client") diff --git a/ingestion/tests/unit/profiler/test_workflow.py b/ingestion/tests/unit/profiler/test_workflow.py index a4a837371d0..f03cd07d93a 100644 --- a/ingestion/tests/unit/profiler/test_workflow.py +++ b/ingestion/tests/unit/profiler/test_workflow.py @@ -38,12 +38,13 @@ from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline ) from metadata.generated.schema.type.entityReference import EntityReference from metadata.profiler.api.models import ProfilerProcessorConfig -from metadata.profiler.api.workflow import ProfilerWorkflow from metadata.profiler.interface.sqlalchemy.profiler_interface import ( SQAProfilerInterface, ) 
from metadata.profiler.processor.default import DefaultProfiler from metadata.profiler.source.base.profiler_source import ProfilerSource +from metadata.profiler.source.metadata import OpenMetadataSource +from metadata.workflow.profiler import ProfilerWorkflow TABLE = Table( id=uuid.uuid4(), @@ -109,7 +110,7 @@ class User(Base): return_value=User, ) @patch.object( - ProfilerWorkflow, + OpenMetadataSource, "_validate_service_name", return_value=True, ) @@ -120,16 +121,18 @@ def test_init_workflow(mocked_method, mocked_orm): # pylint: disable=unused-arg workflow = ProfilerWorkflow.create(config) mocked_method.assert_called() - assert isinstance(workflow.source_config, DatabaseServiceProfilerPipeline) + assert isinstance(workflow.source.source_config, DatabaseServiceProfilerPipeline) assert isinstance(workflow.metadata_config, OpenMetadataConnection) - assert isinstance(workflow.profiler_config, ProfilerProcessorConfig) - assert workflow.profiler_config.profiler is None - assert workflow.profiler_config.tableConfig is None + profiler_processor_step = workflow.steps[0] + assert isinstance(profiler_processor_step.profiler_config, ProfilerProcessorConfig) + + assert profiler_processor_step.profiler_config.profiler is None + assert profiler_processor_step.profiler_config.tableConfig is None @patch.object( - ProfilerWorkflow, + OpenMetadataSource, "_validate_service_name", return_value=True, ) @@ -174,7 +177,7 @@ def test_filter_entities(mocked_method): ] # Simple workflow does not filter - assert len(list(workflow.filter_entities(all_tables))) == 3 + assert len(list(workflow.source.filter_entities(all_tables))) == 3 # We can exclude based on the schema name exclude_config = deepcopy(config) @@ -184,7 +187,7 @@ def test_filter_entities(mocked_method): exclude_workflow = ProfilerWorkflow.create(exclude_config) mocked_method.assert_called() - assert len(list(exclude_workflow.filter_entities(all_tables))) == 2 + assert len(list(exclude_workflow.source.filter_entities(all_tables))) == 2 exclude_config = deepcopy(config) exclude_config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = { @@ -193,7 +196,7 @@ def test_filter_entities(mocked_method): exclude_workflow = ProfilerWorkflow.create(exclude_config) mocked_method.assert_called() - assert len(list(exclude_workflow.filter_entities(all_tables))) == 2 + assert len(list(exclude_workflow.source.filter_entities(all_tables))) == 2 include_config = deepcopy(config) include_config["source"]["sourceConfig"]["config"]["databaseFilterPattern"] = { @@ -202,7 +205,7 @@ def test_filter_entities(mocked_method): include_workflow = ProfilerWorkflow.create(include_config) mocked_method.assert_called() - assert len(list(include_workflow.filter_entities(all_tables))) == 3 + assert len(list(include_workflow.source.filter_entities(all_tables))) == 3 @patch.object( @@ -211,7 +214,7 @@ def test_filter_entities(mocked_method): return_value=User, ) @patch.object( - ProfilerWorkflow, + OpenMetadataSource, "_validate_service_name", return_value=True, ) @@ -230,6 +233,8 @@ def test_profile_def(mocked_method, mocked_orm): # pylint: disable=unused-argum profile_workflow = ProfilerWorkflow.create(profile_config) mocked_method.assert_called() + profiler_processor_step = profile_workflow.steps[0] + profiler_source = ProfilerSource( profile_workflow.config, DatabaseService( @@ -240,13 +245,13 @@ def test_profile_def(mocked_method, mocked_orm): # pylint: disable=unused-argum profile_workflow.metadata, ) profiler_runner = profiler_source.get_profiler_runner( - TABLE, 
profile_workflow.profiler_config + TABLE, profiler_processor_step.profiler_config ) # profile_workflow.create_profiler(TABLE, profiler_interface) profiler_obj_metrics = [metric.name() for metric in profiler_runner.metrics] - assert profile_workflow.profiler_config.profiler + assert profiler_processor_step.profiler_config.profiler assert config_metrics_label == profiler_obj_metrics @@ -256,7 +261,7 @@ def test_profile_def(mocked_method, mocked_orm): # pylint: disable=unused-argum return_value=User, ) @patch.object( - ProfilerWorkflow, + OpenMetadataSource, "_validate_service_name", return_value=True, ) @@ -271,6 +276,8 @@ def test_default_profile_def( profile_workflow = ProfilerWorkflow.create(config) mocked_method.assert_called() + profiler_processor_step = profile_workflow.steps[0] + profiler_source = ProfilerSource( profile_workflow.config, DatabaseService( @@ -281,7 +288,7 @@ def test_default_profile_def( profile_workflow.metadata, ) profiler_runner = profiler_source.get_profiler_runner( - TABLE, profile_workflow.profiler_config + TABLE, profiler_processor_step.profiler_config ) assert isinstance( diff --git a/ingestion/tests/unit/test_importer.py b/ingestion/tests/unit/test_importer.py index b5d5144efea..522d8aa5a73 100644 --- a/ingestion/tests/unit/test_importer.py +++ b/ingestion/tests/unit/test_importer.py @@ -114,14 +114,6 @@ class ImporterTest(TestCase): MetadataUsageBulkSink, ) - def test_import_sink_from(self) -> None: - from metadata.profiler.sink.metadata_rest import MetadataRestSink - - self.assertEqual( - import_sink_class(sink_type="metadata-rest", from_="profiler"), - MetadataRestSink, - ) - def test_import_get_connection(self) -> None: connection = MysqlConnection( username="name", diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py index afdd49903df..e9651cf627c 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py @@ -28,8 +28,8 @@ from metadata.generated.schema.metadataIngestion.workflow import ( WorkflowConfig, ) from metadata.ingestion.models.encoders import show_secrets_encoder -from metadata.profiler.api.workflow import ProfilerWorkflow -from metadata.workflow.workflow_output_handler import print_profiler_status +from metadata.workflow.profiler import ProfilerWorkflow +from metadata.workflow.workflow_output_handler import print_status def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig): @@ -49,7 +49,7 @@ def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig): workflow.execute() workflow.raise_from_status() - print_profiler_status(workflow) + print_status(workflow) workflow.stop() diff --git a/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py b/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py index 4407cdd3498..4fe0e10d390 100644 --- a/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py +++ b/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py @@ -71,8 +71,8 @@ from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.parser import parse_workflow_config_gracefully from metadata.ingestion.models.encoders import show_secrets_encoder from metadata.ingestion.ometa.ometa_api import OpenMetadata -from 
metadata.profiler.api.workflow import ProfilerWorkflow from metadata.workflow.metadata import MetadataWorkflow +from metadata.workflow.profiler import ProfilerWorkflow def mock_set_ingestion_pipeline_status(self, state):
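
With `print_profiler_status` removed, every caller touched by this diff follows the same driver pattern: build the workflow from `metadata.workflow.profiler`, execute it, and report through the shared `print_status` helper. A minimal sketch of that pattern, assuming a valid profiler workflow YAML on disk (the file name is a placeholder):

```python
# Minimal driver sketch for the refactored profiler workflow. It mirrors the
# call sequence used by the updated tests and the Airflow profiler task in
# this diff; the YAML path is a placeholder for any valid profiler config.
from pathlib import Path

from metadata.config.common import load_config_file
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import print_status

config_dict = load_config_file(Path("profiler-workflow.yaml"))  # placeholder path

workflow = ProfilerWorkflow.create(config_dict)
workflow.execute()
workflow.raise_from_status()  # raise if any step recorded failures
print_status(workflow)        # shared printer, replacing print_profiler_status
workflow.stop()
```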
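The includes/excludes semantics that `OpenMetadataSource.get_table_entities` and `filter_entities` apply are easiest to see on a config fragment. A hypothetical profiler `sourceConfig`, shaped like the dicts the unit tests above manipulate (all names and patterns are invented for illustration):

```python
# Hypothetical sourceConfig fragment showing the filter semantics applied by
# OpenMetadataSource: `includes` keeps only matching names, `excludes` keeps
# everything except matches. All values below are examples.
source_config = {
    "type": "Profiler",
    # At least one database must survive this pattern, otherwise
    # get_database_entities raises a ValueError listing includes/excludes.
    "databaseFilterPattern": {"includes": ["prod_.*"]},
    "schemaFilterPattern": {"excludes": ["information_schema"]},
    "tableFilterPattern": {"includes": ["customers", "orders_.*"]},
    "includeViews": False,  # views are filtered out unless this is True
}
```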
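The `Either` shape visible in `OpenMetadataSource._iter` is the error-handling convention behind this refactor: a step yields its payload on `right` when a record succeeds and a `StackTraceError` on `left` when it fails, so one bad record never aborts the whole run. A self-contained toy sketch of that convention (the `to_upper` helper and its records are invented for the example):

```python
# Toy sketch of the Either convention used by the refactored steps: yield
# right=<payload> on success and left=StackTraceError(...) on failure, and
# keep iterating. `to_upper` is an invented stand-in for real processing.
import traceback
from typing import Iterable

from metadata.ingestion.api.models import Either, StackTraceError


def to_upper(record: str) -> str:
    if not record:
        raise ValueError("empty record")
    return record.upper()


def iter_records(records: Iterable[str]) -> Iterable[Either]:
    for record in records:
        try:
            yield Either(right=to_upper(record))
        except Exception as exc:
            yield Either(
                left=StackTraceError(
                    name=record,
                    error=f"Error processing record [{record}]: {exc}",
                    stack_trace=traceback.format_exc(),
                )
            )
```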