Part of #11968 - Restructure Profiler Workflow and PII Processor (#13059)

* Structure PII

* Restructure Profiler Workflow

* Update signature for abc

* remove profiler sink

* Fix tests

* Fix lint

* Fix test

* Fix test
Pere Miquel Brull 2023-09-04 11:02:57 +02:00 committed by GitHub
parent 2e608987f7
commit a3bfd4e696
39 changed files with 901 additions and 1064 deletions

View File

@@ -21,9 +21,9 @@ from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipel
PipelineType,
)
from metadata.generated.schema.metadataIngestion.workflow import LogLevels
from metadata.profiler.api.workflow import ProfilerWorkflow
from metadata.utils.logger import set_loggers_level
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.usage import UsageWorkflow
from metadata.workflow.workflow_output_handler import print_status

View File

@@ -17,12 +17,12 @@ import sys
import traceback
from metadata.config.common import load_config_file
from metadata.profiler.api.workflow import ProfilerWorkflow
from metadata.utils.logger import cli_logger
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import (
WorkflowType,
print_init_error,
print_profiler_status,
print_status,
)
logger = cli_logger()
@@ -48,5 +48,5 @@ def run_profiler(config_path: str) -> None:
workflow.execute()
workflow.stop()
print_profiler_status(workflow)
print_status(workflow)
workflow.raise_from_status()
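For orientation, a minimal sketch of the restructured CLI flow under the new imports, assuming the openmetadata-ingestion package is installed; the YAML path and its contents are hypothetical:

import yaml

from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import print_status

# Load a profiler workflow config (source, processor, sink, workflowConfig).
with open("profiler-workflow.yaml", "r", encoding="utf-8") as config_file:
    config = yaml.safe_load(config_file)

workflow = ProfilerWorkflow.create(config)
workflow.execute()
workflow.stop()
print_status(workflow)  # generic printer, replacing print_profiler_status
workflow.raise_from_status()  # raise if the run reported failures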

View File

@@ -61,17 +61,17 @@ class ReturnStep(Step, ABC):
"""Steps that run by returning a single unit"""
@abstractmethod
def _run(self, *args, **kwargs) -> Either:
def _run(self, record: Entity) -> Either:
"""
Main entrypoint to execute the step
"""
def run(self, *args, **kwargs) -> Optional[Entity]:
def run(self, record: Entity) -> Optional[Entity]:
"""
Run the step and handle the status and exceptions
"""
try:
result: Either = self._run(*args, **kwargs)
result: Either = self._run(record)
if result.left is not None:
self.status.failed(result.left)
return None
@@ -98,7 +98,7 @@ class StageStep(Step, ABC):
"""Steps that run by returning a single unit"""
@abstractmethod
def _run(self, *args, **kwargs) -> Iterable[Either[str]]:
def _run(self, record: Entity) -> Iterable[Either[str]]:
"""
Main entrypoint to execute the step.
@@ -110,12 +110,12 @@
pick up the file components.
"""
def run(self, *args, **kwargs) -> None:
def run(self, record: Entity) -> None:
"""
Run the step and handle the status and exceptions.
"""
try:
for result in self._run(*args, **kwargs):
for result in self._run(record):
if result.left is not None:
self.status.failed(result.left)
@@ -138,7 +138,7 @@ class IterStep(Step, ABC):
"""Steps that are run as Iterables"""
@abstractmethod
def _iter(self, *args, **kwargs) -> Iterable[Either]:
def _iter(self) -> Iterable[Either]:
"""Main entrypoint to run through the Iterator"""
def run(self) -> Iterable[Optional[Entity]]:
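The narrowing from `*args, **kwargs` to a typed `record` makes the step contract explicit. A self-contained toy of that contract, with `Either` and the step simplified to stand-ins rather than the real metadata.ingestion.api classes:

from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

T = TypeVar("T")

@dataclass
class Either(Generic[T]):
    left: Optional[str] = None  # failure (the real type carries a StackTraceError)
    right: Optional[T] = None  # success payload

class UppercaseStep:
    """A ReturnStep-style processor: one typed record in, Either out."""

    def _run(self, record: str) -> Either[str]:
        if not record:
            return Either(left="empty record")
        return Either(right=record.upper())

    def run(self, record: str) -> Optional[str]:
        result = self._run(record)
        if result.left is not None:
            print(f"failed: {result.left}")  # real steps call self.status.failed(...)
            return None
        return result.right

print(UppercaseStep().run("table_a"))  # TABLE_A
print(UppercaseStep().run(""))  # failed: empty record -> None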

View File

@@ -52,9 +52,7 @@ class Sink(ReturnStep, ABC):
"""All Sinks must inherit this base class."""
@abstractmethod
def _run( # pylint: disable=arguments-differ
self, record: Entity, *_, **__
) -> Either:
def _run(self, record: Entity) -> Either:
"""
Send the data somewhere, e.g., the OM API
"""
@@ -64,7 +62,7 @@ class Processor(ReturnStep, ABC):
"""All Processor must inherit this base class"""
@abstractmethod
def _run(self, *args, **kwargs) -> Either:
def _run(self, record: Entity) -> Either:
"""
Post process a given entity and return it
or a new one

View File

@@ -138,7 +138,7 @@ class OMetaPatchMixin(OMetaPatchMixinBase):
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Error trying to PATCH description for {entity.__class__.__name__} [{source.id}]: {exc}"
f"Error trying to PATCH {entity.__name__} [{source.id.__root__}]: {exc}"
)
return None

View File

@@ -89,15 +89,13 @@ class QueryParserProcessor(Processor):
connection_type = kwargs.pop("connection_type", "")
return cls(config, metadata_config, connection_type)
def _run( # pylint: disable=arguments-differ
self, queries: TableQueries
) -> Optional[Either[QueryParserData]]:
if queries and queries.queries:
def _run(self, record: TableQueries) -> Optional[Either[QueryParserData]]:
if record and record.queries:
data = []
for record in queries.queries:
for table_query in record.queries:
try:
parsed_sql = parse_sql_statement(
record,
table_query,
ConnectionTypeDialectMapper.dialect_of(self.connection_type),
)
if parsed_sql:
@@ -106,7 +104,7 @@ class QueryParserProcessor(Processor):
return Either(
left=StackTraceError(
name="Query",
error=f"Error processing query [{record.query}]: {exc}",
error=f"Error processing query [{table_query.query}]: {exc}",
stack_trace=traceback.format_exc(),
)
)

View File

@@ -66,6 +66,7 @@ from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import DashboardUsage
from metadata.ingestion.source.database.database_service import DataModelLink
from metadata.profiler.api.models import ProfilerResponse
from metadata.utils.helpers import calculate_execution_time
from metadata.utils.logger import get_log_name, ingestion_logger
@@ -403,6 +404,60 @@ class MetadataRestSink(Sink):
logger.debug(f"No sample data to PUT for {get_log_name(record.entity)}")
return Either(right=record.entity)
@_run_dispatch.register
def write_profiler_response(self, record: ProfilerResponse) -> Either[Table]:
"""Cleanup "`" character in columns and ingest"""
column_profile = record.profile.columnProfile
for column in column_profile:
column.name = column.name.replace("`", "")
record.profile.columnProfile = column_profile
table = self.metadata.ingest_profile_data(
table=record.table,
profile_request=record.profile,
)
logger.debug(
f"Successfully ingested profile metrics for {record.table.fullyQualifiedName.__root__}"
)
if record.sample_data:
table_data = self.metadata.ingest_table_sample_data(
table=record.table, sample_data=record.sample_data
)
if not table_data:
self.status.failed(
StackTraceError(
name=table.fullyQualifiedName.__root__,
error="Error trying to ingest sample data for table",
)
)
else:
logger.debug(
f"Successfully ingested sample data for {record.table.fullyQualifiedName.__root__}"
)
for column_tag_response in record.column_tags or []:
patched = self.metadata.patch_column_tag(
table=record.table,
column_fqn=column_tag_response.column_fqn,
tag_label=column_tag_response.tag_label,
)
if not patched:
self.status.failed(
StackTraceError(
name=table.fullyQualifiedName.__root__,
error="Error patching tags for table",
)
)
else:
logger.debug(
f"Successfully patched tag {column_tag_response.tag_label} for"
f" {record.table.fullyQualifiedName.__root__}.{column_tag_response.column_fqn}"
)
return Either(right=table)
def close(self):
"""
We don't have anything to close since we are using the given metadata client
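The new `write_profiler_response` handler plugs into the sink's type-based `_run_dispatch`. A toy of the same pattern using `functools.singledispatchmethod`, with illustrative record types rather than the actual sink internals:

from dataclasses import dataclass
from functools import singledispatchmethod

@dataclass
class ProfileRecord:  # illustrative stand-in for ProfilerResponse
    table_fqn: str

@dataclass
class SampleDataRecord:  # illustrative stand-in for a sample-data payload
    table_fqn: str
    rows: list

class ToySink:
    @singledispatchmethod
    def _run(self, record) -> str:
        raise NotImplementedError(f"no handler for {type(record).__name__}")

    @_run.register
    def _(self, record: ProfileRecord) -> str:
        return f"PUT profile for {record.table_fqn}"

    @_run.register
    def _(self, record: SampleDataRecord) -> str:
        return f"PUT {len(record.rows)} sample rows for {record.table_fqn}"

sink = ToySink()
print(sink._run(ProfileRecord("svc.db.schema.users")))
print(sink._run(SampleDataRecord("svc.db.schema.users", rows=[(1,), (2,)])))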

View File

@@ -109,9 +109,7 @@ class TableUsageStage(Stage):
)
]
def _run( # pylint: disable=arguments-differ
self, record: QueryParserData, *_, **__
) -> Iterable[Either[str]]:
def _run(self, record: QueryParserData) -> Iterable[Either[str]]:
"""
Process the parsed data and store it in a file
"""

View File

@@ -0,0 +1,67 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
NER Scanner based on Presidio.
Supported Entities https://microsoft.github.io/presidio/supported_entities/
"""
from enum import Enum
from metadata.pii.models import TagType
from metadata.utils.logger import pii_logger
logger = pii_logger()
class NEREntity(Enum):
"""
PII Entities supported by Presidio https://microsoft.github.io/presidio/supported_entities/
"""
# Global
CREDIT_CARD = TagType.SENSITIVE.value
CRYPTO = TagType.SENSITIVE.value
DATE_TIME = TagType.NONSENSITIVE.value
EMAIL_ADDRESS = TagType.SENSITIVE.value
IBAN_CODE = TagType.SENSITIVE.value
IP_ADDRESS = TagType.SENSITIVE.value
NRP = TagType.NONSENSITIVE.value
LOCATION = TagType.NONSENSITIVE.value
PERSON = TagType.SENSITIVE.value
PHONE_NUMBER = TagType.NONSENSITIVE.value
MEDICAL_LICENSE = TagType.SENSITIVE.value
URL = TagType.NONSENSITIVE.value
# USA
US_BANK_NUMBER = TagType.SENSITIVE.value
US_DRIVER_LICENSE = TagType.SENSITIVE.value
US_ITIN = TagType.SENSITIVE.value
US_PASSPORT = TagType.SENSITIVE.value
US_SSN = TagType.SENSITIVE.value
# UK
UK_NHS = TagType.SENSITIVE.value
# Spain
NIF = TagType.SENSITIVE.value
# Italy
IT_FISCAL_CODE = TagType.SENSITIVE.value
IT_DRIVER_LICENSE = TagType.SENSITIVE.value
IT_VAT_CODE = TagType.SENSITIVE.value
IT_PASSPORT = TagType.SENSITIVE.value
IT_IDENTITY_CARD = TagType.SENSITIVE.value
# Australia
AU_ABN = TagType.SENSITIVE.value
AU_ACN = TagType.SENSITIVE.value
AU_TFN = TagType.SENSITIVE.value
AU_MEDICARE = TagType.SENSITIVE.value
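For context, the enum is keyed by Presidio's `entity_type` strings, so classifying a recognizer result is a plain name lookup. A minimal sketch with a stubbed `TagType` (the real one lives in metadata.pii.models):

from enum import Enum
from typing import Optional

class TagType(Enum):  # stub; the real TagType lives in metadata.pii.models
    SENSITIVE = "Sensitive"
    NONSENSITIVE = "NonSensitive"

class NEREntity(Enum):
    # Duplicate values make later members aliases, but name lookup still works.
    CREDIT_CARD = TagType.SENSITIVE.value
    DATE_TIME = TagType.NONSENSITIVE.value
    EMAIL_ADDRESS = TagType.SENSITIVE.value

def tag_for(entity_type: str) -> Optional[str]:
    """Map a Presidio entity_type (e.g. 'EMAIL_ADDRESS') to a PII tag name."""
    try:
        return NEREntity[entity_type].value
    except KeyError:
        return None  # entity types we do not classify are ignored

print(tag_for("EMAIL_ADDRESS"))  # Sensitive
print(tag_for("DATE_TIME"))  # NonSensitive
print(tag_for("TITLE"))  # None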

View File

@@ -12,39 +12,77 @@
"""
Processor util to fetch pii sensitive columns
"""
from typing import Optional
import traceback
from typing import Optional, cast
from metadata.generated.schema.entity.classification.tag import Tag
from metadata.generated.schema.entity.data.table import Table, TableData
from metadata.generated.schema.entity.data.table import Column, TableData
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.tagLabel import (
LabelType,
State,
TagLabel,
TagSource,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.pii.column_name_scanner import ColumnNameScanner
from metadata.ingestion.api.models import Either, StackTraceError
from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.api.step import Step
from metadata.ingestion.api.steps import Processor
from metadata.ingestion.ometa.client_utils import create_ometa_client
from metadata.pii.constants import PII
from metadata.pii.ner_scanner import NERScanner
from metadata.pii.scanners.column_name_scanner import ColumnNameScanner
from metadata.pii.scanners.ner_scanner import NERScanner
from metadata.profiler.api.models import PatchColumnTagResponse, ProfilerResponse
from metadata.utils import fqn
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
class PIIProcessor:
class PIIProcessor(Processor):
"""
A scanner that uses Spacy NER for entity recognition
"""
def __init__(self, metadata: OpenMetadata):
def __init__(
self,
config: OpenMetadataWorkflowConfig,
metadata_config: OpenMetadataConnection,
):
super().__init__()
self.config = config
self.metadata_config = metadata_config
self.metadata = create_ometa_client(self.metadata_config)
# Init and type the source config
self.source_config: DatabaseServiceProfilerPipeline = cast(
DatabaseServiceProfilerPipeline, self.config.source.sourceConfig.config
) # Used to satisfy the type checker
self.metadata = metadata
self.ner_scanner = NERScanner()
self.confidence_threshold = self.source_config.confidence
def patch_column_tag(
self, tag_type: str, table_entity: Table, column_fqn: str
) -> None:
@classmethod
def create(
cls, config_dict: dict, metadata_config: OpenMetadataConnection
) -> "Step":
config = parse_workflow_config_gracefully(config_dict)
return cls(config=config, metadata_config=metadata_config)
def close(self) -> None:
"""Nothing to close"""
def build_column_tag(
self, tag_type: str, column_fqn: str
) -> PatchColumnTagResponse:
"""
Build the tag and run the PATCH
"""
@@ -60,54 +98,84 @@ class PIIProcessor:
state=State.Suggested,
labelType=LabelType.Automated,
)
self.metadata.patch_column_tag(
table=table_entity,
column_fqn=column_fqn,
tag_label=tag_label,
return PatchColumnTagResponse(column_fqn=column_fqn, tag_label=tag_label)
def process_column(
self,
idx: int,
column: Column,
table_data: Optional[TableData],
confidence_threshold: float,
) -> Optional[PatchColumnTagResponse]:
"""
Tag a column with PII if we find it using our scanners
"""
# First, check if the column we are about to process
# already has PII tags or not
column_has_pii_tag = any(
(PII in tag.tagFQN.__root__ for tag in column.tags or [])
)
def process(
# If it has PII tags, we skip the processing
# for the column
if column_has_pii_tag is True:
return None
# Scan by column name. If no results there, check the sample data, if any
tag_and_confidence = ColumnNameScanner.scan(column.name.__root__) or (
self.ner_scanner.scan([row[idx] for row in table_data.rows])
if table_data
else None
)
if (
tag_and_confidence
and tag_and_confidence.tag
and tag_and_confidence.confidence >= confidence_threshold / 100
):
return self.build_column_tag(
tag_type=tag_and_confidence.tag.value,
column_fqn=column.fullyQualifiedName.__root__,
)
return None
def _run(
self,
table_data: Optional[TableData],
table_entity: Table,
confidence_threshold: float,
):
record: ProfilerResponse,
) -> Either[ProfilerResponse]:
"""
Main entrypoint for the scanner.
Adds PII tagging based on the column names
and TableData
"""
for idx, column in enumerate(table_entity.columns):
# We don't always need to process
if not self.source_config.processPiiSensitive:
return Either(right=record)
column_tags = []
for idx, column in enumerate(record.table.columns):
try:
# First, check if the column we are about to process
# already has PII tags or not
column_has_pii_tag = any(
(PII in tag.tagFQN.__root__ for tag in column.tags or [])
col_tag = self.process_column(
idx=idx,
column=column,
table_data=record.sample_data,
confidence_threshold=self.confidence_threshold,
)
# If it has PII tags, we skip the processing
# for the column
if column_has_pii_tag is True:
continue
# Scan by column name. If no results there, check the sample data, if any
tag_and_confidence = ColumnNameScanner.scan(column.name.__root__) or (
self.ner_scanner.scan([row[idx] for row in table_data.rows])
if table_data
else None
)
if (
tag_and_confidence
and tag_and_confidence.tag
and tag_and_confidence.confidence >= confidence_threshold / 100
):
self.patch_column_tag(
tag_type=tag_and_confidence.tag.value,
table_entity=table_entity,
column_fqn=column.fullyQualifiedName.__root__,
)
if col_tag:
column_tags.append(col_tag)
except Exception as err:
logger.warning(f"Error computing PII tags for [{column}] - [{err}]")
self.status.failed(
StackTraceError(
name=record.table.fullyQualifiedName.__root__,
error=f"Error computing PII tags for [{column}] - [{err}]",
stack_trace=traceback.format_exc(),
)
)
record.column_tags = column_tags
return Either(right=record)
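A self-contained sketch of the per-column decision implemented above: skip columns already tagged PII, scan the column name first, fall back to the sample data, and only keep results above the confidence threshold (expressed on a 0-100 scale, hence the division). Both scanners are trivial stand-ins for ColumnNameScanner and NERScanner:

import re
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class TagAndConfidence:
    tag: str
    confidence: float

@dataclass
class ToyColumn:
    name: str
    tags: List[str] = field(default_factory=list)

def scan_name(name: str) -> Optional[TagAndConfidence]:
    """Stand-in for ColumnNameScanner: match on the column name alone."""
    if re.search(r"(email|ssn|phone)", name, re.IGNORECASE):
        return TagAndConfidence(tag="PII.Sensitive", confidence=1.0)
    return None

def scan_sample(values: List[str]) -> Optional[TagAndConfidence]:
    """Stand-in for NERScanner: score the sampled values."""
    hits = sum("@" in str(value) for value in values)
    return TagAndConfidence("PII.Sensitive", hits / len(values)) if hits else None

def process_column(column: ToyColumn, sample: List[str], threshold: float) -> Optional[str]:
    if any("PII" in tag for tag in column.tags):
        return None  # already tagged, skip
    result = scan_name(column.name) or (scan_sample(sample) if sample else None)
    if result and result.confidence >= threshold / 100:  # threshold is 0-100
        return result.tag
    return None

print(process_column(ToyColumn("user_email"), [], 80))  # PII.Sensitive
print(process_column(ToyColumn("notes"), ["a@b.com", "c@d.com"], 80))  # PII.Sensitive
print(process_column(ToyColumn("notes", tags=["PII.Sensitive"]), [], 80))  # None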

View File

@@ -15,64 +15,18 @@ Supported Entities https://microsoft.github.io/presidio/supported_entities/
"""
import traceback
from collections import defaultdict
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
from pydantic import BaseModel
from metadata.pii.constants import SPACY_EN_MODEL
from metadata.pii.models import TagAndConfidence, TagType
from metadata.pii.ner import NEREntity
from metadata.utils.logger import pii_logger
logger = pii_logger()
class NEREntity(Enum):
"""
PII Entities supported by Presidio https://microsoft.github.io/presidio/supported_entities/
"""
# Global
CREDIT_CARD = TagType.SENSITIVE.value
CRYPTO = TagType.SENSITIVE.value
DATE_TIME = TagType.NONSENSITIVE.value
EMAIL_ADDRESS = TagType.SENSITIVE.value
IBAN_CODE = TagType.SENSITIVE.value
IP_ADDRESS = TagType.SENSITIVE.value
NRP = TagType.NONSENSITIVE.value
LOCATION = TagType.NONSENSITIVE.value
PERSON = TagType.SENSITIVE.value
PHONE_NUMBER = TagType.NONSENSITIVE.value
MEDICAL_LICENSE = TagType.SENSITIVE.value
URL = TagType.NONSENSITIVE.value
# USA
US_BANK_NUMBER = TagType.SENSITIVE.value
US_DRIVER_LICENSE = TagType.SENSITIVE.value
US_ITIN = TagType.SENSITIVE.value
US_PASSPORT = TagType.SENSITIVE.value
US_SSN = TagType.SENSITIVE.value
# UK
UK_NHS = TagType.SENSITIVE.value
# Spain
NIF = TagType.SENSITIVE.value
# Italy
IT_FISCAL_CODE = TagType.SENSITIVE.value
IT_DRIVER_LICENSE = TagType.SENSITIVE.value
IT_VAT_CODE = TagType.SENSITIVE.value
IT_PASSPORT = TagType.SENSITIVE.value
IT_IDENTITY_CARD = TagType.SENSITIVE.value
# Australia
AU_ABN = TagType.SENSITIVE.value
AU_ACN = TagType.SENSITIVE.value
AU_TFN = TagType.SENSITIVE.value
AU_MEDICARE = TagType.SENSITIVE.value
class StringAnalysis(BaseModel):
"""
Used to store results from the sample data scans for each NER Entity

View File

@@ -29,6 +29,7 @@ from metadata.generated.schema.entity.data.table import (
TableData,
)
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.generated.schema.type.tagLabel import TagLabel
from metadata.profiler.processor.models import ProfilerDef
@@ -67,6 +68,13 @@ class ProfilerProcessorConfig(ConfigModel):
tableConfig: Optional[List[TableConfig]] = None
class PatchColumnTagResponse(ConfigModel):
"""Used to patch a tag to a column"""
column_fqn: str
tag_label: TagLabel
class ProfilerResponse(ConfigModel):
"""
ORM Profiler processor response.
@@ -78,3 +86,4 @@ class ProfilerResponse(ConfigModel):
table: Table
profile: CreateTableProfileRequest
sample_data: Optional[TableData] = None
column_tags: Optional[List[PatchColumnTagResponse]] = None
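A sketch of how the extended response travels, using simplified pydantic stand-ins for the generated types: the PII processor fills `column_tags`, and the sink PATCHes each one:

from typing import List, Optional

from pydantic import BaseModel

class TagLabel(BaseModel):  # simplified stand-in for the generated TagLabel
    tagFQN: str

class PatchColumnTagResponse(BaseModel):
    column_fqn: str
    tag_label: TagLabel

class ProfilerResponse(BaseModel):  # trimmed to the fields relevant here
    table_fqn: str
    column_tags: Optional[List[PatchColumnTagResponse]] = None

response = ProfilerResponse(
    table_fqn="svc.db.schema.users",
    column_tags=[
        PatchColumnTagResponse(
            column_fqn="svc.db.schema.users.email",
            tag_label=TagLabel(tagFQN="PII.Sensitive"),
        )
    ],
)
for col_tag in response.column_tags or []:
    print(f"PATCH {col_tag.tag_label.tagFQN} -> {col_tag.column_fqn}")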

View File

@@ -1,459 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Workflow definition for the ORM Profiler.
- How to specify the source
- How to specify the entities to run
- How to define metrics & tests
"""
import traceback
from typing import Iterable, Optional, cast
from pydantic import ValidationError
from metadata.config.common import WorkflowExecutionError
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Table, TableType
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
DatalakeConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.connections.serviceConnection import (
ServiceConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineState,
)
from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.ingestion.api.models import StackTraceError
from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.api.status import Status
from metadata.ingestion.api.steps import Sink
from metadata.ingestion.models.custom_types import ServiceWithConnectionType
from metadata.ingestion.ometa.client_utils import create_ometa_client
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
from metadata.profiler.api.models import ProfilerProcessorConfig, ProfilerResponse
from metadata.profiler.processor.core import Profiler
from metadata.profiler.source.base.profiler_source import ProfilerSource
from metadata.profiler.source.profiler_source_factory import profiler_source_factory
from metadata.timer.repeated_timer import RepeatedTimer
from metadata.utils import fqn
from metadata.utils.class_helper import (
get_service_class_from_service_type,
get_service_type_from_source_type,
)
from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
from metadata.utils.importer import get_sink
from metadata.utils.logger import profiler_logger
from metadata.workflow.workflow_output_handler import (
get_ingestion_status_timer,
print_profiler_status,
)
from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin
logger = profiler_logger()
NON_SQA_DATABASE_CONNECTIONS = (DatalakeConnection,)
SUCCESS_THRESHOLD_VALUE = 90
REPORTS_INTERVAL_SECONDS = 60
class ProfilerInterfaceInstantiationError(Exception):
"""Raise when interface cannot be instantiated"""
class ProfilerWorkflow(WorkflowStatusMixin):
"""
Configure and run the ORM profiler
"""
config: OpenMetadataWorkflowConfig
sink: Sink
metadata: OpenMetadata
def __init__(self, config: OpenMetadataWorkflowConfig):
self.profiler = None # defined in `create_profiler()``
self.config = config
self._timer: Optional[RepeatedTimer] = None
self.metadata_config: OpenMetadataConnection = (
self.config.workflowConfig.openMetadataServerConfig
)
self.profiler_config = ProfilerProcessorConfig.parse_obj(
self.config.processor.dict().get("config")
)
self.metadata = create_ometa_client(self.metadata_config)
self._retrieve_service_connection_if_needed()
self.test_connection()
self.set_ingestion_pipeline_status(state=PipelineState.running)
# Init and type the source config
self.source_config: DatabaseServiceProfilerPipeline = cast(
DatabaseServiceProfilerPipeline, self.config.source.sourceConfig.config
) # Used to satisfy the type checker
self.source_status = Status()
self._profiler_interface_args = None
if self.config.sink:
self.sink = get_sink(
sink_type=self.config.sink.type,
sink_config=self.config.sink,
metadata_config=self.metadata_config,
from_="profiler",
)
if not self._validate_service_name():
raise ValueError(
f"Service name `{self.config.source.serviceName}` does not exist. "
"Make sure you have run the ingestion for the service specified in the profiler workflow. "
"If so, make sure the profiler service name matches the service name specified during ingestion "
"and that your ingestion token (settings > bots) is still valid."
)
logger.info(
f"Starting profiler for service {self.config.source.serviceName}"
f":{self.config.source.type.lower()}"
)
@classmethod
def create(cls, config_dict: dict) -> "ProfilerWorkflow":
"""
Parse a JSON (dict) and create the workflow
"""
try:
config = parse_workflow_config_gracefully(config_dict)
return cls(config)
except ValidationError as err:
logger.debug(traceback.format_exc())
logger.error(
f"Error trying to parse the Profiler Workflow configuration: {err}"
)
raise err
@property
def timer(self) -> RepeatedTimer:
"""Status timer"""
if not self._timer:
self._timer = get_ingestion_status_timer(
interval=REPORTS_INTERVAL_SECONDS, logger=logger, workflow=self
)
return self._timer
def filter_databases(self, database: Database) -> Optional[Database]:
"""Returns filtered database entities"""
if filter_by_database(
self.source_config.databaseFilterPattern,
database.name.__root__,
):
self.source_status.filter(
database.name.__root__, "Database pattern not allowed"
)
return None
return database
def filter_entities(self, tables: Iterable[Table]) -> Iterable[Table]:
"""
From a list of tables, apply the SQLSourceConfig
filter patterns.
We will update the status on the SQLSource Status.
"""
for table in tables:
try:
if filter_by_schema(
self.source_config.schemaFilterPattern,
table.databaseSchema.name, # type: ignore
):
self.source_status.filter(
f"Schema pattern not allowed: {table.fullyQualifiedName.__root__}", # type: ignore
"Schema pattern not allowed",
)
continue
if filter_by_table(
self.source_config.tableFilterPattern,
table.name.__root__,
):
self.source_status.filter(
f"Table pattern not allowed: {table.fullyQualifiedName.__root__}", # type: ignore
"Table pattern not allowed",
)
continue
if (
table.tableType == TableType.View
and not self.source_config.includeViews
):
self.source_status.filter(
table.fullyQualifiedName.__root__,
"View filtered out",
)
continue
yield table
except Exception as exc:
self.source_status.failed(
StackTraceError(
name=table.fullyQualifiedName.__root__,
error=f"Unexpected error filtering entities for table [{table}]: {exc}",
stack_trace=traceback.format_exc(),
)
)
def get_database_entities(self):
"""List all databases in service"""
databases = [
self.filter_databases(database)
for database in self.metadata.list_all_entities(
entity=Database,
params={"service": self.config.source.serviceName},
)
if self.filter_databases(database)
]
if not databases:
raise ValueError(
"databaseFilterPattern returned 0 result. At least 1 database must be returned by the filter pattern."
f"\n\t- includes: {self.source_config.databaseFilterPattern.includes if self.source_config.databaseFilterPattern else None}" # pylint: disable=line-too-long
f"\n\t- excludes: {self.source_config.databaseFilterPattern.excludes if self.source_config.databaseFilterPattern else None}" # pylint: disable=line-too-long
)
return databases
def get_table_entities(self, database):
"""
List and filter OpenMetadata tables based on the
source configuration.
The listing will be based on the entities from the
informed service name in the source configuration.
Note that users can specify `table_filter_pattern` to
either be `includes` or `excludes`. This means
that we will either use what is specified in `includes`
or we will use everything but the tables excluded.
Same with `schema_filter_pattern`.
"""
tables = self.metadata.list_all_entities(
entity=Table,
fields=[
"tableProfilerConfig",
],
params={
"service": self.config.source.serviceName,
"database": fqn.build(
self.metadata,
entity_type=Database,
service_name=self.config.source.serviceName,
database_name=database.name.__root__,
),
}, # type: ignore
)
yield from self.filter_entities(tables)
def run_profiler(
self, entity: Table, profiler_source: ProfilerSource
) -> Optional[ProfilerResponse]:
"""
Main logic for the profiler workflow
"""
profiler_runner: Profiler = profiler_source.get_profiler_runner(
entity, self.profiler_config
)
try:
profile: ProfilerResponse = profiler_runner.process(
self.source_config.generateSampleData,
self.source_config.processPiiSensitive,
)
except Exception as exc:
self.source_status.failed(
StackTraceError(
name=entity.fullyQualifiedName.__root__,
error=f"Unexpected exception processing entity {entity.fullyQualifiedName.__root__}: {exc}",
stack_trace=traceback.format_exc(),
)
)
self.source_status.fail_all(
profiler_source.interface.processor_status.failures
)
self.source_status.records.extend(
profiler_source.interface.processor_status.records
)
else:
# at this point we know we have an interface variable since the `try` block above didn't raise
self.source_status.fail_all(profiler_source.interface.processor_status.failures) # type: ignore
self.source_status.records.extend(
profiler_source.interface.processor_status.records # type: ignore
)
return profile
finally:
profiler_runner.close()
return None
def execute(self):
"""
Run the profiling and tests
"""
self.timer.trigger()
try:
for database in self.get_database_entities():
profiler_source = profiler_source_factory.create(
self.config.source.type.lower(),
self.config,
database,
self.metadata,
)
for entity in self.get_table_entities(database=database):
profile = self.run_profiler(entity, profiler_source)
if hasattr(self, "sink") and profile:
self.sink.write_record(profile)
# At the end of the `execute`, update the associated Ingestion Pipeline status as success
self.update_ingestion_status_at_end()
# Any unhandled exception breaking the workflow should update the status
except Exception as err:
self.set_ingestion_pipeline_status(PipelineState.failed)
raise err
# Force resource closing. Required for killing the threading
finally:
self.stop()
def print_status(self) -> None:
"""
Print the workflow results with click
"""
print_profiler_status(self)
def result_status(self) -> int:
"""
Returns 1 if status is failed, 0 otherwise.
"""
if self.source_status.failures or (
hasattr(self, "sink") and self.sink.get_status().failures
):
return 1
return 0
def _get_source_success(self):
"""Compue the success rate of the source"""
return self.source_status.calculate_success()
def update_ingestion_status_at_end(self):
"""
Once the execute method is done, update the status
as OK or KO depending on the success rate.
"""
pipeline_state = PipelineState.success
if SUCCESS_THRESHOLD_VALUE <= self._get_source_success() < 100:
pipeline_state = PipelineState.partialSuccess
self.set_ingestion_pipeline_status(pipeline_state)
def _raise_from_status_internal(self, raise_warnings=False):
"""
Check source, processor and sink status and raise if needed
Our profiler source will never log any failure, only filters,
as we are just picking up data from OM.
"""
if self._get_source_success() < SUCCESS_THRESHOLD_VALUE:
raise WorkflowExecutionError(
"Processor reported errors", self.source_status
)
if hasattr(self, "sink") and self.sink.get_status().failures:
raise WorkflowExecutionError("Sink reported errors", self.sink.get_status())
if raise_warnings:
if self.source_status.warnings:
raise WorkflowExecutionError(
"Source reported warnings", self.source_status
)
if hasattr(self, "sink") and self.sink.get_status().warnings:
raise WorkflowExecutionError(
"Sink reported warnings", self.sink.get_status()
)
def _validate_service_name(self):
"""Validate service name exists in OpenMetadata"""
return self.metadata.get_by_name(
entity=DatabaseService, fqn=self.config.source.serviceName
)
def stop(self):
"""
Close all connections
"""
self.metadata.close()
self.timer.stop()
def _retrieve_service_connection_if_needed(self) -> None:
"""
We override the current `serviceConnection` source config object if the source workflow service already exists
in OM. When it is configured, we retrieve the service connection from the secrets manager. Otherwise, we get it
from the service object itself through the default `SecretsManager`.
:return:
"""
service_type: ServiceType = get_service_type_from_source_type(
self.config.source.type
)
if (
not self.config.source.serviceConnection
and not self.metadata.config.forceEntityOverwriting
):
service_name = self.config.source.serviceName
try:
service: ServiceWithConnectionType = cast(
ServiceWithConnectionType,
self.metadata.get_by_name(
get_service_class_from_service_type(service_type),
service_name,
),
)
if not service:
raise ConnectionError(
f"Could not retrieve service with name `{service_name}`. "
"Typically caused by the `serviceName` does not exists in OpenMetadata "
"or the JWT Token is invalid."
)
if service:
self.config.source.serviceConnection = ServiceConnection(
__root__=service.connection
)
except ConnectionError as exc:
raise exc
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Error getting service connection for service name [{service_name}]"
f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}"
)
def test_connection(self):
service_config = self.config.source.serviceConnection.__root__.config
self.engine = get_connection(service_config)
test_connection_fn = get_test_connection_fn(service_config)
test_connection_fn(self.metadata, self.engine, service_config)

View File

@@ -271,13 +271,13 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin):
name = f"{column if column is not None else table}"
error = f"{name} metric_type.value: {exc}"
logger.error(error)
self.processor_status.failed_profiler(error, traceback.format_exc())
self.status.failed_profiler(error, traceback.format_exc())
row = None
if column is not None:
column = column.name
self.processor_status.scanned(f"{table.name.__root__}.{column}")
self.status.scanned(f"{table.name.__root__}.{column}")
else:
self.processor_status.scanned(table.name.__root__)
self.status.scanned(table.name.__root__)
return row, column, metric_type.value
def fetch_sample_data(self, table) -> TableData:

View File

@@ -64,13 +64,13 @@ class ProfilerInterface(ABC):
self.source_config = source_config
self.service_connection_config = service_connection_config
self.connection = get_connection(self.service_connection_config)
self.processor_status = ProfilerProcessorStatus()
self.status = ProfilerProcessorStatus()
try:
fqn = self.table_entity.fullyQualifiedName
except AttributeError:
self.processor_status.entity = None
self.status.entity = None
else:
self.processor_status.entity = fqn.__root__ if fqn else None
self.status.entity = fqn.__root__ if fqn else None
self.profile_sample_config = profile_sample_config
self.profile_query = sample_query
self.partition_details = (

View File

@@ -423,14 +423,14 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
except Exception as exc:
error = f"{column if column is not None else runner.table.__tablename__} metric_type.value: {exc}"
logger.error(error)
self.processor_status.failed_profiler(error, traceback.format_exc())
self.status.failed_profiler(error, traceback.format_exc())
row = None
if column is not None:
column = column.name
self.processor_status.scanned(f"{table.__tablename__}.{column}")
self.status.scanned(f"{table.__tablename__}.{column}")
else:
self.processor_status.scanned(table.__tablename__)
self.status.scanned(table.__tablename__)
return row, column, metric_type.value

View File

@@ -33,7 +33,6 @@ from metadata.generated.schema.entity.data.table import (
TableData,
TableProfile,
)
from metadata.pii.processor import PIIProcessor
from metadata.profiler.api.models import ProfilerResponse
from metadata.profiler.interface.profiler_interface import ProfilerInterface
from metadata.profiler.metrics.core import (
@@ -445,7 +444,6 @@ class Profiler(Generic[TMetric]):
def process(
self,
generate_sample_data: Optional[bool],
process_pii_sensitive: Optional[bool],
) -> ProfilerResponse:
"""
Given a table, we will prepare the profiler for
@@ -462,11 +460,6 @@ class Profiler(Generic[TMetric]):
else:
sample_data = None
# If we also have sample data, we'll use the NER Scanner,
# otherwise we'll stick to the ColumnNameScanner
if process_pii_sensitive:
self.process_pii_sensitive(sample_data)
profile = self.get_profile()
self._check_profile_and_handle(profile)
@@ -495,28 +488,6 @@
logger.warning(f"Error fetching sample data: {err}")
return None
def process_pii_sensitive(self, sample_data: TableData) -> None:
"""Read sample data to find pii sensitive columns and tag them
as PII sensitive data
Args:
sample_data (TableData): sample data
"""
try:
pii_processor = PIIProcessor(
metadata=self.profiler_interface.ometa_client # type: ignore
)
pii_processor.process(
sample_data,
self.profiler_interface.table_entity, # type: ignore
self.profiler_interface.source_config.confidence,
)
except Exception as exc:
logger.warning(
f"Unexpected error while processing sample data for auto pii tagging - {exc}"
)
logger.debug(traceback.format_exc())
def get_profile(self) -> CreateTableProfileRequest:
"""
After executing the profiler, get all results
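With PII handling removed, `process()` is reduced to computing metrics and optionally attaching sample data. A toy of the slimmed contract with stand-in classes, not the real Profiler:

from dataclasses import dataclass
from typing import List, Optional, Tuple

@dataclass
class ToyResponse:  # stand-in for ProfilerResponse
    profile: dict
    sample_data: Optional[List[Tuple]] = None

class ToyProfiler:
    def __init__(self, rows: List[Tuple]):
        self.rows = rows

    def process(self, generate_sample_data: Optional[bool]) -> ToyResponse:
        profile = {"rowCount": len(self.rows)}  # compute the metrics
        sample = self.rows[:100] if generate_sample_data else None
        return ToyResponse(profile=profile, sample_data=sample)

response = ToyProfiler([(1, "a"), (2, "b")]).process(generate_sample_data=True)
print(response.profile, len(response.sample_data or []))  # {'rowCount': 2} 2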

View File

@@ -0,0 +1,89 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Profiler Processor Step
"""
import traceback
from typing import cast
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.ingestion.api.models import Either, StackTraceError
from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.api.step import Step
from metadata.ingestion.api.steps import Processor
from metadata.profiler.api.models import ProfilerProcessorConfig, ProfilerResponse
from metadata.profiler.processor.core import Profiler
from metadata.profiler.source.metadata import ProfilerSourceAndEntity
class ProfilerProcessor(Processor):
"""
This processor is in charge of getting the profiler source and entity coming from
the OpenMetadataSource and computing the metrics.
"""
def __init__(self, config: OpenMetadataWorkflowConfig):
super().__init__()
self.config = config
self.profiler_config = ProfilerProcessorConfig.parse_obj(
self.config.processor.dict().get("config")
)
self.source_config: DatabaseServiceProfilerPipeline = cast(
DatabaseServiceProfilerPipeline, self.config.source.sourceConfig.config
) # Used to satisfy the type checker
def _run(self, record: ProfilerSourceAndEntity) -> Either[ProfilerResponse]:
profiler_runner: Profiler = record.profiler_source.get_profiler_runner(
record.entity, self.profiler_config
)
try:
profile: ProfilerResponse = profiler_runner.process(
self.source_config.generateSampleData,
)
except Exception as exc:
self.status.failed(
StackTraceError(
name=record.entity.fullyQualifiedName.__root__,
error=f"Unexpected exception processing entity {record.entity.fullyQualifiedName.__root__}: {exc}",
stack_trace=traceback.format_exc(),
)
)
self.status.failures.extend(
record.profiler_source.interface.status.failures
)
else:
# at this point we know we have an interface variable since the `try` block above didn't raise
self.status.failures.extend(record.profiler_source.interface.status.failures) # type: ignore
return Either(right=profile)
finally:
profiler_runner.close()
return Either()
@classmethod
def create(cls, config_dict: dict, _: OpenMetadataConnection) -> "Step":
config = parse_workflow_config_gracefully(config_dict)
return cls(config=config)
def close(self) -> None:
"""We are already closing the connections after each execution"""

View File

@@ -1,5 +0,0 @@
# ORM Profiler Sink
Host all the Sinks to push Profile data and validations to.
We will provide OpenMetadata REST Sink by default.

View File

@@ -1,71 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Profiler File Sink
"""
from pathlib import Path
from metadata.config.common import ConfigModel
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.sink import Sink
from metadata.profiler.api.models import ProfilerResponse
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
class FileSinkConfig(ConfigModel):
filename: str
class FileSink(Sink[Entity]):
"""
Helper sink to save profiler
results in a file for analysis
"""
config: FileSinkConfig
def __init__(
self,
config: FileSinkConfig,
):
super().__init__()
self.config = config
fpath = Path(self.config.filename)
# Build the path if it does not exist
if not fpath.parent.is_dir():
Path(self.config.filename).mkdir(parents=True, exist_ok=True)
# pylint: disable=consider-using-with
self.file = fpath.open("w", encoding="utf-8")
self.wrote_something = False
@classmethod
def create(cls, config_dict: dict, _):
config = FileSinkConfig.parse_obj(config_dict)
return cls(config)
def write_record(self, record: ProfilerResponse) -> None:
if self.wrote_something:
self.file.write("\n")
self.file.write(f"Profile for: {record.table.fullyQualifiedName.__root__}\n")
self.file.write(f"{record.profile.json()}\n")
self.wrote_something = True
self.status.records_written(record.table.fullyQualifiedName.__root__)
def close(self):
self.file.write("\n]")
self.file.close()

View File

@@ -1,126 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
OpenMetadata REST Sink implementation for the ORM Profiler results
"""
import traceback
from typing import Optional
from metadata.config.common import ConfigModel
from metadata.generated.schema.api.data.createTableProfile import (
CreateTableProfileRequest,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.models import StackTraceError
from metadata.ingestion.api.sink import Sink
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.profiler.api.models import ProfilerResponse
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
class MetadataRestSinkConfig(ConfigModel):
api_endpoint: Optional[str] = None
class MetadataRestSink(Sink[Entity]):
"""
Metadata Sink sending the profiler
and tests results to the OM API
"""
config: MetadataRestSinkConfig
def __init__(
self,
config: MetadataRestSinkConfig,
metadata_config: OpenMetadataConnection,
):
super().__init__()
self.config = config
self.metadata_config = metadata_config
self.wrote_something = False
self.metadata = OpenMetadata(self.metadata_config)
@classmethod
def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
config = MetadataRestSinkConfig.parse_obj(config_dict)
return cls(config, metadata_config)
@staticmethod
def clean_up_profile_columns(
profile: CreateTableProfileRequest,
) -> CreateTableProfileRequest:
"""clean up "`" character used for BQ array
Args:
profile (CreateTableProfileRequest): profiler request
Returns:
CreateTableProfileRequest: profiler request modified
"""
column_profile = profile.columnProfile
for column in column_profile:
column.name = column.name.replace("`", "")
profile.columnProfile = column_profile
return profile
def close(self) -> None:
self.metadata.close()
def write_record(self, record: ProfilerResponse) -> None:
log = f"{type(record.table).__name__} [{record.table.name.__root__}]"
try:
self.metadata.ingest_profile_data(
table=record.table,
profile_request=self.clean_up_profile_columns(record.profile),
)
logger.debug(
f"Successfully ingested profile metrics for {record.table.fullyQualifiedName.__root__}"
)
if record.sample_data:
self.metadata.ingest_table_sample_data(
table=record.table, sample_data=record.sample_data
)
logger.debug(
f"Successfully ingested sample data for {record.table.fullyQualifiedName.__root__}"
)
self.status.records_written(
f"Table: {record.table.fullyQualifiedName.__root__}"
)
except APIError as err:
name = record.table.fullyQualifiedName.__root__
error = f"Failed to sink profiler & test data for {name}: {err}"
logger.debug(traceback.format_exc())
logger.warning(error)
self.status.failed(
StackTraceError(
name=name, error=error, stack_trace=traceback.format_exc()
)
)
except Exception as exc:
error = f"Failed to ingest {log}: {exc}"
logger.debug(traceback.format_exc())
logger.warning(error)
self.status.failed(
StackTraceError(
name=log, error=error, stack_trace=traceback.format_exc()
)
)

View File

@@ -0,0 +1,257 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
OpenMetadata source for the profiler
"""
import traceback
from typing import Iterable, Optional, cast
from pydantic import BaseModel
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Table, TableType
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.ingestion.api.models import Either, StackTraceError
from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.api.step import Step
from metadata.ingestion.api.steps import Source
from metadata.ingestion.ometa.client_utils import create_ometa_client
from metadata.profiler.source.base.profiler_source import ProfilerSource
from metadata.profiler.source.profiler_source_factory import profiler_source_factory
from metadata.utils import fqn
from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
class ProfilerSourceAndEntity(BaseModel):
"""Return class for the OpenMetadata Profiler Source"""
class Config:
arbitrary_types_allowed = True
extra = "forbid"
profiler_source: ProfilerSource
entity: Table
class OpenMetadataSource(Source):
"""
This source lists and filters the entities that need
to be processed by the profiler workflow.
Note that in order to manage the following steps we need
to test the connection against the Database Service Source.
We do this here as well.
"""
def __init__(
self,
config: OpenMetadataWorkflowConfig,
metadata_config: OpenMetadataConnection,
):
super().__init__()
self.config = config
self.metadata_config = metadata_config
self.metadata = create_ometa_client(self.metadata_config)
self.test_connection()
# Init and type the source config
self.source_config: DatabaseServiceProfilerPipeline = cast(
DatabaseServiceProfilerPipeline, self.config.source.sourceConfig.config
) # Used to satisfy the type checker
if not self._validate_service_name():
raise ValueError(
f"Service name `{self.config.source.serviceName}` does not exist. "
"Make sure you have run the ingestion for the service specified in the profiler workflow. "
"If so, make sure the profiler service name matches the service name specified during ingestion "
"and that your ingestion token (settings > bots) is still valid."
)
logger.info(
f"Starting profiler for service {self.config.source.serviceName}"
f":{self.config.source.type.lower()}"
)
def _validate_service_name(self):
"""Validate service name exists in OpenMetadata"""
return self.metadata.get_by_name(
entity=DatabaseService, fqn=self.config.source.serviceName
)
def prepare(self):
"""Nothing to prepare"""
def test_connection(self) -> None:
"""
Our source is the ometa client. Validate the
health check before moving forward
"""
self.metadata.health_check()
def _iter(self, *_, **__) -> Iterable[Either[ProfilerSourceAndEntity]]:
for database in self.get_database_entities():
try:
profiler_source = profiler_source_factory.create(
self.config.source.type.lower(),
self.config,
database,
self.metadata,
)
for entity in self.get_table_entities(database=database):
yield Either(
right=ProfilerSourceAndEntity(
profiler_source=profiler_source,
entity=entity,
)
)
except Exception as exc:
yield Either(
left=StackTraceError(
name=database.fullyQualifiedName.__root__,
error=f"Error listing source and entities for database due to [{exc}]",
stack_trace=traceback.format_exc(),
)
)
@classmethod
def create(
cls, config_dict: dict, metadata_config: OpenMetadataConnection
) -> "Step":
config = parse_workflow_config_gracefully(config_dict)
return cls(config=config, metadata_config=metadata_config)
def filter_databases(self, database: Database) -> Optional[Database]:
"""Returns filtered database entities"""
if filter_by_database(
self.source_config.databaseFilterPattern,
database.name.__root__,
):
self.status.filter(database.name.__root__, "Database pattern not allowed")
return None
return database
def filter_entities(self, tables: Iterable[Table]) -> Iterable[Table]:
"""
From a list of tables, apply the SQLSourceConfig
filter patterns.
We will update the status on the SQLSource Status.
"""
for table in tables:
try:
if filter_by_schema(
self.source_config.schemaFilterPattern,
table.databaseSchema.name, # type: ignore
):
self.status.filter(
f"Schema pattern not allowed: {table.fullyQualifiedName.__root__}",
"Schema pattern not allowed",
)
continue
if filter_by_table(
self.source_config.tableFilterPattern,
table.name.__root__,
):
self.status.filter(
f"Table pattern not allowed: {table.fullyQualifiedName.__root__}",
"Table pattern not allowed",
)
continue
if (
table.tableType == TableType.View
and not self.source_config.includeViews
):
self.status.filter(
table.fullyQualifiedName.__root__,
"View filtered out",
)
continue
yield table
except Exception as exc:
self.status.failed(
StackTraceError(
name=table.fullyQualifiedName.__root__,
error=f"Unexpected error filtering entities for table [{table}]: {exc}",
stack_trace=traceback.format_exc(),
)
)
def get_database_entities(self):
"""List all databases in service"""
databases = [
self.filter_databases(database)
for database in self.metadata.list_all_entities(
entity=Database,
params={"service": self.config.source.serviceName},
)
if self.filter_databases(database)
]
if not databases:
raise ValueError(
"databaseFilterPattern returned 0 result. At least 1 database must be returned by the filter pattern."
f"\n\t- includes: {self.source_config.databaseFilterPattern.includes if self.source_config.databaseFilterPattern else None}" # pylint: disable=line-too-long
f"\n\t- excludes: {self.source_config.databaseFilterPattern.excludes if self.source_config.databaseFilterPattern else None}" # pylint: disable=line-too-long
)
return databases
def get_table_entities(self, database):
"""
List and filter OpenMetadata tables based on the
source configuration.
The listing will be based on the entities from the
informed service name in the source configuration.
Note that users can specify `table_filter_pattern` to
either be `includes` or `excludes`. This means
that we will either use what is specified in `includes`
or we will use everything but the tables excluded.
Same with `schema_filter_pattern`.
"""
tables = self.metadata.list_all_entities(
entity=Table,
fields=[
"tableProfilerConfig",
],
params={
"service": self.config.source.serviceName,
"database": fqn.build(
self.metadata,
entity_type=Database,
service_name=self.config.source.serviceName,
database_name=database.name.__root__,
),
}, # type: ignore
)
yield from self.filter_entities(tables)
def close(self) -> None:
"""Nothing to close"""

View File

@@ -100,7 +100,7 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
self.set_ingestion_pipeline_status(state=PipelineState.running)
# Pick up the service connection from the API if neededweb_analytic_report_data_processor
# Pick up the service connection from the API if needed
self._retrieve_service_connection_if_needed(self.service_type)
# Informs the `source` and the rest of `steps` to execute

View File

@@ -0,0 +1,72 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Workflow definition for the profiler
"""
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.ingestion.api.steps import Processor, Sink
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
from metadata.pii.processor import PIIProcessor
from metadata.profiler.processor.processor import ProfilerProcessor
from metadata.profiler.source.metadata import OpenMetadataSource
from metadata.utils.importer import import_sink_class
from metadata.utils.logger import profiler_logger
from metadata.workflow.base import BaseWorkflow
logger = profiler_logger()
class ProfilerWorkflow(BaseWorkflow):
"""
Profiler ingestion workflow implementation
We check the source connection test when initializing
this workflow. No need to do anything here if this does not pass
"""
def __init__(self, config: OpenMetadataWorkflowConfig):
super().__init__(config)
self.test_connection()
def set_steps(self):
self.source = OpenMetadataSource.create(
self.config.dict(), self.metadata_config
)
profiler_processor = self._get_profiler_processor()
pii_processor = self._get_pii_processor()
sink = self._get_sink()
self.steps = (profiler_processor, pii_processor, sink)
def test_connection(self):
service_config = self.config.source.serviceConnection.__root__.config
conn = get_connection(service_config)
test_connection_fn = get_test_connection_fn(service_config)
test_connection_fn(self.metadata, conn, service_config)
def _get_sink(self) -> Sink:
sink_type = self.config.sink.type
sink_class = import_sink_class(sink_type=sink_type)
sink_config = self.config.sink.dict().get("config", {})
sink: Sink = sink_class.create(sink_config, self.metadata_config)
logger.debug(f"Sink type:{self.config.sink.type}, {sink_class} configured")
return sink
def _get_profiler_processor(self) -> Processor:
return ProfilerProcessor.create(self.config.dict(), self.metadata_config)
def _get_pii_processor(self) -> Processor:
return PIIProcessor.create(self.config.dict(), self.metadata_config)
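A toy of how a BaseWorkflow-style run threads each record from the source through these three steps; the classes are stand-ins, and the real orchestration lives in metadata.workflow.base:

from typing import Iterable, List

class ToySource:
    def run(self) -> Iterable[str]:
        yield from ["table_a", "table_b"]

class ProfileStep:
    def run(self, record: str) -> str:
        return f"{record}:profiled"

class PiiStep:
    def run(self, record: str) -> str:
        return f"{record}+pii"

class SinkStep:
    def __init__(self):
        self.written: List[str] = []

    def run(self, record: str) -> None:
        self.written.append(record)

source, steps = ToySource(), [ProfileStep(), PiiStep(), SinkStep()]
for record in source.run():
    for step in steps:
        record = step.run(record)

print(steps[-1].written)  # ['table_a:profiled+pii', 'table_b:profiled+pii']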

View File

@@ -230,41 +230,6 @@ def print_status(workflow: "BaseWorkflow") -> None:
)
def print_profiler_status(workflow) -> None:
"""
Print the profiler workflow results
"""
print_workflow_summary_legacy(
workflow,
source=True,
processor=True,
source_status=workflow.source_status,
)
if workflow.source_status.source_start_time:
log_ansi_encoded_string(
color=ANSI.BRIGHT_CYAN,
bold=True,
message="Workflow finished in time: "
f"{pretty_print_time_duration(time.time()-workflow.source_status.source_start_time)}",
)
if workflow.result_status() == 1:
log_ansi_encoded_string(
color=ANSI.BRIGHT_RED, bold=True, message=WORKFLOW_FAILURE_MESSAGE
)
elif workflow.source_status.warnings or (
hasattr(workflow, "sink") and workflow.sink.get_status().warnings
):
log_ansi_encoded_string(
color=ANSI.YELLOW, bold=True, message=WORKFLOW_WARNING_MESSAGE
)
else:
log_ansi_encoded_string(
color=ANSI.GREEN, bold=True, message=WORKFLOW_SUCCESS_MESSAGE
)
def print_test_suite_status(workflow) -> None:
"""
Print the test suite workflow results

View File

@@ -55,12 +55,12 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
OpenMetadataConnection,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.profiler.api.workflow import ProfilerWorkflow
from metadata.utils.time_utils import (
get_beginning_of_day_timestamp_mill,
get_end_of_day_timestamp_mill,
)
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import print_status
TESTS_ROOT_DIR = pathlib.Path(__file__).parent.parent.parent.parent
@ -166,7 +166,7 @@ class TestBigquerySystem(TestCase):
profiler_workflow = ProfilerWorkflow.create(config)
profiler_workflow.execute()
profiler_workflow.raise_from_status()
profiler_workflow.print_status()
print_status(profiler_workflow)
profiler_workflow.stop()
# get latest profile metrics

View File

@ -41,12 +41,12 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
OpenMetadataConnection,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.profiler.api.workflow import ProfilerWorkflow
from metadata.utils.time_utils import (
get_beginning_of_day_timestamp_mill,
get_end_of_day_timestamp_mill,
)
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import print_status
TESTS_ROOT_DIR = pathlib.Path(__file__).parent.parent.parent.parent
@ -133,7 +133,7 @@ class TestRedshiftSystem(TestCase):
profiler_workflow = ProfilerWorkflow.create(config)
profiler_workflow.execute()
profiler_workflow.raise_from_status()
profiler_workflow.print_status()
print_status(profiler_workflow)
profiler_workflow.stop()
# get latest profile metrics

View File

@ -60,12 +60,12 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
OpenMetadataConnection,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.profiler.api.workflow import ProfilerWorkflow
from metadata.utils.time_utils import (
get_beginning_of_day_timestamp_mill,
get_end_of_day_timestamp_mill,
)
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import print_status
TESTS_ROOT_DIR = pathlib.Path(__file__).parent.parent.parent.parent
@ -157,7 +157,7 @@ class TestSnowflakeystem(TestCase):
profiler_workflow = ProfilerWorkflow.create(config)
profiler_workflow.execute()
profiler_workflow.raise_from_status()
profiler_workflow.print_status()
print_status(profiler_workflow)
profiler_workflow.stop()
# get latest profile metrics

View File

@ -34,12 +34,12 @@ from metadata.generated.schema.security.client.openMetadataJWTClientConfig impor
OpenMetadataJWTClientConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.profiler.api.workflow import ProfilerWorkflow
from metadata.utils.time_utils import (
get_beginning_of_day_timestamp_mill,
get_end_of_day_timestamp_mill,
)
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import print_status
BUCKET_NAME = "MyBucket"

View File

@ -21,7 +21,6 @@ from copy import deepcopy
from datetime import datetime, timedelta
from unittest import TestCase
import pytest
from sqlalchemy import Column, DateTime, Integer, String, create_engine
from sqlalchemy.orm import declarative_base
@ -35,8 +34,8 @@ from metadata.generated.schema.security.client.openMetadataJWTClientConfig impor
)
from metadata.ingestion.connections.session import create_and_bind_session
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.profiler.api.workflow import ProfilerWorkflow
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import print_status
logging.basicConfig(level=logging.WARN)
@ -303,7 +302,7 @@ class ProfilerWorkflowTest(TestCase):
profiler_workflow = ProfilerWorkflow.create(workflow_config)
profiler_workflow.execute()
profiler_workflow.print_status()
print_status(profiler_workflow)
profiler_workflow.stop()
table = self.metadata.get_by_name(
@ -349,7 +348,7 @@ class ProfilerWorkflowTest(TestCase):
profiler_workflow = ProfilerWorkflow.create(workflow_config)
profiler_workflow.execute()
profiler_workflow.print_status()
print_status(profiler_workflow)
profiler_workflow.stop()
table = self.metadata.get_by_name(
@ -388,7 +387,7 @@ class ProfilerWorkflowTest(TestCase):
profiler_workflow = ProfilerWorkflow.create(workflow_config)
profiler_workflow.execute()
profiler_workflow.print_status()
print_status(profiler_workflow)
profiler_workflow.stop()
table = self.metadata.get_by_name(
@ -436,7 +435,7 @@ class ProfilerWorkflowTest(TestCase):
profiler_workflow = ProfilerWorkflow.create(workflow_config)
profiler_workflow.execute()
profiler_workflow.print_status()
print_status(profiler_workflow)
profiler_workflow.stop()
table = self.metadata.get_by_name(
@ -476,7 +475,7 @@ class ProfilerWorkflowTest(TestCase):
profiler_workflow = ProfilerWorkflow.create(workflow_config)
profiler_workflow.execute()
profiler_workflow.print_status()
print_status(profiler_workflow)
profiler_workflow.stop()
table = self.metadata.get_by_name(
@ -523,7 +522,7 @@ class ProfilerWorkflowTest(TestCase):
profiler_workflow = ProfilerWorkflow.create(workflow_config)
profiler_workflow.execute()
profiler_workflow.print_status()
print_status(profiler_workflow)
profiler_workflow.stop()
table = self.metadata.get_by_name(
@ -563,7 +562,7 @@ class ProfilerWorkflowTest(TestCase):
profiler_workflow = ProfilerWorkflow.create(workflow_config)
profiler_workflow.execute()
profiler_workflow.print_status()
print_status(profiler_workflow)
profiler_workflow.stop()
table = self.metadata.get_by_name(

View File

@ -20,6 +20,9 @@ from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.data.createTableProfile import (
CreateTableProfileRequest,
)
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
@ -29,6 +32,7 @@ from metadata.generated.schema.entity.data.table import (
DataType,
Table,
TableData,
TableProfile,
)
from metadata.generated.schema.entity.services.connections.database.common.basicAuth import (
BasicAuth,
@ -44,12 +48,23 @@ from metadata.generated.schema.entity.services.databaseService import (
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
Source,
SourceConfig,
WorkflowConfig,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
from metadata.generated.schema.type.basic import Timestamp
from metadata.generated.schema.type.tagLabel import TagFQN, TagLabel
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.pii.processor import PIIProcessor
from metadata.profiler.api.models import PatchColumnTagResponse, ProfilerResponse
table_data = TableData(
columns=[
@ -100,158 +115,50 @@ table_data = TableData(
)
EXPECTED_TABLE_ENTITY = [
    Column(
        name=ColumnName(__root__="customer_id"),
        displayName=None,
        dataType="INT",
        arrayDataType=None,
        dataLength=None,
        precision=None,
        scale=None,
        dataTypeDisplay="int",
        description=None,
        fullyQualifiedName="test-service-table-patch.test-db.test-schema.customers.customer_id",
        tags=[],
        constraint=None,
        ordinalPosition=None,
        jsonSchema=None,
        children=None,
        customMetrics=None,
        profile=None,
    ),
    Column(
        name=ColumnName(__root__="first_name"),
        displayName=None,
        dataType="VARCHAR",
        arrayDataType=None,
        dataLength=20,
        precision=None,
        scale=None,
        dataTypeDisplay="varchar",
        description=None,
        fullyQualifiedName="test-service-table-patch.test-db.test-schema.customers.first_name",
        tags=[
            TagLabel(
                tagFQN=TagFQN(__root__="PII.Sensitive"),
                description=(
                    "PII which if lost, compromised, or disclosed without authorization, could result in"
                    " substantial harm, embarrassment, inconvenience, or unfairness to an individual."
                ),
                source="Classification",
                labelType="Automated",
                state="Suggested",
                href=None,
            )
        ],
        constraint=None,
        ordinalPosition=None,
        jsonSchema=None,
        children=None,
        customMetrics=None,
        profile=None,
    ),
    Column(
        name=ColumnName(__root__="last_name"),
        displayName=None,
        dataType="VARCHAR",
        arrayDataType=None,
        dataLength=20,
        precision=None,
        scale=None,
        dataTypeDisplay="varchar",
        description=None,
        fullyQualifiedName="test-service-table-patch.test-db.test-schema.customers.last_name",
        tags=[],
        constraint=None,
        ordinalPosition=None,
        jsonSchema=None,
        children=None,
        customMetrics=None,
        profile=None,
    ),
    Column(
        name=ColumnName(__root__="first_order"),
        displayName=None,
        dataType="DATE",
        arrayDataType=None,
        dataLength=None,
        precision=None,
        scale=None,
        dataTypeDisplay="date",
        description=None,
        fullyQualifiedName="test-service-table-patch.test-db.test-schema.customers.first_order",
        tags=[
            TagLabel(
                tagFQN=TagFQN(__root__="PII.NonSensitive"),
                description=(
                    "PII which is easily accessible from public sources and can include zip code, "
                    "race, gender, and date of birth."
                ),
                source="Classification",
                labelType="Automated",
                state="Suggested",
                href=None,
            )
        ],
        constraint=None,
        ordinalPosition=None,
        jsonSchema=None,
        children=None,
        customMetrics=None,
        profile=None,
    ),
    Column(
        name=ColumnName(__root__="customer_email"),
        displayName=None,
        dataType="VARCHAR",
        arrayDataType=None,
        dataLength=20,
        precision=None,
        scale=None,
        dataTypeDisplay="date",
        description=None,
        fullyQualifiedName="test-service-table-patch.test-db.test-schema.customers.customer_email",
        tags=[
            TagLabel(
                tagFQN=TagFQN(__root__="PII.Sensitive"),
                description=(
                    "PII which if lost, compromised, or disclosed without authorization, could result in"
                    " substantial harm, embarrassment, inconvenience, or unfairness to an individual."
                ),
                source="Classification",
                labelType="Automated",
                state="Suggested",
                href=None,
            )
        ],
        constraint=None,
        ordinalPosition=None,
        jsonSchema=None,
        children=None,
        customMetrics=None,
        profile=None,
    ),
    Column(
        name=ColumnName(__root__="number_of_orders"),
        displayName=None,
        dataType="BIGINT",
        arrayDataType=None,
        dataLength=None,
        precision=None,
        scale=None,
        dataTypeDisplay="bigint",
        description=None,
        fullyQualifiedName="test-service-table-patch.test-db.test-schema.customers.number_of_orders",
        tags=[],
        constraint=None,
        ordinalPosition=None,
        jsonSchema=None,
        children=None,
        customMetrics=None,
        profile=None,
    ),
]
EXPECTED_COLUMN_TAGS = [
    PatchColumnTagResponse(
        column_fqn="test-service-table-patch.test-db.test-schema.customers.first_name",
        tag_label=TagLabel(
            tagFQN=TagFQN(__root__="PII.Sensitive"),
            description=(
                "PII which if lost, compromised, or disclosed without authorization, could result in"
                " substantial harm, embarrassment, inconvenience, or unfairness to an individual."
            ),
            source="Classification",
            labelType="Automated",
            state="Suggested",
            href=None,
        ),
    ),
    PatchColumnTagResponse(
        column_fqn="test-service-table-patch.test-db.test-schema.customers.first_order",
        tag_label=TagLabel(
            tagFQN=TagFQN(__root__="PII.NonSensitive"),
            description=(
                "PII which is easily accessible from public sources and can include zip code, "
                "race, gender, and date of birth."
            ),
            source="Classification",
            labelType="Automated",
            state="Suggested",
            href=None,
        ),
    ),
    PatchColumnTagResponse(
        column_fqn="test-service-table-patch.test-db.test-schema.customers.customer_email",
        tag_label=TagLabel(
            tagFQN=TagFQN(__root__="PII.Sensitive"),
            description=(
                "PII which if lost, compromised, or disclosed without authorization, could result in"
                " substantial harm, embarrassment, inconvenience, or unfairness to an individual."
            ),
            source="Classification",
            labelType="Automated",
            state="Suggested",
            href=None,
        ),
    ),
]
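Note the intent of this swap: the test no longer asserts fully patched Column entities pulled back from the API; it asserts the PatchColumnTagResponse records the PIIProcessor now emits, each pairing a column FQN with its suggested TagLabel, with the actual patching deferred to downstream steps.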
@ -275,8 +182,91 @@ class PiiProcessorTest(TestCase):
"5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
),
)
workflow_config = OpenMetadataWorkflowConfig(
source=Source(
type="mysql",
serviceName="test",
sourceConfig=SourceConfig(
config=DatabaseServiceProfilerPipeline(
confidence=85,
processPiiSensitive=True,
)
),
),
workflowConfig=WorkflowConfig(openMetadataServerConfig=server_config),
)
metadata = OpenMetadata(server_config)
ner_scanner_processor = PIIProcessor(metadata)
ner_scanner_processor = PIIProcessor(
config=workflow_config, metadata_config=server_config
)
@classmethod
def setUpClass(cls) -> None:
"""
Prepare ingredients
"""
service = CreateDatabaseServiceRequest(
name="test-service-table-patch",
serviceType=DatabaseServiceType.Mysql,
connection=DatabaseConnection(
config=MysqlConnection(
username="username",
authType=BasicAuth(
password="password",
),
hostPort="http://localhost:1234",
)
),
)
service_entity = cls.metadata.create_or_update(data=service)
create_db = CreateDatabaseRequest(
name="test-db",
service=service_entity.fullyQualifiedName,
)
create_db_entity = cls.metadata.create_or_update(data=create_db)
create_schema = CreateDatabaseSchemaRequest(
name="test-schema",
database=create_db_entity.fullyQualifiedName,
)
create_schema_entity = cls.metadata.create_or_update(data=create_schema)
created_table = CreateTableRequest(
name="customers",
columns=[
Column(name="customer_id", dataType=DataType.INT),
Column(name="first_name", dataType=DataType.VARCHAR, dataLength=20),
Column(name="last_name", dataType=DataType.VARCHAR, dataLength=20),
Column(name="first_order", dataType=DataType.DATE),
Column(name="customer_email", dataType=DataType.VARCHAR, dataLength=20),
Column(name="number_of_orders", dataType=DataType.BIGINT),
],
databaseSchema=create_schema_entity.fullyQualifiedName,
)
cls.table_entity = cls.metadata.create_or_update(data=created_table)
@classmethod
def tearDownClass(cls) -> None:
"""
Clean up
"""
service_id = str(
cls.metadata.get_by_name(
entity=DatabaseService, fqn="test-service-table-patch"
).id.__root__
)
cls.metadata.delete(
entity=DatabaseService,
entity_id=service_id,
recursive=True,
hard_delete=True,
)
@classmethod
def setUpClass(cls) -> None:
@ -349,15 +339,22 @@ class PiiProcessorTest(TestCase):
Test function for the NER scanner
"""
self.ner_scanner_processor.process(
table_data=table_data,
table_entity=self.table_entity,
confidence_threshold=85,
)
updated_table_entity = self.metadata.get_by_id(
entity=Table, entity_id=self.table_entity.id, fields=["tags"]
record = ProfilerResponse(
table=self.table_entity,
profile=CreateTableProfileRequest(
tableProfile=TableProfile(
timestamp=Timestamp(
__root__=int(datetime.datetime.now().timestamp() * 1000)
)
)
),
sample_data=table_data,
)
updated_record: ProfilerResponse = self.ner_scanner_processor.run(record)
for _, (expected, original) in enumerate(
zip(EXPECTED_TABLE_ENTITY, updated_table_entity.columns)
zip(EXPECTED_COLUMN_TAGS, updated_record.column_tags)
):
self.assertEqual(expected.tags, original.tags)
self.assertEqual(expected.column_fqn, original.column_fqn)
self.assertEqual(expected.tag_label.tagFQN, original.tag_label.tagFQN)
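A hedged sketch of the Step contract the rewritten test exercises. The `failures` attribute on the step status is an assumption inferred from how the output handler reads sink statuses elsewhere in this commit, not something this diff confirms:

    processor = PIIProcessor(config=workflow_config, metadata_config=server_config)
    updated = processor.run(record)  # record: the ProfilerResponse built above
    if updated is None:
        # assumption: run() returns None on failure and records the error
        # on the step's status object
        print(processor.get_status().failures)

Note also where the threshold went: the old API took confidence_threshold=85 as a process() argument, while the new processor reads confidence=85 from the DatabaseServiceProfilerPipeline source config it is constructed with.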

View File

@ -13,8 +13,8 @@ Test Column Name Scanner
"""
from unittest import TestCase
from metadata.pii.column_name_scanner import ColumnNameScanner
from metadata.pii.models import TagAndConfidence, TagType
from metadata.pii.scanners.column_name_scanner import ColumnNameScanner
EXPECTED_SENSITIVE = TagAndConfidence(
tag=TagType.SENSITIVE,

View File

@ -14,7 +14,7 @@ Test Column Name Scanner
from unittest import TestCase
from metadata.pii.models import TagType
from metadata.pii.ner_scanner import NERScanner
from metadata.pii.scanners.ner_scanner import NERScanner
class NERScannerTest(TestCase):

View File

@ -30,8 +30,8 @@ from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.profiler.api.workflow import ProfilerWorkflow
from metadata.profiler.interface.profiler_interface import ProfilerInterface
from metadata.workflow.profiler import ProfilerWorkflow
"""
Check Partitioned Table in Profiler Workflow
@ -117,7 +117,9 @@ MOCK_RANGE_PARTITIONING = RangePartitioning(
class ProfilerPartitionUnitTest(TestCase):
@patch("metadata.profiler.api.workflow.ProfilerWorkflow._validate_service_name")
@patch(
"metadata.profiler.source.metadata.OpenMetadataSource._validate_service_name"
)
@patch("google.auth.default")
@patch("sqlalchemy.engine.base.Engine.connect")
@patch("sqlalchemy_bigquery._helpers.create_bigquery_client")

View File

@ -38,12 +38,13 @@ from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.profiler.api.models import ProfilerProcessorConfig
from metadata.profiler.api.workflow import ProfilerWorkflow
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface,
)
from metadata.profiler.processor.default import DefaultProfiler
from metadata.profiler.source.base.profiler_source import ProfilerSource
from metadata.profiler.source.metadata import OpenMetadataSource
from metadata.workflow.profiler import ProfilerWorkflow
TABLE = Table(
id=uuid.uuid4(),
@ -109,7 +110,7 @@ class User(Base):
return_value=User,
)
@patch.object(
ProfilerWorkflow,
OpenMetadataSource,
"_validate_service_name",
return_value=True,
)
@ -120,16 +121,18 @@ def test_init_workflow(mocked_method, mocked_orm): # pylint: disable=unused-arg
workflow = ProfilerWorkflow.create(config)
mocked_method.assert_called()
assert isinstance(workflow.source_config, DatabaseServiceProfilerPipeline)
assert isinstance(workflow.source.source_config, DatabaseServiceProfilerPipeline)
assert isinstance(workflow.metadata_config, OpenMetadataConnection)
assert isinstance(workflow.profiler_config, ProfilerProcessorConfig)
assert workflow.profiler_config.profiler is None
assert workflow.profiler_config.tableConfig is None
profiler_processor_step = workflow.steps[0]
assert isinstance(profiler_processor_step.profiler_config, ProfilerProcessorConfig)
assert profiler_processor_step.profiler_config.profiler is None
assert profiler_processor_step.profiler_config.tableConfig is None
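The steps[0] indexing these assertions rely on is stable because set_steps() in metadata/workflow/profiler.py registers an ordered tuple:

    # grounded in set_steps() above: steps == (profiler_processor, pii_processor, sink)
    profiler_processor_step = workflow.steps[0]  # ProfilerProcessor (carries profiler_config)
    pii_processor_step = workflow.steps[1]       # PIIProcessor
    sink_step = workflow.steps[2]                # Sink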
@patch.object(
ProfilerWorkflow,
OpenMetadataSource,
"_validate_service_name",
return_value=True,
)
@ -174,7 +177,7 @@ def test_filter_entities(mocked_method):
]
# Simple workflow does not filter
assert len(list(workflow.filter_entities(all_tables))) == 3
assert len(list(workflow.source.filter_entities(all_tables))) == 3
# We can exclude based on the schema name
exclude_config = deepcopy(config)
@ -184,7 +187,7 @@ def test_filter_entities(mocked_method):
exclude_workflow = ProfilerWorkflow.create(exclude_config)
mocked_method.assert_called()
assert len(list(exclude_workflow.filter_entities(all_tables))) == 2
assert len(list(exclude_workflow.source.filter_entities(all_tables))) == 2
exclude_config = deepcopy(config)
exclude_config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = {
@ -193,7 +196,7 @@ def test_filter_entities(mocked_method):
exclude_workflow = ProfilerWorkflow.create(exclude_config)
mocked_method.assert_called()
assert len(list(exclude_workflow.filter_entities(all_tables))) == 2
assert len(list(exclude_workflow.source.filter_entities(all_tables))) == 2
include_config = deepcopy(config)
include_config["source"]["sourceConfig"]["config"]["databaseFilterPattern"] = {
@ -202,7 +205,7 @@ def test_filter_entities(mocked_method):
include_workflow = ProfilerWorkflow.create(include_config)
mocked_method.assert_called()
assert len(list(include_workflow.filter_entities(all_tables))) == 3
assert len(list(include_workflow.source.filter_entities(all_tables))) == 3
@patch.object(
@ -211,7 +214,7 @@ def test_filter_entities(mocked_method):
return_value=User,
)
@patch.object(
ProfilerWorkflow,
OpenMetadataSource,
"_validate_service_name",
return_value=True,
)
@ -230,6 +233,8 @@ def test_profile_def(mocked_method, mocked_orm): # pylint: disable=unused-argum
profile_workflow = ProfilerWorkflow.create(profile_config)
mocked_method.assert_called()
profiler_processor_step = profile_workflow.steps[0]
profiler_source = ProfilerSource(
profile_workflow.config,
DatabaseService(
@ -240,13 +245,13 @@ def test_profile_def(mocked_method, mocked_orm): # pylint: disable=unused-argum
profile_workflow.metadata,
)
profiler_runner = profiler_source.get_profiler_runner(
TABLE, profile_workflow.profiler_config
TABLE, profiler_processor_step.profiler_config
)
# profile_workflow.create_profiler(TABLE, profiler_interface)
profiler_obj_metrics = [metric.name() for metric in profiler_runner.metrics]
assert profile_workflow.profiler_config.profiler
assert profiler_processor_step.profiler_config.profiler
assert config_metrics_label == profiler_obj_metrics
@ -256,7 +261,7 @@ def test_profile_def(mocked_method, mocked_orm): # pylint: disable=unused-argum
return_value=User,
)
@patch.object(
ProfilerWorkflow,
OpenMetadataSource,
"_validate_service_name",
return_value=True,
)
@ -271,6 +276,8 @@ def test_default_profile_def(
profile_workflow = ProfilerWorkflow.create(config)
mocked_method.assert_called()
profiler_processor_step = profile_workflow.steps[0]
profiler_source = ProfilerSource(
profile_workflow.config,
DatabaseService(
@ -281,7 +288,7 @@ def test_default_profile_def(
profile_workflow.metadata,
)
profiler_runner = profiler_source.get_profiler_runner(
TABLE, profile_workflow.profiler_config
TABLE, profiler_processor_step.profiler_config
)
assert isinstance(

View File

@ -114,14 +114,6 @@ class ImporterTest(TestCase):
MetadataUsageBulkSink,
)
def test_import_sink_from(self) -> None:
from metadata.profiler.sink.metadata_rest import MetadataRestSink
self.assertEqual(
import_sink_class(sink_type="metadata-rest", from_="profiler"),
MetadataRestSink,
)
def test_import_get_connection(self) -> None:
connection = MysqlConnection(
username="name",

View File

@ -28,8 +28,8 @@ from metadata.generated.schema.metadataIngestion.workflow import (
WorkflowConfig,
)
from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.profiler.api.workflow import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import print_profiler_status
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import print_status
def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig):
@ -49,7 +49,7 @@ def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig):
workflow.execute()
workflow.raise_from_status()
print_profiler_status(workflow)
print_status(workflow)
workflow.stop()

View File

@ -71,8 +71,8 @@ from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.profiler.api.workflow import ProfilerWorkflow
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
def mock_set_ingestion_pipeline_status(self, state):