mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-07-14 12:38:45 +00:00
183 lines
7.5 KiB
Python
183 lines
7.5 KiB
Python
# Copyright 2024 Collate
|
|
# Licensed under the Collate Community License, Version 1.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""
|
|
Test the SQL profiler using a Postgres and a MySQL container.
|
|
We load a simple user table in each service and run the profiler on it.
|
|
To run this, an OpenMetadata server must be up and running.
|
|
No sample data is required beforehand.
|
|
"""
|
|
|
|
import json
|
|
from typing import List
|
|
from unittest import TestCase
|
|
|
|
from _openmetadata_testutils.ometa import int_admin_ometa
|
|
from metadata.generated.schema.configuration.profilerConfiguration import (
|
|
MetricConfigurationDefinition,
|
|
MetricType,
|
|
ProfilerConfiguration,
|
|
)
|
|
from metadata.generated.schema.entity.data.table import DataType, Table
|
|
from metadata.generated.schema.entity.services.databaseService import DatabaseService
|
|
from metadata.generated.schema.settings.settings import Settings, SettingType
|
|
from metadata.workflow.metadata import MetadataWorkflow
|
|
from metadata.workflow.profiler import ProfilerWorkflow
|
|
|
|
from ...utils.docker_service_builders.test_container_builder import ContainerBuilder
|
|
from ..integration_base import (
|
|
METADATA_INGESTION_CONFIG_TEMPLATE,
|
|
PROFILER_INGESTION_CONFIG_TEMPLATE,
|
|
)
|
|
|
|
|
|
class TestSQAProfiler(TestCase):
    """Integration tests for the SQL profiler against Postgres and MySQL containers.

    ``setUpClass`` starts one container per database engine and ingests its
    metadata into the OpenMetadata server returned by ``int_admin_ometa``.
    Each container is registered as a database service named after the
    container class (see ``_service_name``).
    """

    @classmethod
    def setUpClass(cls):
        """Start the database containers and ingest their metadata.

        On any failure the containers are stopped before the error is
        re-raised: unittest does not call ``tearDownClass`` when
        ``setUpClass`` raises.
        """
        cls.metadata = int_admin_ometa()
        cls.container_builder = ContainerBuilder()
        try:
            cls.container_builder.run_postgres_container()
            cls.container_builder.run_mysql_container()

            for container in cls.container_builder.containers:
                config = METADATA_INGESTION_CONFIG_TEMPLATE.format(
                    type=container.connector_type,
                    service_name=cls._service_name(container),
                    service_config=container.get_config(),
                    source_config=container.get_source_config(),
                )
                ingestion_workflow = MetadataWorkflow.create(json.loads(config))
                ingestion_workflow.execute()
                ingestion_workflow.raise_from_status()
                ingestion_workflow.stop()
        except Exception:
            cls.container_builder.stop_all_containers()
            # Bare `raise` re-raises the active exception unchanged.
            raise

    @classmethod
    def tearDownClass(cls):
        """Stop the containers, delete the ingested services and reset settings."""
        cls.container_builder.stop_all_containers()
        try:
            services = [
                cls.metadata.get_by_name(DatabaseService, cls._service_name(container))
                for container in cls.container_builder.containers
            ]
            for service in services:
                # NOTE(review): get_by_name presumably returns None when the
                # service does not exist; skip it instead of failing on `.id`.
                if service is None:
                    continue
                # NOTE(review): the two positional flags are presumably
                # recursive / hard-delete — confirm against the ometa API.
                cls.metadata.delete(DatabaseService, service.id, True, True)
        finally:
            # Always reset the global profiler settings, even if a delete fails.
            cls._clean_up_settings()

    @classmethod
    def _clean_up_settings(cls):
        """Reset profiler settings to an empty metric configuration."""
        profiler_configuration = ProfilerConfiguration(metricConfiguration=[])

        settings = Settings(
            config_type=SettingType.profilerConfiguration,
            config_value=profiler_configuration,
        )
        cls.metadata.create_or_update_settings(settings)

    @staticmethod
    def _service_name(container) -> str:
        """Return the database service name used for ``container``."""
        return type(container).__name__

    def _run_profiler_workflows(self) -> List[str]:
        """Run a profiler workflow against every container's service.

        Returns:
            The names of the services that were profiled, in container order.

        Fails the test if any workflow raises or reports a failure status.
        """
        service_names = []
        for container in self.container_builder.containers:
            service_name = self._service_name(container)
            service_names.append(service_name)
            try:
                config = PROFILER_INGESTION_CONFIG_TEMPLATE.format(
                    type=container.connector_type,
                    service_config=container.get_config(),
                    service_name=service_name,
                )
                profiler_workflow = ProfilerWorkflow.create(json.loads(config))
                profiler_workflow.execute()
                profiler_workflow.print_status()
                profiler_workflow.raise_from_status()
                profiler_workflow.stop()
            except Exception as e:
                self.fail(f"Profiler workflow failed for {service_name} with error {e}")
        return service_names

    def _get_users_table_profiles(self, service_names: List[str]) -> List[Table]:
        """Return the latest profile of every ``users`` table in ``service_names``.

        Only tables belonging to the given services are considered, so tables
        from unrelated services on the same server are ignored. Fails the test
        if no matching table is found, so callers' assertions cannot pass
        vacuously.
        """
        tables: List[Table] = self.metadata.list_all_entities(Table)
        profiled_tables = [
            self.metadata.get_latest_table_profile(table.fullyQualifiedName)
            for table in tables
            if table.service.name in service_names and table.name.root == "users"
        ]
        self.assertTrue(
            profiled_tables, f"No 'users' table found in services {service_names}"
        )
        return profiled_tables

    def test_profiler_workflow(self):
        """Profile the ``users`` table of each service and check a profile exists."""
        service_names = self._run_profiler_workflows()

        for table in self._get_users_table_profiles(service_names):
            self.assertIsNotNone(table.profile)
            for column in table.columns:
                self.assertIsNotNone(column.profile)

    def test_profiler_workflow_w_globale_config(self):
        """Profile with a global profiler configuration and check it is honored.

        INT columns are limited to ``valuesCount`` and ``distinctCount``, and
        VARCHAR columns are disabled.
        """
        # Add metric-level settings.
        profiler_configuration = ProfilerConfiguration(
            metricConfiguration=[
                MetricConfigurationDefinition(
                    dataType=DataType.INT,
                    disabled=False,
                    metrics=[MetricType.valuesCount, MetricType.distinctCount],
                ),
                MetricConfigurationDefinition(
                    dataType=DataType.VARCHAR, disabled=True, metrics=None
                ),
            ]
        )

        settings = Settings(
            config_type=SettingType.profilerConfiguration,
            config_value=profiler_configuration,
        )
        self.metadata.create_or_update_settings(settings)
        # Reset the settings after this test so the global configuration cannot
        # leak into tests that run later in the same class.
        self.addCleanup(self._clean_up_settings)

        service_names = self._run_profiler_workflows()

        for table in self._get_users_table_profiles(service_names):
            self.assertIsNotNone(table.profile)
            for column in table.columns:
                if column.dataType == DataType.INT:
                    # Only the configured metrics should have been computed.
                    self.assertIsNone(column.profile.mean)
                    self.assertIsNotNone(column.profile.valuesCount)
                    self.assertIsNotNone(column.profile.distinctCount)
                # NOTE(review): the configuration above disables VARCHAR, but
                # this branch checks STRING. If the connectors report these
                # columns as VARCHAR (assumption — confirm), this branch never
                # runs and the disabled setting is not actually verified.
                # Switching to VARCHAR may also need to account for column
                # profiles left over from earlier profiler runs.
                if column.dataType == DataType.STRING:
                    self.assertIsNone(column.profile.mean)
                    self.assertIsNone(column.profile.valuesCount)
                    self.assertIsNone(column.profile.distinctCount)