Fix #3826 & #3886 - Profiler workflow & filter pattern (#3893)

Pere Miquel Brull 2022-04-06 17:05:00 +02:00 committed by GitHub
parent 7292695bd3
commit bd4071bd64
8 changed files with 178 additions and 189 deletions

View File

@@ -35,42 +35,3 @@ class DynamicTypedConfig(ConfigModel):
class WorkflowExecutionError(Exception):
"""An error occurred when executing the workflow"""
class IncludeFilterPattern(ConfigModel):
"""A class to store allow deny regexes"""
includes: List[str] = [".*"]
excludes: List[str] = []
alphabet: str = "[A-Za-z0-9 _.-]"
@property
def alphabet_pattern(self):
return re.compile(f"^{self.alphabet}+$")
@classmethod
def allow_all(cls):
return IncludeFilterPattern()
def included(self, string: str) -> bool:
try:
for exclude in self.excludes:
if re.match(exclude, string):
return False
for include in self.includes:
if re.match(include, string):
return True
return False
except Exception as err:
raise Exception("Regex Error: {}".format(err))
def is_fully_specified_include_list(self) -> bool:
for filter_pattern in self.includes:
if not self.alphabet_pattern.match(filter_pattern):
return False
return True
def get_allowed_list(self):
assert self.is_fully_specified_include_list()
return [a for a in self.includes if self.included(a)]
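For context, a minimal sketch (illustrative only, not part of the commit) of the semantics the removed IncludeFilterPattern implemented, where excludes were evaluated before includes:

pattern = IncludeFilterPattern(includes=["prod_.*"], excludes=[".*_tmp"])

pattern.included("prod_users")      # True: no exclude matches, "prod_.*" does
pattern.included("prod_users_tmp")  # False: ".*_tmp" matches, and excludes are checked first
pattern.included("staging_users")   # False: nothing matches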

View File

@@ -16,6 +16,7 @@ import logging
import re
import traceback
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Iterable, List, Optional, Tuple
@@ -56,7 +57,6 @@ from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.table_metadata import DeleteTable
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.sql_source_common import SQLSourceStatus
from metadata.orm_profiler.orm.converter import ometa_to_orm
from metadata.orm_profiler.profiler.default import DefaultProfiler
from metadata.utils.column_type_parser import ColumnTypeParser
@@ -66,6 +66,26 @@ from metadata.utils.helpers import get_database_service_or_create, ingest_lineag
logger: logging.Logger = logging.getLogger(__name__)
@dataclass
class SQLSourceStatus(SourceStatus):
"""
Reports the source status after ingestion
"""
success: List[str] = field(default_factory=list)
failures: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list)
filtered: List[str] = field(default_factory=list)
def scanned(self, record: str) -> None:
self.success.append(record)
logger.info(f"Table Scanned: {record}")
def filter(self, record: str, err: str) -> None:
self.filtered.append(record)
logger.warning(f"Filtered Table {record} due to {err}")
def _get_table_description(schema: str, table: str, inspector: Inspector) -> str:
description = None
try:
@@ -105,7 +125,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
self.service = get_database_service_or_create(config, metadata_config)
self.metadata = OpenMetadata(metadata_config)
self.status = SQLSourceStatus()
self.engine = get_engine(config=self.config)
self.engine = get_engine(workflow_source=self.config)
self._session = None # We will instantiate this just if needed
self.connection = self.engine.connect()
self.data_profiler = None
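A rough usage sketch of the relocated SQLSourceStatus, based only on the methods shown above (the record strings are invented):

status = SQLSourceStatus()

# scanned() appends to success and logs "Table Scanned: <record>"
status.scanned("my_service.db.public.customers")
# filter() appends to filtered and logs a warning with the reason
status.filter("my_service.db.public.tmp_load", "Table pattern not allowed")

print(status.success)   # ['my_service.db.public.customers']
print(status.filtered)  # ['my_service.db.public.tmp_load']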

View File

@@ -1,100 +0,0 @@
import logging
from abc import abstractmethod
from dataclasses import dataclass, field
from typing import List, Optional
from urllib.parse import quote_plus
from pydantic import SecretStr
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.generated.schema.operations.pipelines.databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline,
)
from metadata.ingestion.api.source import SourceStatus
logger: logging.Logger = logging.getLogger(__name__)
@dataclass
class SQLSourceStatus(SourceStatus):
"""
Reports the source status after ingestion
"""
success: List[str] = field(default_factory=list)
failures: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list)
filtered: List[str] = field(default_factory=list)
def scanned(self, record: str) -> None:
self.success.append(record)
logger.info(f"Table Scanned: {record}")
def filter(self, record: str, err: str) -> None:
self.filtered.append(record)
logger.warning(f"Filtered Table {record} due to {err}")
def build_sql_source_connection_url(
host_port: str,
scheme: str,
username: Optional[str] = None,
password: Optional[SecretStr] = None,
database: Optional[str] = None,
options: Optional[dict] = None,
) -> str:
"""
Helper function to prepare the db URL
"""
url = f"{scheme}://"
if username is not None:
url += f"{username}"
if password is not None:
url += f":{quote_plus(password.get_secret_value())}"
url += "@"
url += f"{host_port}"
if database:
url += f"/{database}"
if options is not None:
if database is None:
url += "/"
params = "&".join(
f"{key}={quote_plus(value)}" for (key, value) in options.items() if value
)
url = f"{url}?{params}"
return url
class SQLConnectionConfig(DatabaseServiceMetadataPipeline):
"""
Config class containing all supported
configurations for an SQL source, including
data profiling and DBT generated information.
"""
service_name: str
db_schema: Optional[str] = None
options: dict = {}
connect_args: dict = {}
include_tables: Optional[bool] = True
@abstractmethod
def get_connection_url(self):
return build_sql_source_connection_url(
host_port=self.hostPort,
scheme=self.scheme,
username=self.username,
password=self.password,
database=self.database,
options=self.options,
)
def get_service_type(self) -> DatabaseServiceType:
return DatabaseServiceType[self.type]
def get_service_name(self) -> str:
return self.service_name
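For reference, the deleted build_sql_source_connection_url assembled SQLAlchemy URLs along these lines (all values below are invented for illustration):

from pydantic import SecretStr

url = build_sql_source_connection_url(
    host_port="localhost:5432",
    scheme="postgresql+psycopg2",
    username="openmetadata",
    password=SecretStr("p@ssword"),
    database="ingestion_db",
    options={"sslmode": "require"},
)
# url == "postgresql+psycopg2://openmetadata:p%40ssword@localhost:5432/ingestion_db?sslmode=require"
# quote_plus() percent-encodes the password; options are appended as query parameters.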

View File

@@ -26,6 +26,9 @@ from metadata.config.common import WorkflowExecutionError
from metadata.config.workflow import get_ingestion_source, get_processor, get_sink
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataServerConfig,
OpenMetadataWorkflowConfig,
@@ -34,14 +37,11 @@ from metadata.ingestion.api.processor import Processor
from metadata.ingestion.api.sink import Sink
from metadata.ingestion.api.source import Source
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.sql_source import SQLSource
from metadata.ingestion.source.sql_source_common import (
SQLConnectionConfig,
SQLSourceStatus,
)
from metadata.ingestion.source.sql_source import SQLSource, SQLSourceStatus
from metadata.orm_profiler.api.models import ProfilerProcessorConfig, ProfilerResponse
from metadata.orm_profiler.utils import logger
from metadata.utils.engines import create_and_bind_session, get_engine
from metadata.utils.engines import create_and_bind_session
from metadata.utils.filters import filter_by_schema, filter_by_table
logger = logger()
@@ -64,7 +64,7 @@ class ProfilerWorkflow:
)
# We will use the existing sources to build the Engine
self.source = get_ingestion_source(
self.source: Source = get_ingestion_source(
source_type=self.config.source.type,
source_config=self.config.source,
metadata_config=self.metadata_config,
@@ -76,7 +76,9 @@ )
)
# Init and type the source config
self.source_config: SQLConnectionConfig = self.source.config
self.source_config: DatabaseServiceMetadataPipeline = (
self.config.source.sourceConfig.config
)
self.source_status = SQLSourceStatus()
self.processor = get_processor(
@@ -85,7 +87,7 @@ metadata_config=self.metadata_config,
metadata_config=self.metadata_config,
_from="orm_profiler",
# Pass the session as kwargs for the profiler
session=create_and_bind_session(get_engine(self.source_config)),
session=create_and_bind_session(self.source.engine),
)
if self.config.sink:
@@ -121,20 +123,22 @@ for table in tables:
for table in tables:
# Validate schema
if not self.source_config.schema_filter_pattern.included(
table.database.name
if filter_by_schema(
schema_filter_pattern=self.source_config.schemaFilterPattern,
schema_name=table.databaseSchema.name,
):
self.source_status.filter(
table.database.name, "Schema pattern not allowed"
table.databaseSchema.name, "Schema pattern not allowed"
)
continue
# Validate database
if not self.source_config.table_filter_pattern.included(
str(table.name.__root__)
if filter_by_table(
table_filter_pattern=self.source_config.tableFilterPattern,
table_name=str(table.name.__root__),
):
self.source_status.filter(
table.fullyQualifiedName.__root__, "Table name pattern not allowed"
table.name.__root__, "Table name pattern not allowed"
)
continue
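The per-table decision above can be read as the following self-contained sketch; keep_table is a hypothetical helper, not part of the commit:

from metadata.utils.filters import filter_by_schema, filter_by_table

def keep_table(source_config, table) -> bool:
    """Return True if the profiler should process this table."""
    # Schema check first: compare schemaFilterPattern against the databaseSchema reference name
    if filter_by_schema(source_config.schemaFilterPattern, table.databaseSchema.name):
        return False
    # Then check tableFilterPattern against the plain table name
    if filter_by_table(source_config.tableFilterPattern, str(table.name.__root__)):
        return False
    return True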

View File

@@ -27,12 +27,12 @@ from metadata.utils.source_connections import get_connection_url
logger = logging.getLogger("Utils")
def get_engine(config: WorkflowSource, verbose: bool = False) -> Engine:
def get_engine(workflow_source: WorkflowSource, verbose: bool = False) -> Engine:
"""
Given an SQL configuration, build the SQLAlchemy Engine
"""
logger.info(f"Building Engine for {config.serviceName}...")
service_connection_config = config.serviceConnection.__root__.config
logger.info(f"Building Engine for {workflow_source.serviceName}...")
service_connection_config = workflow_source.serviceConnection.__root__.config
options = service_connection_config.connectionOptions
if not options:
options = {}
@@ -40,7 +40,7 @@ def get_engine(config: WorkflowSource, verbose: bool = False) -> Engine:
if not connect_args:
connect_args = {}
engine = create_engine(
get_connection_url(config.serviceConnection.__root__.config),
get_connection_url(service_connection_config),
**options,
connect_args=connect_args,
echo=verbose,
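A hedged sketch of the renamed keyword in use, combined with create_and_bind_session as the profiler workflow above does; workflow_config is assumed to be an already parsed OpenMetadataWorkflowConfig:

from metadata.utils.engines import create_and_bind_session, get_engine

# Build the engine from the workflow's source entry, then bind a session for the profiler
engine = get_engine(workflow_source=workflow_config.source, verbose=True)
session = create_and_bind_session(engine)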

View File

@@ -0,0 +1,103 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Helper that implements table and schema filter pattern logic
"""
import re
from typing import List, Optional
from metadata.generated.schema.type.filterPattern import FilterPatternModel
class InvalidPatternException(Exception):
"""
Raised when an invalid pattern is configured in the workflow
"""
def validate_regex(regex_list: List[str]) -> None:
"""
Check that the given include/exclude regexes
are well formatted
"""
for regex in regex_list:
try:
re.compile(regex)
except re.error:
raise InvalidPatternException(f"Invalid regex {regex}.")
def _filter(filter_pattern: Optional[FilterPatternModel], name: str) -> bool:
"""
Return True if the name needs to be filtered, False otherwise
Include takes precedence over exclude
:param filter_pattern: Model defining filtering logic
:param name: table or schema name
:return: True for filtering, False otherwise
"""
if not filter_pattern:
# No filter pattern, nothing to filter
return False
if filter_pattern.includes:
validate_regex(filter_pattern.includes)
return not any(
[
matched
for regex in filter_pattern.includes
if (matched := re.match(regex, name))
]
)
if filter_pattern.excludes:
validate_regex(filter_pattern.excludes)
return any(
[
matched
for regex in filter_pattern.excludes
if (matched := re.match(regex, name))
]
)
return False
def filter_by_schema(
schema_filter_pattern: Optional[FilterPatternModel], schema_name: str
) -> bool:
"""
Return True if the schema needs to be filtered, False otherwise
Include takes precedence over exclude
:param schema_filter_pattern: Model defining schema filtering logic
:param schema_name: table schema name
:return: True for filtering, False otherwise
"""
return _filter(schema_filter_pattern, schema_name)
def filter_by_table(
table_filter_pattern: Optional[FilterPatternModel], table_name: str
) -> bool:
"""
Return True if the table needs to be filtered, False otherwise
Include takes precedence over exclude
:param table_filter_pattern: Model defining table filtering logic
:param table_name: table name
:return: True for filtering, False otherwise
"""
return _filter(table_filter_pattern, table_name)
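An illustrative sketch of the include-over-exclude behaviour of these helpers; constructing FilterPatternModel with includes/excludes keyword arguments is an assumption taken from the test configs below:

from metadata.generated.schema.type.filterPattern import FilterPatternModel
from metadata.utils.filters import filter_by_schema, filter_by_table

schema_pattern = FilterPatternModel(includes=["prod_.*"])
table_pattern = FilterPatternModel(excludes=["tmp_.*", "audit_.*"])
both = FilterPatternModel(includes=["prod_.*"], excludes=["prod_secret"])

filter_by_schema(schema_pattern, "prod_sales")     # False: an include matches, schema is processed
filter_by_schema(schema_pattern, "staging_sales")  # True: no include matches, schema is dropped
filter_by_table(table_pattern, "tmp_load_2022")    # True: an exclude matches, table is dropped
filter_by_table(both, "prod_secret")               # False: includes win, excludes are never evaluated
filter_by_table(None, "any_table")                 # False: no pattern means nothing is filtered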

View File

@@ -13,7 +13,6 @@ import logging
import traceback
from datetime import datetime, timedelta
from typing import Any, Dict, Iterable
from urllib.parse import quote_plus
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.services.createDashboardService import (

View File

@@ -21,11 +21,11 @@ from sqlalchemy.orm import declarative_base
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
from metadata.generated.schema.entity.data.table import Column, DataType, Table
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataServerConfig,
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
OpenMetadataServerConfig,
)
from metadata.generated.schema.tests.column.columnValuesToBeBetween import (
ColumnValuesToBeBetween,
@@ -46,10 +46,8 @@ config = {
"source": {
"type": "sqlite",
"serviceName": "my_service",
"serviceConnection": {
"config": {"type": "SQLite", "hostPort": "", "database": ":memory:"}
},
"sourceConfig": {},
"serviceConnection": {"config": {"type": "SQLite"}},
"sourceConfig": {"config": {}},
},
"processor": {"type": "orm-profiler", "config": {}},
"sink": {"type": "metadata-rest", "config": {}},
@@ -68,7 +66,7 @@ def test_init_workflow():
"""
We can initialise the workflow from a config
"""
assert isinstance(workflow.source_config, WorkflowSource)
assert isinstance(workflow.source_config, DatabaseServiceMetadataPipeline)
assert isinstance(workflow.metadata_config, OpenMetadataServerConfig)
assert isinstance(workflow.processor, OrmProfilerProcessor)
@@ -83,29 +81,33 @@ def test_filter_entities():
"""
service_name = "service"
db_reference1 = EntityReference(id=uuid.uuid4(), name="one_db", type="database")
db_reference2 = EntityReference(id=uuid.uuid4(), name="another_db", type="database")
schema_reference1 = EntityReference(
id=uuid.uuid4(), name="one_schema", type="databaseSchema"
)
schema_reference2 = EntityReference(
id=uuid.uuid4(), name="another_schema", type="databaseSchema"
)
all_tables = [
Table(
id=uuid.uuid4(),
name="table1",
database=db_reference1,
fullyQualifiedName=f"{service_name}.{db_reference1.name}.table1",
databaseSchema=schema_reference1,
fullyQualifiedName=f"{service_name}.db.{schema_reference1.name}.table1",
columns=[Column(name="id", dataType=DataType.BIGINT)],
),
Table(
id=uuid.uuid4(),
name="table2",
database=db_reference1,
fullyQualifiedName=f"{service_name}.{db_reference1.name}.table2",
databaseSchema=schema_reference1,
fullyQualifiedName=f"{service_name}.db.{schema_reference1.name}.table2",
columns=[Column(name="id", dataType=DataType.BIGINT)],
),
Table(
id=uuid.uuid4(),
name="table3",
database=db_reference2,
fullyQualifiedName=f"{service_name}.{db_reference2.name}.table3",
databaseSchema=schema_reference2,
fullyQualifiedName=f"{service_name}.db.{schema_reference2.name}.table3",
columns=[Column(name="id", dataType=DataType.BIGINT)],
),
]
@@ -115,9 +117,9 @@ def test_filter_entities():
# We can exclude based on the schema name
exclude_filter_schema_config = deepcopy(config)
exclude_filter_schema_config["source"]["config"]["schema_filter_pattern"] = {
"excludes": ["one_db"]
}
exclude_filter_schema_config["source"]["sourceConfig"]["config"][
"schemaFilterPattern"
] = {"excludes": ["one_schema"]}
exclude_filter_schema_workflow = ProfilerWorkflow.create(
exclude_filter_schema_config
@@ -126,9 +128,9 @@ def test_filter_entities():
# We can include based on the schema name
include_filter_schema_config = deepcopy(config)
include_filter_schema_config["source"]["config"]["schema_filter_pattern"] = {
"includes": ["another_db"]
}
include_filter_schema_config["source"]["sourceConfig"]["config"][
"schemaFilterPattern"
] = {"includes": ["another_schema"]}
include_filter_schema_workflow = ProfilerWorkflow.create(
include_filter_schema_config
@@ -137,18 +139,18 @@ def test_filter_entities():
# We can exclude based on the table name
exclude_filter_table_config = deepcopy(config)
exclude_filter_table_config["source"]["config"]["table_filter_pattern"] = {
"excludes": ["tab*"]
}
exclude_filter_table_config["source"]["sourceConfig"]["config"][
"tableFilterPattern"
] = {"excludes": ["tab*"]}
exclude_filter_table_workflow = ProfilerWorkflow.create(exclude_filter_table_config)
assert len(list(exclude_filter_table_workflow.filter_entities(all_tables))) == 0
# We can include based on the table name
include_filter_table_config = deepcopy(config)
include_filter_table_config["source"]["config"]["table_filter_pattern"] = {
"includes": ["table1"]
}
include_filter_table_config["source"]["sourceConfig"]["config"][
"tableFilterPattern"
] = {"includes": ["table1"]}
include_filter_table_workflow = ProfilerWorkflow.create(include_filter_table_config)
assert len(list(include_filter_table_workflow.filter_entities(all_tables))) == 1