# Copyright 2024 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test the NoSQL profiler using a MongoDB container
To run this we need OpenMetadata server up and running.
No sample data is required beforehand
Test Steps:
1. Start a MongoDB container
2. Ingest data into OpenMetadata
3. Run the profiler workflow
4. Verify the profiler output
5. Tear down the MongoDB container and delete the service from OpenMetadata
"""
from copy import deepcopy
from datetime import datetime, timedelta
from functools import partial
from pathlib import Path
from random import choice, randint
from unittest import TestCase

from pymongo import MongoClient, database
from testcontainers.mongodb import MongoDbContainer

from _openmetadata_testutils.ometa import int_admin_ometa
from metadata.generated.schema.entity.data.table import ColumnProfile, Table
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.type.basic import Timestamp
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.profiler.api.models import TableConfig
from metadata.utils.constants import SAMPLE_DATA_DEFAULT_COUNT
from metadata.utils.helpers import datetime_to_ts
from metadata.utils.test_utils import accumulate_errors
from metadata.utils.time_utils import get_end_of_day_timestamp_mill
from metadata.workflow.classification import AutoClassificationWorkflow
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import WorkflowResultStatus

SERVICE_NAME = Path(__file__).stem

def add_query_config(config, table_config: TableConfig) -> dict:
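    """Return a deep copy of ``config`` with ``table_config`` appended to the processor's tableConfig list."""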
config_copy = deepcopy(config)
config_copy["processor"]["config"].setdefault("tableConfig", [])
config_copy["processor"]["config"]["tableConfig"].append(table_config)
return config_copy


def get_ingestion_config(mongo_port: str, mongo_user: str, mongo_pass: str):
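    """Build a MongoDB ingestion workflow config pointing at the local OpenMetadata server."""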
return {
"source": {
"type": "mongodb",
"serviceName": SERVICE_NAME,
"serviceConnection": {
"config": {
"type": "MongoDB",
"hostPort": f"localhost:{mongo_port}",
"username": mongo_user,
"password": mongo_pass,
}
},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"loggerLevel": "DEBUG",
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
},
},
}


TEST_DATABASE = "test-database"
EMPTY_COLLECTION = "empty-collection"
TEST_COLLECTION = "test-collection"
NUM_ROWS = 200


def random_row():
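    """Generate one document with a random name, an age in [20, 60], a city, and a nested key."""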
return {
"name": choice(["John", "Jane", "Alice", "Bob"]),
"age": randint(20, 60),
"city": choice(["New York", "Chicago", "San Francisco"]),
"nested": {"key": "value" + str(randint(1, 10))},
}
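

# NUM_ROWS random documents plus two fixed ones, so the asserted age bounds (min=20, max=60) are deterministic.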
TEST_DATA = [random_row() for _ in range(NUM_ROWS)] + [
{
"name": "John",
"age": 60,
"city": "New York",
},
{
"name": "Jane",
"age": 20,
"city": "New York",
},
]


class NoSQLProfiler(TestCase):
"""datalake profiler E2E test"""
mongo_container: MongoDbContainer
client: MongoClient
db: database.Database
collection: database.Collection
ingestion_config: dict
metadata: OpenMetadata

    @classmethod
def setUpClass(cls) -> None:
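        """Start MongoDB, seed it with test data, and ingest the service metadata into OpenMetadata."""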
cls.metadata = int_admin_ometa()
cls.mongo_container = MongoDbContainer("mongo:7.0.5-jammy")
cls.mongo_container.start()
cls.client = MongoClient(cls.mongo_container.get_connection_url())
cls.db = cls.client[TEST_DATABASE]
cls.collection = cls.db[TEST_COLLECTION]
cls.collection.insert_many(TEST_DATA)
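        # An empty collection verifies the profiler handles zero-row collections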
cls.db.create_collection(EMPTY_COLLECTION)
cls.ingestion_config = get_ingestion_config(
cls.mongo_container.get_exposed_port("27017"), "test", "test"
)
# cls.client["admin"].command("grantRolesToUser", "test", roles=["userAdminAnyDatabase"])
ingestion_workflow = MetadataWorkflow.create(
cls.ingestion_config,
)
ingestion_workflow.execute()
ingestion_workflow.raise_from_status()
ingestion_workflow.print_status()
ingestion_workflow.stop()

    @classmethod
def tearDownClass(cls):
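        """Stop the MongoDB container and delete the service; errors are accumulated so both steps always run."""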
with accumulate_errors() as error_handler:
error_handler.try_execute(partial(cls.mongo_container.stop, force=True))
error_handler.try_execute(cls.delete_service)

    @classmethod
def delete_service(cls):
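        """Hard-delete the database service, recursively removing everything ingested under it."""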
service_id = str(
cls.metadata.get_by_name(entity=DatabaseService, fqn=SERVICE_NAME).id.root
)
cls.metadata.delete(
entity=DatabaseService,
entity_id=service_id,
recursive=True,
hard_delete=True,
)

    def test_setup_teardown(self):
        """
        Does nothing; useful to check that the setup and teardown methods work.
        """
pass

    def run_profiler_workflow(self, config):
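        """Create, execute, and stop a profiler workflow, asserting that it succeeded."""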
profiler_workflow = ProfilerWorkflow.create(config)
profiler_workflow.execute()
status = profiler_workflow.result_status()
profiler_workflow.stop()
assert status == WorkflowResultStatus.SUCCESS

    def run_auto_classification_workflow(self, config):
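        """Create, execute, and stop an auto classification workflow, asserting that it succeeded."""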
auto_classification_workflow = AutoClassificationWorkflow.create(config)
auto_classification_workflow.execute()
status = auto_classification_workflow.result_status()
auto_classification_workflow.stop()
assert status == WorkflowResultStatus.SUCCESS

    def test_simple(self):
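        """Profile both collections, then store sample data through the auto classification workflow."""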
workflow_config = deepcopy(self.ingestion_config)
workflow_config["source"]["sourceConfig"]["config"].update(
{
"type": "Profiler",
}
)
workflow_config["processor"] = {
"type": "orm-profiler",
"config": {},
}
self.run_profiler_workflow(workflow_config)
cases = [
{
"collection": EMPTY_COLLECTION,
"expected": {
"rowCount": 0,
"columns": [],
},
},
{
"collection": TEST_COLLECTION,
"expected": {
"rowCount": len(TEST_DATA),
"columns": [
ColumnProfile(
name="age",
timestamp=Timestamp(int(datetime.now().timestamp())),
max=60,
min=20,
),
],
},
},
]
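        # Compare the latest stored profile for each collection against the expectations above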
for tc in cases:
collection = tc["collection"]
expected = tc["expected"]
collection_profile = self.metadata.get_profile_data(
f"{SERVICE_NAME}.default.{TEST_DATABASE}.{collection}",
datetime_to_ts(datetime.now() - timedelta(seconds=10)),
get_end_of_day_timestamp_mill(),
)
assert collection_profile.entities
assert collection_profile.entities[-1].rowCount == expected["rowCount"]
column_profile = self.metadata.get_profile_data(
f"{SERVICE_NAME}.default.{TEST_DATABASE}.{collection}.age",
datetime_to_ts(datetime.now() - timedelta(seconds=10)),
get_end_of_day_timestamp_mill(),
profile_type=ColumnProfile,
)
assert (len(column_profile.entities) > 0) == (
len(tc["expected"]["columns"]) > 0
)
if len(expected["columns"]) > 0:
for c1, c2 in zip(column_profile.entities, expected["columns"]):
assert c1.name == c2.name
assert c1.max == c2.max
assert c1.min == c2.min
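
        # Store sample data via the auto classification workflow, with classification itself disabled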
auto_workflow_config = deepcopy(self.ingestion_config)
auto_workflow_config["source"]["sourceConfig"]["config"].update(
{
"type": "AutoClassification",
"storeSampleData": True,
"enableAutoClassification": False,
}
)
auto_workflow_config["processor"] = {
"type": "orm-profiler",
"config": {},
}
self.run_auto_classification_workflow(auto_workflow_config)
table = self.metadata.get_by_name(
Table, f"{SERVICE_NAME}.default.{TEST_DATABASE}.{TEST_COLLECTION}"
)
sample_data = self.metadata.get_sample_data(table)
assert [c.root for c in sample_data.sampleData.columns] == [
"_id",
"name",
"age",
"city",
"nested",
]
assert len(sample_data.sampleData.rows) == SAMPLE_DATA_DEFAULT_COUNT

    def test_custom_query(self):
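        """Profile with a custom profileQuery filter and verify that sampling honors it."""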
workflow_config = deepcopy(self.ingestion_config)
workflow_config["source"]["sourceConfig"]["config"].update(
{
"type": "Profiler",
}
)
query_age = TEST_DATA[0]["age"]
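        # Profile only documents whose age matches the first test row; the expected column profile pins min == max == query_age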
workflow_config["processor"] = {
"type": "orm-profiler",
"config": {
"tableConfig": [
{
"fullyQualifiedName": f"{SERVICE_NAME}.default.{TEST_DATABASE}.{TEST_COLLECTION}",
"profileQuery": '{"age": %s}' % query_age,
}
],
},
}
self.run_profiler_workflow(workflow_config)
cases = [
{
"collection": EMPTY_COLLECTION,
"expected": {
"rowCount": 0,
"columns": [],
},
},
{
"collection": TEST_COLLECTION,
"expected": {
"rowCount": len(TEST_DATA),
"columns": [
ColumnProfile(
name="age",
timestamp=Timestamp(int(datetime.now().timestamp())),
max=query_age,
min=query_age,
),
],
},
},
]
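        # As above, check the stored profiles; rowCount still reflects the full collection even with a profileQuery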
for tc in cases:
collection = tc["collection"]
expected_row_count = tc["expected"]["rowCount"]
collection_profile = self.metadata.get_profile_data(
f"{SERVICE_NAME}.default.{TEST_DATABASE}.{collection}",
datetime_to_ts(datetime.now() - timedelta(seconds=10)),
get_end_of_day_timestamp_mill(),
)
assert collection_profile.entities, collection
assert (
collection_profile.entities[-1].rowCount == expected_row_count
), collection
column_profile = self.metadata.get_profile_data(
f"{SERVICE_NAME}.default.{TEST_DATABASE}.{collection}.age",
datetime_to_ts(datetime.now() - timedelta(seconds=10)),
get_end_of_day_timestamp_mill(),
profile_type=ColumnProfile,
)
assert (len(column_profile.entities) > 0) == (
len(tc["expected"]["columns"]) > 0
)
auto_workflow_config = deepcopy(self.ingestion_config)
auto_workflow_config["source"]["sourceConfig"]["config"].update(
{
"type": "AutoClassification",
"storeSampleData": True,
"enableAutoClassification": False,
}
)
auto_workflow_config["processor"] = {
"type": "orm-profiler",
"config": {
"tableConfig": [
{
"fullyQualifiedName": f"{SERVICE_NAME}.default.{TEST_DATABASE}.{TEST_COLLECTION}",
"profileQuery": '{"age": %s}' % query_age,
}
],
},
}
self.run_auto_classification_workflow(auto_workflow_config)
table = self.metadata.get_by_name(
Table, f"{SERVICE_NAME}.default.{TEST_DATABASE}.{TEST_COLLECTION}"
)
sample_data = self.metadata.get_sample_data(table)
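        # Every sampled row must satisfy the profileQuery filter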
age_column_index = [col.root for col in sample_data.sampleData.columns].index(
"age"
)
assert all(
[r[age_column_index] == str(query_age) for r in sample_data.sampleData.rows]
)