From bd4071bd64afdc737a23148c77e66d1a2b4e3bc7 Mon Sep 17 00:00:00 2001
From: Pere Miquel Brull
Date: Wed, 6 Apr 2022 17:05:00 +0200
Subject: [PATCH] Fix #3826 & #3886 - Profiler workflow & filter pattern (#3893)

---
 .../src/metadata/ingestion/api/common.py      |  39 -------
 .../metadata/ingestion/source/sql_source.py   |  24 +++-
 .../ingestion/source/sql_source_common.py     | 100 -----------------
 .../src/metadata/orm_profiler/api/workflow.py |  34 +++---
 ingestion/src/metadata/utils/engines.py       |   8 +-
 ingestion/src/metadata/utils/filters.py       | 103 ++++++++++++++++++
 ingestion/src/metadata/utils/helpers.py       |   1 -
 .../tests/unit/profiler/test_workflow.py      |  58 +++++-----
 8 files changed, 178 insertions(+), 189 deletions(-)
 delete mode 100644 ingestion/src/metadata/ingestion/source/sql_source_common.py
 create mode 100644 ingestion/src/metadata/utils/filters.py

diff --git a/ingestion/src/metadata/ingestion/api/common.py b/ingestion/src/metadata/ingestion/api/common.py
index a6392ffff5a..38cb8d19d64 100644
--- a/ingestion/src/metadata/ingestion/api/common.py
+++ b/ingestion/src/metadata/ingestion/api/common.py
@@ -35,42 +35,3 @@ class DynamicTypedConfig(ConfigModel):
 
 class WorkflowExecutionError(Exception):
     """An error occurred when executing the workflow"""
-
-
-class IncludeFilterPattern(ConfigModel):
-    """A class to store allow deny regexes"""
-
-    includes: List[str] = [".*"]
-    excludes: List[str] = []
-    alphabet: str = "[A-Za-z0-9 _.-]"
-
-    @property
-    def alphabet_pattern(self):
-        return re.compile(f"^{self.alphabet}+$")
-
-    @classmethod
-    def allow_all(cls):
-        return IncludeFilterPattern()
-
-    def included(self, string: str) -> bool:
-        try:
-            for exclude in self.excludes:
-                if re.match(exclude, string):
-                    return False
-
-            for include in self.includes:
-                if re.match(include, string):
-                    return True
-            return False
-        except Exception as err:
-            raise Exception("Regex Error: {}".format(err))
-
-    def is_fully_specified_include_list(self) -> bool:
-        for filter_pattern in self.includes:
-            if not self.alphabet_pattern.match(filter_pattern):
-                return False
-        return True
-
-    def get_allowed_list(self):
-        assert self.is_fully_specified_include_list()
-        return [a for a in self.includes if self.included(a)]
diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py
index 5093f1ccb3b..08c5e22eb02 100644
--- a/ingestion/src/metadata/ingestion/source/sql_source.py
+++ b/ingestion/src/metadata/ingestion/source/sql_source.py
@@ -16,6 +16,7 @@ import logging
 import re
 import traceback
 import uuid
+from dataclasses import dataclass, field
 from datetime import datetime
 from typing import Dict, Iterable, List, Optional, Tuple
 
@@ -56,7 +57,6 @@ from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
 from metadata.ingestion.models.table_metadata import DeleteTable
 from metadata.ingestion.ometa.client import APIError
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
-from metadata.ingestion.source.sql_source_common import SQLSourceStatus
 from metadata.orm_profiler.orm.converter import ometa_to_orm
 from metadata.orm_profiler.profiler.default import DefaultProfiler
 from metadata.utils.column_type_parser import ColumnTypeParser
@@ -66,6 +66,26 @@ from metadata.utils.helpers import get_database_service_or_create, ingest_lineag
 
 logger: logging.Logger = logging.getLogger(__name__)
 
+
+@dataclass
+class SQLSourceStatus(SourceStatus):
+    """
+    Reports the source status after ingestion
+    """
+
+    success: List[str] = field(default_factory=list)
+    failures: List[str] = field(default_factory=list)
+    warnings: List[str] = field(default_factory=list)
+    filtered: List[str] = field(default_factory=list)
+
+    def scanned(self, record: str) -> None:
+        self.success.append(record)
+        logger.info(f"Table Scanned: {record}")
+
+    def filter(self, record: str, err: str) -> None:
+        self.filtered.append(record)
+        logger.warning(f"Filtered Table {record} due to {err}")
+
 
 def _get_table_description(schema: str, table: str, inspector: Inspector) -> str:
     description = None
     try:
@@ -105,7 +125,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
         self.service = get_database_service_or_create(config, metadata_config)
         self.metadata = OpenMetadata(metadata_config)
         self.status = SQLSourceStatus()
-        self.engine = get_engine(config=self.config)
+        self.engine = get_engine(workflow_source=self.config)
         self._session = None  # We will instantiate this just if needed
         self.connection = self.engine.connect()
         self.data_profiler = None
diff --git a/ingestion/src/metadata/ingestion/source/sql_source_common.py b/ingestion/src/metadata/ingestion/source/sql_source_common.py
deleted file mode 100644
index 26e83377409..00000000000
--- a/ingestion/src/metadata/ingestion/source/sql_source_common.py
+++ /dev/null
@@ -1,100 +0,0 @@
-import logging
-from abc import abstractmethod
-from dataclasses import dataclass, field
-from typing import List, Optional
-from urllib.parse import quote_plus
-
-from pydantic import SecretStr
-
-from metadata.generated.schema.entity.services.databaseService import (
-    DatabaseServiceType,
-)
-from metadata.generated.schema.operations.pipelines.databaseServiceMetadataPipeline import (
-    DatabaseServiceMetadataPipeline,
-)
-from metadata.ingestion.api.source import SourceStatus
-
-logger: logging.Logger = logging.getLogger(__name__)
-
-
-@dataclass
-class SQLSourceStatus(SourceStatus):
-    """
-    Reports the source status after ingestion
-    """
-
-    success: List[str] = field(default_factory=list)
-    failures: List[str] = field(default_factory=list)
-    warnings: List[str] = field(default_factory=list)
-    filtered: List[str] = field(default_factory=list)
-
-    def scanned(self, record: str) -> None:
-        self.success.append(record)
-        logger.info(f"Table Scanned: {record}")
-
-    def filter(self, record: str, err: str) -> None:
-        self.filtered.append(record)
-        logger.warning(f"Filtered Table {record} due to {err}")
-
-
-def build_sql_source_connection_url(
-    host_port: str,
-    scheme: str,
-    username: Optional[str] = None,
-    password: Optional[SecretStr] = None,
-    database: Optional[str] = None,
-    options: Optional[dict] = None,
-) -> str:
-    """
-    Helper function to prepare the db URL
-    """
-
-    url = f"{scheme}://"
-    if username is not None:
-        url += f"{username}"
-        if password is not None:
-            url += f":{quote_plus(password.get_secret_value())}"
-        url += "@"
-    url += f"{host_port}"
-    if database:
-        url += f"/{database}"
-
-    if options is not None:
-        if database is None:
-            url += "/"
-        params = "&".join(
-            f"{key}={quote_plus(value)}" for (key, value) in options.items() if value
-        )
-        url = f"{url}?{params}"
-    return url
-
-
-class SQLConnectionConfig(DatabaseServiceMetadataPipeline):
-    """
-    Config class containing all supported
-    configurations for an SQL source, including
-    data profiling and DBT generated information.
-    """
-
-    service_name: str
-    db_schema: Optional[str] = None
-    options: dict = {}
-    connect_args: dict = {}
-    include_tables: Optional[bool] = True
-
-    @abstractmethod
-    def get_connection_url(self):
-        return build_sql_source_connection_url(
-            host_port=self.hostPort,
-            scheme=self.scheme,
-            username=self.username,
-            password=self.password,
-            database=self.database,
-            options=self.options,
-        )
-
-    def get_service_type(self) -> DatabaseServiceType:
-        return DatabaseServiceType[self.type]
-
-    def get_service_name(self) -> str:
-        return self.service_name
diff --git a/ingestion/src/metadata/orm_profiler/api/workflow.py b/ingestion/src/metadata/orm_profiler/api/workflow.py
index 4e9e6e9ede4..0b1b284e8a7 100644
--- a/ingestion/src/metadata/orm_profiler/api/workflow.py
+++ b/ingestion/src/metadata/orm_profiler/api/workflow.py
@@ -26,6 +26,9 @@ from metadata.config.common import WorkflowExecutionError
 from metadata.config.workflow import get_ingestion_source, get_processor, get_sink
 from metadata.generated.schema.entity.data.database import Database
 from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
+    DatabaseServiceMetadataPipeline,
+)
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataServerConfig,
     OpenMetadataWorkflowConfig,
@@ -34,14 +37,11 @@ from metadata.ingestion.api.processor import Processor
 from metadata.ingestion.api.sink import Sink
 from metadata.ingestion.api.source import Source
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
-from metadata.ingestion.source.sql_source import SQLSource
-from metadata.ingestion.source.sql_source_common import (
-    SQLConnectionConfig,
-    SQLSourceStatus,
-)
+from metadata.ingestion.source.sql_source import SQLSource, SQLSourceStatus
 from metadata.orm_profiler.api.models import ProfilerProcessorConfig, ProfilerResponse
 from metadata.orm_profiler.utils import logger
-from metadata.utils.engines import create_and_bind_session, get_engine
+from metadata.utils.engines import create_and_bind_session
+from metadata.utils.filters import filter_by_schema, filter_by_table
 
 logger = logger()
 
@@ -64,7 +64,7 @@ class ProfilerWorkflow:
         )
 
         # We will use the existing sources to build the Engine
-        self.source = get_ingestion_source(
+        self.source: Source = get_ingestion_source(
            source_type=self.config.source.type,
            source_config=self.config.source,
            metadata_config=self.metadata_config,
@@ -76,7 +76,9 @@
         )
 
         # Init and type the source config
-        self.source_config: SQLConnectionConfig = self.source.config
+        self.source_config: DatabaseServiceMetadataPipeline = (
+            self.config.source.sourceConfig.config
+        )
         self.source_status = SQLSourceStatus()
 
         self.processor = get_processor(
@@ -85,7 +87,7 @@
             metadata_config=self.metadata_config,
             _from="orm_profiler",
             # Pass the session as kwargs for the profiler
-            session=create_and_bind_session(get_engine(self.source_config)),
+            session=create_and_bind_session(self.source.engine),
         )
 
         if self.config.sink:
@@ -121,20 +123,22 @@
 
         for table in tables:
 
             # Validate schema
-            if not self.source_config.schema_filter_pattern.included(
-                table.database.name
+            if filter_by_schema(
+                schema_filter_pattern=self.source_config.schemaFilterPattern,
+                schema_name=table.databaseSchema.name,
             ):
                 self.source_status.filter(
-                    table.database.name, "Schema pattern not allowed"
+                    table.databaseSchema.name, "Schema pattern not allowed"
                 )
                 continue
 
             # Validate database
-            if not self.source_config.table_filter_pattern.included(
-                str(table.name.__root__)
+            if filter_by_table(
+                table_filter_pattern=self.source_config.tableFilterPattern,
+                table_name=str(table.name.__root__),
             ):
                 self.source_status.filter(
-                    table.fullyQualifiedName.__root__, "Table name pattern not allowed"
+                    table.name.__root__, "Table name pattern not allowed"
                 )
                 continue
 
diff --git a/ingestion/src/metadata/utils/engines.py b/ingestion/src/metadata/utils/engines.py
index 8706acb25c6..642a1daa31c 100644
--- a/ingestion/src/metadata/utils/engines.py
+++ b/ingestion/src/metadata/utils/engines.py
@@ -27,12 +27,12 @@ from metadata.utils.source_connections import get_connection_url
 logger = logging.getLogger("Utils")
 
 
-def get_engine(config: WorkflowSource, verbose: bool = False) -> Engine:
+def get_engine(workflow_source: WorkflowSource, verbose: bool = False) -> Engine:
     """
     Given an SQL configuration, build the SQLAlchemy Engine
     """
-    logger.info(f"Building Engine for {config.serviceName}...")
-    service_connection_config = config.serviceConnection.__root__.config
+    logger.info(f"Building Engine for {workflow_source.serviceName}...")
+    service_connection_config = workflow_source.serviceConnection.__root__.config
     options = service_connection_config.connectionOptions
     if not options:
         options = {}
@@ -40,7 +40,7 @@ def get_engine(config: WorkflowSource, verbose: bool = False) -> Engine:
     if not connect_args:
         connect_args = {}
     engine = create_engine(
-        get_connection_url(config.serviceConnection.__root__.config),
+        get_connection_url(service_connection_config),
         **options,
         connect_args=connect_args,
         echo=verbose,
diff --git a/ingestion/src/metadata/utils/filters.py b/ingestion/src/metadata/utils/filters.py
new file mode 100644
index 00000000000..c06f94e8cb5
--- /dev/null
+++ b/ingestion/src/metadata/utils/filters.py
@@ -0,0 +1,103 @@
+#  Copyright 2021 Collate
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#  http://www.apache.org/licenses/LICENSE-2.0
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+"""
+Helper that implements table and schema filter pattern logic
+"""
+import re
+from typing import List, Optional
+
+from metadata.generated.schema.type.filterPattern import FilterPatternModel
+
+
+class InvalidPatternException(Exception):
+    """
+    Raised when an invalid pattern is configured in the workflow
+    """
+
+
+def validate_regex(regex_list: List[str]) -> None:
+    """
+    Check that the given include/exclude regexes
+    are well formatted
+    """
+    for regex in regex_list:
+        try:
+            re.compile(regex)
+        except re.error:
+            raise InvalidPatternException(f"Invalid regex {regex}.")
+
+
+def _filter(filter_pattern: Optional[FilterPatternModel], name: str) -> bool:
+    """
+    Return True if the name needs to be filtered, False otherwise
+
+    Include takes precedence over exclude
+
+    :param filter_pattern: Model defining filtering logic
+    :param name: table or schema name
+    :return: True for filtering, False otherwise
+    """
+    if not filter_pattern:
+        # No filter pattern, nothing to filter
+        return False
+
+    if filter_pattern.includes:
+        validate_regex(filter_pattern.includes)
+        return not any(
+            [
+                matched
+                for regex in filter_pattern.includes
+                if (matched := re.match(regex, name))
+            ]
+        )
+
+    if filter_pattern.excludes:
+        validate_regex(filter_pattern.excludes)
+        return any(
+            [
+                matched
+                for regex in filter_pattern.excludes
+                if (matched := re.match(regex, name))
+            ]
+        )
+
+    return False
+
+
+def filter_by_schema(
+    schema_filter_pattern: Optional[FilterPatternModel], schema_name: str
+) -> bool:
+    """
+    Return True if the schema needs to be filtered, False otherwise
+
+    Include takes precedence over exclude
+
+    :param schema_filter_pattern: Model defining schema filtering logic
+    :param schema_name: table schema name
+    :return: True for filtering, False otherwise
+    """
+    return _filter(schema_filter_pattern, schema_name)
+
+
+def filter_by_table(
+    table_filter_pattern: Optional[FilterPatternModel], table_name: str
+) -> bool:
+    """
+    Return True if the table needs to be filtered, False otherwise
+
+    Include takes precedence over exclude
+
+    :param table_filter_pattern: Model defining table filtering logic
+    :param table_name: table name
+    :return: True for filtering, False otherwise
+    """
+    return _filter(table_filter_pattern, table_name)
diff --git a/ingestion/src/metadata/utils/helpers.py b/ingestion/src/metadata/utils/helpers.py
index ce9e3fe0589..56652523854 100644
--- a/ingestion/src/metadata/utils/helpers.py
+++ b/ingestion/src/metadata/utils/helpers.py
@@ -13,7 +13,6 @@ import logging
 import traceback
 from datetime import datetime, timedelta
 from typing import Any, Dict, Iterable
-from urllib.parse import quote_plus
 
 from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
 from metadata.generated.schema.api.services.createDashboardService import (
diff --git a/ingestion/tests/unit/profiler/test_workflow.py b/ingestion/tests/unit/profiler/test_workflow.py
index 72fbbde7eda..88ba591659f 100644
--- a/ingestion/tests/unit/profiler/test_workflow.py
+++ b/ingestion/tests/unit/profiler/test_workflow.py
@@ -21,11 +21,11 @@ from sqlalchemy.orm import declarative_base
 from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
 from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
 from metadata.generated.schema.entity.data.table import Column, DataType, Table
-from metadata.generated.schema.metadataIngestion.workflow import (
-    OpenMetadataServerConfig,
+from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
+    DatabaseServiceMetadataPipeline,
 )
 from metadata.generated.schema.metadataIngestion.workflow import (
-    Source as WorkflowSource,
+    OpenMetadataServerConfig,
 )
 from metadata.generated.schema.tests.column.columnValuesToBeBetween import (
     ColumnValuesToBeBetween,
@@ -46,10 +46,8 @@ config = {
     "source": {
         "type": "sqlite",
         "serviceName": "my_service",
-        "serviceConnection": {
-            "config": {"type": "SQLite", "hostPort": "", "database": ":memory:"}
-        },
-        "sourceConfig": {},
+        "serviceConnection": {"config": {"type": "SQLite"}},
+        "sourceConfig": {"config": {}},
     },
     "processor": {"type": "orm-profiler", "config": {}},
     "sink": {"type": "metadata-rest", "config": {}},
@@ -68,7 +66,7 @@ def test_init_workflow():
     """
     We can initialise the workflow from a config
     """
-    assert isinstance(workflow.source_config, WorkflowSource)
+    assert isinstance(workflow.source_config, DatabaseServiceMetadataPipeline)
     assert isinstance(workflow.metadata_config, OpenMetadataServerConfig)
 
     assert isinstance(workflow.processor, OrmProfilerProcessor)
@@ -83,29 +81,33 @@
     """
     service_name = "service"
-    db_reference1 = EntityReference(id=uuid.uuid4(), name="one_db", type="database")
-    db_reference2 = EntityReference(id=uuid.uuid4(), name="another_db", type="database")
+    schema_reference1 = EntityReference(
+        id=uuid.uuid4(), name="one_schema", type="databaseSchema"
+    )
+    schema_reference2 = EntityReference(
+        id=uuid.uuid4(), name="another_schema", type="databaseSchema"
+    )
 
     all_tables = [
         Table(
             id=uuid.uuid4(),
             name="table1",
-            database=db_reference1,
-            fullyQualifiedName=f"{service_name}.{db_reference1.name}.table1",
+            databaseSchema=schema_reference1,
+            fullyQualifiedName=f"{service_name}.db.{schema_reference1.name}.table1",
             columns=[Column(name="id", dataType=DataType.BIGINT)],
         ),
         Table(
             id=uuid.uuid4(),
             name="table2",
-            database=db_reference1,
-            fullyQualifiedName=f"{service_name}.{db_reference1.name}.table2",
+            databaseSchema=schema_reference1,
+            fullyQualifiedName=f"{service_name}.db.{schema_reference1.name}.table2",
             columns=[Column(name="id", dataType=DataType.BIGINT)],
         ),
         Table(
             id=uuid.uuid4(),
             name="table3",
-            database=db_reference2,
-            fullyQualifiedName=f"{service_name}.{db_reference2.name}.table3",
+            databaseSchema=schema_reference2,
+            fullyQualifiedName=f"{service_name}.db.{schema_reference2.name}.table3",
             columns=[Column(name="id", dataType=DataType.BIGINT)],
         ),
     ]
 
     # We can exclude based on the schema name
     exclude_filter_schema_config = deepcopy(config)
-    exclude_filter_schema_config["source"]["config"]["schema_filter_pattern"] = {
-        "excludes": ["one_db"]
-    }
+    exclude_filter_schema_config["source"]["sourceConfig"]["config"][
+        "schemaFilterPattern"
+    ] = {"excludes": ["one_schema"]}
 
     exclude_filter_schema_workflow = ProfilerWorkflow.create(
         exclude_filter_schema_config
     )
 
     # We can include based on the schema name
     include_filter_schema_config = deepcopy(config)
-    include_filter_schema_config["source"]["config"]["schema_filter_pattern"] = {
-        "includes": ["another_db"]
-    }
+    include_filter_schema_config["source"]["sourceConfig"]["config"][
+        "schemaFilterPattern"
+    ] = {"includes": ["another_schema"]}
 
     include_filter_schema_workflow = ProfilerWorkflow.create(
         include_filter_schema_config
     )
 
     # We can exclude based on the table name
     exclude_filter_table_config = deepcopy(config)
-    exclude_filter_table_config["source"]["config"]["table_filter_pattern"] = {
-        "excludes": ["tab*"]
-    }
+    exclude_filter_table_config["source"]["sourceConfig"]["config"][
+        "tableFilterPattern"
+    ] = {"excludes": ["tab*"]}
 
     exclude_filter_table_workflow = ProfilerWorkflow.create(exclude_filter_table_config)
     assert len(list(exclude_filter_table_workflow.filter_entities(all_tables))) == 0
 
     # We can include based on the table name
     include_filter_table_config = deepcopy(config)
-    include_filter_table_config["source"]["config"]["table_filter_pattern"] = {
-        "includes": ["table1"]
-    }
+    include_filter_table_config["source"]["sourceConfig"]["config"][
+        "tableFilterPattern"
+    ] = {"includes": ["table1"]}
 
     include_filter_table_workflow = ProfilerWorkflow.create(include_filter_table_config)
     assert len(list(include_filter_table_workflow.filter_entities(all_tables))) == 1
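
Note (not part of the patch): a minimal sketch of the filtering semantics that
metadata/utils/filters.py introduces above, assuming FilterPatternModel is the
generated pydantic model exposing optional `includes`/`excludes` regex lists,
as the module and the tests use:

    from metadata.generated.schema.type.filterPattern import FilterPatternModel
    from metadata.utils.filters import filter_by_schema, filter_by_table

    # Includes take precedence: with includes set, anything that does not
    # match an include regex (re.match semantics) is filtered out.
    pattern = FilterPatternModel(includes=["prod_.*"])
    assert filter_by_schema(pattern, "prod_sales") is False    # kept
    assert filter_by_schema(pattern, "staging_sales") is True  # filtered

    # With only excludes set, anything matching an exclude regex is filtered.
    pattern = FilterPatternModel(excludes=["tmp_.*"])
    assert filter_by_table(pattern, "tmp_scratch") is True     # filtered
    assert filter_by_table(pattern, "fact_orders") is False    # kept

    # No pattern configured: nothing is filtered.
    assert filter_by_table(None, "any_table") is False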