# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Validate workflow configs and filters
"""
import uuid
from copy import deepcopy
from unittest.mock import patch

import sqlalchemy as sqa
from pytest import raises
from sqlalchemy.orm import declarative_base

from metadata.generated.schema.entity.data.table import (
    Column,
    DataType,
    Table,
    TableProfilerConfig,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
    DatabaseServiceProfilerPipeline,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.orm_profiler.api.workflow import ProfilerWorkflow
from metadata.orm_profiler.processor.orm_profiler import OrmProfilerProcessor
from metadata.orm_profiler.profiler.default import DefaultProfiler
from metadata.orm_profiler.profiler.models import ProfilerDef

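# Minimal workflow configuration shared by the tests below: a SQLite source,
# the orm-profiler processor and a metadata-rest sink pointing at a local server.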
config = {
    "source": {
        "type": "sqlite",
        "serviceName": "my_service",
        "serviceConnection": {"config": {"type": "SQLite"}},
        "sourceConfig": {"config": {"type": "Profiler"}},
    },
    "processor": {"type": "orm-profiler", "config": {}},
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "no-auth",
        }
    },
}


@patch.object(
    ProfilerWorkflow,
    "_validate_service_name",
    return_value=True,
)
def test_init_workflow(mocked_method):
    """
    We can initialise the workflow from a config
    """
    workflow = ProfilerWorkflow.create(config)
    mocked_method.assert_called()

    assert isinstance(workflow.source_config, DatabaseServiceProfilerPipeline)
    assert isinstance(workflow.metadata_config, OpenMetadataConnection)

    workflow.create_processor(workflow.config.source.serviceConnection.__root__.config)

    assert isinstance(workflow.processor, OrmProfilerProcessor)
    assert workflow.processor.config.profiler is None
    assert workflow.processor.config.testSuites is None


@patch.object(
    ProfilerWorkflow,
    "_validate_service_name",
    return_value=True,
)
def test_filter_entities(mocked_method):
    """
    We can properly filter entities depending on the
    workflow configuration
    """
    workflow = ProfilerWorkflow.create(config)
    mocked_method.assert_called()

    service_name = "service"
    schema_reference1 = EntityReference(
        id=uuid.uuid4(), name="one_schema", type="databaseSchema"
    )
    schema_reference2 = EntityReference(
        id=uuid.uuid4(), name="another_schema", type="databaseSchema"
    )

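    # table1 and table2 sit under one_schema, table3 under another_schema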
    all_tables = [
        Table(
            id=uuid.uuid4(),
            name="table1",
            databaseSchema=schema_reference1,
            fullyQualifiedName=f"{service_name}.db.{schema_reference1.name}.table1",
            columns=[Column(name="id", dataType=DataType.BIGINT)],
        ),
        Table(
            id=uuid.uuid4(),
            name="table2",
            databaseSchema=schema_reference1,
            fullyQualifiedName=f"{service_name}.db.{schema_reference1.name}.table2",
            columns=[Column(name="id", dataType=DataType.BIGINT)],
        ),
        Table(
            id=uuid.uuid4(),
            name="table3",
            databaseSchema=schema_reference2,
            fullyQualifiedName=f"{service_name}.db.{schema_reference2.name}.table3",
            columns=[Column(name="id", dataType=DataType.BIGINT)],
        ),
    ]

    # Simple workflow does not filter
    assert len(list(workflow.filter_entities(all_tables))) == 3

    # We can exclude based on the schema name
    exclude_config = deepcopy(config)
    exclude_config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = {
        "excludes": ["another_schema"]
    }

    exclude_workflow = ProfilerWorkflow.create(exclude_config)
    mocked_method.assert_called()
    assert len(list(exclude_workflow.filter_entities(all_tables))) == 2

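    # Excluding with a wildcard pattern drops the same schema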
    exclude_config = deepcopy(config)
    exclude_config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = {
        "excludes": ["another*"]
    }

    exclude_workflow = ProfilerWorkflow.create(exclude_config)
    mocked_method.assert_called()
    assert len(list(exclude_workflow.filter_entities(all_tables))) == 2

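    # Including on the database name keeps every table, since they all live in `db`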
    include_config = deepcopy(config)
    include_config["source"]["sourceConfig"]["config"]["databaseFilterPattern"] = {
        "includes": ["db*"]
    }

    include_workflow = ProfilerWorkflow.create(include_config)
    mocked_method.assert_called()
    assert len(list(include_workflow.filter_entities(all_tables))) == 3


@patch.object(
    ProfilerWorkflow,
    "_validate_service_name",
    return_value=True,
)
def test_profile_def(mocked_method):
    """
    Validate the definitions of the profile in the JSON
    """
    profile_config = deepcopy(config)
    profile_config["processor"]["config"]["profiler"] = {
        "name": "my_profiler",
        "metrics": ["row_count", "min", "COUNT", "null_count"],
    }

    profile_workflow = ProfilerWorkflow.create(profile_config)
    mocked_method.assert_called()
    profile_workflow.create_processor(
        profile_workflow.config.source.serviceConnection.__root__.config
    )

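    # The mixed-case metric names from the config are expected to be
    # normalised to uppercase in the resulting ProfilerDef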
    profile_definition = ProfilerDef(
        name="my_profiler",
        metrics=["ROW_COUNT", "MIN", "COUNT", "NULL_COUNT"],
        time_metrics=None,
        custom_metrics=None,
    )

    assert isinstance(profile_workflow.processor, OrmProfilerProcessor)
    assert profile_workflow.processor.config.profiler == profile_definition


@patch.object(
    ProfilerWorkflow,
    "_validate_service_name",
    return_value=True,
)
def test_default_profile_def(mocked_method):
    """
    If no information is specified for the profiler,
    let's use the DefaultProfiler
    """

    profile_workflow = ProfilerWorkflow.create(config)
    mocked_method.assert_called()
    profile_workflow.create_processor(
        profile_workflow.config.source.serviceConnection.__root__.config
    )

    assert isinstance(profile_workflow.processor, OrmProfilerProcessor)
    assert profile_workflow.processor.config.profiler is None

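    # A small SQLAlchemy model and its OpenMetadata Table entity to feed build_profiler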
    Base = declarative_base()

    class User(Base):
        __tablename__ = "users"
        id = sqa.Column(sqa.Integer, primary_key=True)
        name = sqa.Column(sqa.String(256))
        fullname = sqa.Column(sqa.String(256))
        nickname = sqa.Column(sqa.String(256))
        age = sqa.Column(sqa.Integer)

    table = Table(
        id=uuid.uuid4(),
        name="users",
        fullyQualifiedName="service.db.users",
        columns=[
            Column(name="id", dataType=DataType.INT),
            Column(name="name", dataType=DataType.STRING),
            Column(name="fullname", dataType=DataType.STRING),
            Column(name="nickname", dataType=DataType.STRING),
            Column(name="age", dataType=DataType.INT),
        ],
        database=EntityReference(id=uuid.uuid4(), name="db", type="database"),
        tableProfilerConfig=TableProfilerConfig(
            profileSample=80.0,
        ),
    )

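    # With no profiler configured, build_profiler should fall back to the DefaultProfiler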
    assert isinstance(
        profile_workflow.processor.build_profiler(User, table=table),
        DefaultProfiler,
    )


# Disabled this test as we'll need to change it entirely
# should be fixed in https://github.com/open-metadata/OpenMetadata/issues/5661
# @patch.object(
#     ProfilerWorkflow,
#     "_validate_service_name",
#     return_value=True,
# )
# def test_tests_def(mocked_method):
#     """
#     Validate the test case definition
#     """
#     test_config = deepcopy(config)
#     test_config["processor"]["config"]["test_suite"] = {
#         "name": "My Test Suite",
#         "tests": [
#             {
#                 "table": "service.db.name",  # FQDN
#                 "table_tests": [
#                     {
#                         "testCase": {
#                             "config": {
#                                 "value": 100,
#                             },
#                             "tableTestType": "tableRowCountToEqual",
#                         },
#                     },
#                 ],
#                 "column_tests": [
#                     {
#                         "columnName": "age",
#                         "testCase": {
#                             "config": {
#                                 "minValue": 0,
#                                 "maxValue": 99,
#                             },
#                             "columnTestType": "columnValuesToBeBetween",
#                         },
#                     }
#                 ],
#             },
#         ],
#     }
#
#     test_workflow = ProfilerWorkflow.create(test_config)
#     mocked_method.assert_called()
#     test_workflow.create_processor(
#         test_workflow.config.source.serviceConnection.__root__.config
#     )
#
#     assert isinstance(test_workflow.processor, OrmProfilerProcessor)
#     suite = test_workflow.processor.config.test_suite
#
#     expected = TestSuite(
#         name="My Test Suite",
#         tests=[
#             TestDef(
#                 table="service.db.name",
#                 table_tests=[
#                     CreateTableTestRequest(
#                         testCase=TableTestCase(
#                             config=TableRowCountToEqual(value=100),
#                             tableTestType=TableTestType.tableRowCountToEqual,
#                         ),
#                     )
#                 ],
#                 column_tests=[
#                     CreateColumnTestRequest(
#                         columnName="age",
#                         testCase=ColumnTestCase(
#                             config=ColumnValuesToBeBetween(minValue=0, maxValue=99),
#                             columnTestType=ColumnTestType.columnValuesToBeBetween,
#                         ),
#                     )
#                 ],
#             )
#         ],
#     )
#
#     assert suite == expected


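# Note: this test does not patch `_validate_service_name`, so creating the
# workflow is expected to fail because `my_service` is not a registered service.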
def test_service_name_validation_raised():
    """Test that the service name validation for the profiler
    workflow is raised correctly
    """
    with raises(ValueError, match="Service name `.*` does not exist"):
        ProfilerWorkflow.create(config)