mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-11-04 04:29:13 +00:00 
			
		
		
		
	1. remove json and array from supported data types of unique column test. 2. migrations. 3. tests.
		
			
				
	
	
		
			523 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			523 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import sys
 | 
						|
 | 
						|
import pytest
 | 
						|
from pydantic import BaseModel
 | 
						|
from sqlalchemy import VARBINARY
 | 
						|
from sqlalchemy import Column as SQAColumn
 | 
						|
from sqlalchemy import MetaData
 | 
						|
from sqlalchemy import Table as SQATable
 | 
						|
from sqlalchemy import create_engine
 | 
						|
from sqlalchemy.dialects import postgresql
 | 
						|
from sqlalchemy.engine import Connection, make_url
 | 
						|
from sqlalchemy.sql import sqltypes
 | 
						|
 | 
						|
from _openmetadata_testutils.postgres.conftest import postgres_container
 | 
						|
from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects
 | 
						|
from metadata.data_quality.api.models import TestCaseDefinition
 | 
						|
from metadata.generated.schema.entity.data.table import Table
 | 
						|
from metadata.generated.schema.entity.services.databaseService import DatabaseService
 | 
						|
from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
 | 
						|
    TestSuiteConfigType,
 | 
						|
)
 | 
						|
from metadata.generated.schema.tests.basic import (
 | 
						|
    TestCaseResult,
 | 
						|
    TestCaseStatus,
 | 
						|
    TestResultValue,
 | 
						|
)
 | 
						|
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
 | 
						|
from metadata.ingestion.ometa.ometa_api import OpenMetadata
 | 
						|
from metadata.workflow.data_quality import TestSuiteWorkflow
 | 
						|
 | 
						|
if not sys.version_info >= (3, 9):
 | 
						|
    pytest.skip(
 | 
						|
        "requires python 3.9+ due to incompatibility with testcontainers",
 | 
						|
        allow_module_level=True,
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
class TestParameters(BaseModel):
 | 
						|
    test_case_defintion: TestCaseDefinition
 | 
						|
    table2_fqn: str
 | 
						|
    expected: TestCaseResult
 | 
						|
 | 
						|
    def __init__(self, *args, **kwargs):
 | 
						|
        if args:
 | 
						|
            # Map positional arguments to fields
 | 
						|
            field_names = list(self.__annotations__.keys())
 | 
						|
            kwargs.update(dict(zip(field_names, args)))
 | 
						|
 | 
						|
        super().__init__(**kwargs)
 | 
						|
 | 
						|
 | 
						|
@pytest.mark.parametrize(
 | 
						|
    "parameters",
 | 
						|
    [
 | 
						|
        pytest.param(TestParameters(*t), id=t[0].name)
 | 
						|
        for t in [
 | 
						|
            (
 | 
						|
                TestCaseDefinition(
 | 
						|
                    name="compare_same_tables",
 | 
						|
                    testDefinitionName="tableDiff",
 | 
						|
                    computePassedFailedRowCount=True,
 | 
						|
                    parameterValues=[
 | 
						|
                        TestCaseParameterValue(
 | 
						|
                            name="keyColumns", value="['customer_id']"
 | 
						|
                        ),
 | 
						|
                    ],
 | 
						|
                ),
 | 
						|
                "POSTGRES_SERVICE.dvdrental.public.customer",
 | 
						|
                TestCaseResult(
 | 
						|
                    testCaseStatus=TestCaseStatus.Success,
 | 
						|
                    failedRows=0,
 | 
						|
                    passedRows=599,
 | 
						|
                ),
 | 
						|
            ),
 | 
						|
            (
 | 
						|
                TestCaseDefinition(
 | 
						|
                    name="with_explicit_key_columns",
 | 
						|
                    testDefinitionName="tableDiff",
 | 
						|
                    computePassedFailedRowCount=True,
 | 
						|
                    parameterValues=[
 | 
						|
                        TestCaseParameterValue(
 | 
						|
                            name="keyColumns", value="['customer_id']"
 | 
						|
                        ),
 | 
						|
                    ],
 | 
						|
                ),
 | 
						|
                "POSTGRES_SERVICE.dvdrental.public.changed_customer",
 | 
						|
                TestCaseResult(
 | 
						|
                    testCaseStatus=TestCaseStatus.Failed,
 | 
						|
                    failedRows=321,
 | 
						|
                    passedRows=278,
 | 
						|
                ),
 | 
						|
            ),
 | 
						|
            (
 | 
						|
                TestCaseDefinition(
 | 
						|
                    name="resolve_primary_keys",
 | 
						|
                    testDefinitionName="tableDiff",
 | 
						|
                    computePassedFailedRowCount=True,
 | 
						|
                    parameterValues=[],
 | 
						|
                ),
 | 
						|
                "POSTGRES_SERVICE.dvdrental.public.changed_customer",
 | 
						|
                TestCaseResult(
 | 
						|
                    testCaseStatus=TestCaseStatus.Failed,
 | 
						|
                    failedRows=321,
 | 
						|
                    passedRows=278,
 | 
						|
                ),
 | 
						|
            ),
 | 
						|
            (
 | 
						|
                TestCaseDefinition(
 | 
						|
                    name="with_passing_threshold",
 | 
						|
                    testDefinitionName="tableDiff",
 | 
						|
                    parameterValues=[
 | 
						|
                        TestCaseParameterValue(name="threshold", value="322"),
 | 
						|
                    ],
 | 
						|
                ),
 | 
						|
                "POSTGRES_SERVICE.dvdrental.public.changed_customer",
 | 
						|
                TestCaseResult(
 | 
						|
                    testCaseStatus=TestCaseStatus.Success,
 | 
						|
                    failedRows=321,
 | 
						|
                ),
 | 
						|
            ),
 | 
						|
            (
 | 
						|
                TestCaseDefinition(
 | 
						|
                    name="with_failing_threshold",
 | 
						|
                    testDefinitionName="tableDiff",
 | 
						|
                    parameterValues=[
 | 
						|
                        TestCaseParameterValue(name="threshold", value="321"),
 | 
						|
                    ],
 | 
						|
                ),
 | 
						|
                "POSTGRES_SERVICE.dvdrental.public.changed_customer",
 | 
						|
                TestCaseResult(
 | 
						|
                    testCaseStatus=TestCaseStatus.Failed,
 | 
						|
                    failedRows=321,
 | 
						|
                ),
 | 
						|
            ),
 | 
						|
            (
 | 
						|
                TestCaseDefinition(
 | 
						|
                    name="with_where_clause",
 | 
						|
                    testDefinitionName="tableDiff",
 | 
						|
                    computePassedFailedRowCount=True,
 | 
						|
                    parameterValues=[
 | 
						|
                        TestCaseParameterValue(
 | 
						|
                            name="where",
 | 
						|
                            value="MOD(customer_id, 2) != 0 AND MOD(customer_id, 13) != 0",
 | 
						|
                        ),
 | 
						|
                    ],
 | 
						|
                ),
 | 
						|
                "POSTGRES_SERVICE.dvdrental.public.changed_customer",
 | 
						|
                TestCaseResult(
 | 
						|
                    testCaseStatus=TestCaseStatus.Success,
 | 
						|
                ),
 | 
						|
            ),
 | 
						|
            (
 | 
						|
                TestCaseDefinition(
 | 
						|
                    name="without_first_name",
 | 
						|
                    testDefinitionName="tableDiff",
 | 
						|
                    computePassedFailedRowCount=True,
 | 
						|
                    parameterValues=[],
 | 
						|
                ),
 | 
						|
                "POSTGRES_SERVICE.dvdrental.public.customer_without_first_name",
 | 
						|
                TestCaseResult(
 | 
						|
                    testCaseStatus=TestCaseStatus.Failed,
 | 
						|
                    testResultValue=[
 | 
						|
                        TestResultValue(name="removedColumns", value="1"),
 | 
						|
                        TestResultValue(name="addedColumns", value="0"),
 | 
						|
                        TestResultValue(name="changedColumns", value="0"),
 | 
						|
                    ],
 | 
						|
                ),
 | 
						|
            ),
 | 
						|
            (
 | 
						|
                TestCaseDefinition(
 | 
						|
                    name="without_first_name_with_extra_column",
 | 
						|
                    testDefinitionName="tableDiff",
 | 
						|
                    computePassedFailedRowCount=True,
 | 
						|
                    parameterValues=[
 | 
						|
                        TestCaseParameterValue(
 | 
						|
                            name="useColumns", value="['last_name', 'email']"
 | 
						|
                        )
 | 
						|
                    ],
 | 
						|
                ),
 | 
						|
                "POSTGRES_SERVICE.dvdrental.public.customer_without_first_name",
 | 
						|
                TestCaseResult(
 | 
						|
                    testCaseStatus=TestCaseStatus.Success,
 | 
						|
                ),
 | 
						|
            ),
 | 
						|
            (
 | 
						|
                TestCaseDefinition(
 | 
						|
                    name="postgres_vs_mysql_success",
 | 
						|
                    testDefinitionName="tableDiff",
 | 
						|
                    computePassedFailedRowCount=True,
 | 
						|
                    parameterValues=[
 | 
						|
                        TestCaseParameterValue(
 | 
						|
                            name="useColumns",
 | 
						|
                            value=str(
 | 
						|
                                [
 | 
						|
                                    "store_id",
 | 
						|
                                    "first_name",
 | 
						|
                                    "last_name",
 | 
						|
                                    "email",
 | 
						|
                                    "activebool",
 | 
						|
                                    "address_id",
 | 
						|
                                    "active",
 | 
						|
                                    # "create_date", # date types are incomparable for mysql and postgres
 | 
						|
                                    "last_update",
 | 
						|
                                ]
 | 
						|
                            ),
 | 
						|
                        )
 | 
						|
                    ],
 | 
						|
                ),
 | 
						|
                "MYSQL_SERVICE.default.test.customer",
 | 
						|
                TestCaseResult(
 | 
						|
                    testCaseStatus=TestCaseStatus.Success,
 | 
						|
                ),
 | 
						|
            ),
 | 
						|
            (
 | 
						|
                TestCaseDefinition(
 | 
						|
                    name="postgres_vs_mysql_failure",
 | 
						|
                    testDefinitionName="tableDiff",
 | 
						|
                    computePassedFailedRowCount=True,
 | 
						|
                    parameterValues=[],
 | 
						|
                ),
 | 
						|
                "MYSQL_SERVICE.default.test.changed_customer",
 | 
						|
                TestCaseResult(
 | 
						|
                    testCaseStatus=TestCaseStatus.Failed,
 | 
						|
                ),
 | 
						|
            ),
 | 
						|
        ]
 | 
						|
    ],
 | 
						|
)
 | 
						|
def test_happy_paths(
 | 
						|
    postgres_service: DatabaseService,
 | 
						|
    prepare_data,
 | 
						|
    ingest_postgres_metadata,
 | 
						|
    ingest_mysql_service,
 | 
						|
    patched_metadata,
 | 
						|
    parameters: TestParameters,
 | 
						|
    sink_config,
 | 
						|
    profiler_config,
 | 
						|
    run_workflow,
 | 
						|
    workflow_config,
 | 
						|
    cleanup_fqns,
 | 
						|
):
 | 
						|
    metadata = patched_metadata
 | 
						|
    table1 = metadata.get_by_name(
 | 
						|
        Table,
 | 
						|
        f"{postgres_service.fullyQualifiedName.root}.dvdrental.public.customer",
 | 
						|
        nullable=False,
 | 
						|
    )
 | 
						|
    cleanup_fqns(
 | 
						|
        TestCase,
 | 
						|
        f"{table1.fullyQualifiedName.root}.{parameters.test_case_defintion.name}",
 | 
						|
    )
 | 
						|
    table2_service = {
 | 
						|
        "POSTGRES_SERVICE": postgres_service,
 | 
						|
        "MYSQL_SERVICE": ingest_mysql_service,
 | 
						|
    }
 | 
						|
    for k, v in table2_service.items():
 | 
						|
        parameters.table2_fqn = parameters.table2_fqn.replace(
 | 
						|
            k, v.fullyQualifiedName.root
 | 
						|
        )
 | 
						|
    parameters.test_case_defintion.parameterValues.extend(
 | 
						|
        [
 | 
						|
            TestCaseParameterValue(
 | 
						|
                name="table2",
 | 
						|
                value=parameters.table2_fqn,
 | 
						|
            ),
 | 
						|
        ]
 | 
						|
    )
 | 
						|
    workflow_config = {
 | 
						|
        "source": {
 | 
						|
            "type": TestSuiteConfigType.TestSuite.value,
 | 
						|
            "serviceName": "MyTestSuite",
 | 
						|
            "sourceConfig": {
 | 
						|
                "config": {
 | 
						|
                    "type": TestSuiteConfigType.TestSuite.value,
 | 
						|
                    "entityFullyQualifiedName": table1.fullyQualifiedName.root,
 | 
						|
                }
 | 
						|
            },
 | 
						|
        },
 | 
						|
        "processor": {
 | 
						|
            "type": "orm-test-runner",
 | 
						|
            "config": {"testCases": [parameters.test_case_defintion.dict()]},
 | 
						|
        },
 | 
						|
        "sink": sink_config,
 | 
						|
        "workflowConfig": workflow_config,
 | 
						|
    }
 | 
						|
    run_workflow(TestSuiteWorkflow, workflow_config)
 | 
						|
    test_case_entity = metadata.get_by_name(
 | 
						|
        TestCase,
 | 
						|
        f"{table1.fullyQualifiedName.root}.{parameters.test_case_defintion.name}",
 | 
						|
        fields=["*"],
 | 
						|
    )
 | 
						|
    assert "ERROR: Unexpected error" not in test_case_entity.testCaseResult.result
 | 
						|
    assert_equal_pydantic_objects(parameters.expected, test_case_entity.testCaseResult)
 | 
						|
 | 
						|
 | 
						|
@pytest.mark.parametrize(
 | 
						|
    "parameters,expected",
 | 
						|
    [
 | 
						|
        pytest.param(
 | 
						|
            TestCaseDefinition(
 | 
						|
                name="unsupported_dialect",
 | 
						|
                testDefinitionName="tableDiff",
 | 
						|
                computePassedFailedRowCount=True,
 | 
						|
                parameterValues=[
 | 
						|
                    TestCaseParameterValue(
 | 
						|
                        name="service2Url",
 | 
						|
                        value="mongodb://localhost:27017",
 | 
						|
                    ),
 | 
						|
                    TestCaseParameterValue(
 | 
						|
                        name="table2",
 | 
						|
                        value="POSTGRES_SERVICE.dvdrental.public.customer",
 | 
						|
                    ),
 | 
						|
                ],
 | 
						|
            ),
 | 
						|
            TestCaseResult(
 | 
						|
                testCaseStatus=TestCaseStatus.Aborted,
 | 
						|
                result="Unsupported dialect in param table2.serviceUrl: mongodb",
 | 
						|
            ),
 | 
						|
            id="unsupported_dialect",
 | 
						|
        ),
 | 
						|
        pytest.param(
 | 
						|
            TestCaseDefinition(
 | 
						|
                name="unsupported_data_types",
 | 
						|
                testDefinitionName="tableDiff",
 | 
						|
                computePassedFailedRowCount=True,
 | 
						|
                parameterValues=[
 | 
						|
                    TestCaseParameterValue(
 | 
						|
                        name="table2",
 | 
						|
                        value="POSTGRES_SERVICE.dvdrental.public.customer_int_first_name",
 | 
						|
                    ),
 | 
						|
                ],
 | 
						|
            ),
 | 
						|
            TestCaseResult(
 | 
						|
                testCaseStatus=TestCaseStatus.Failed,
 | 
						|
                result="Tables have 1 different columns:"
 | 
						|
                "\n  Changed columns:"
 | 
						|
                "\n    first_name: VARCHAR -> INT",
 | 
						|
                testResultValue=[
 | 
						|
                    TestResultValue(name="removedColumns", value="0"),
 | 
						|
                    TestResultValue(name="addedColumns", value="0"),
 | 
						|
                    TestResultValue(name="changedColumns", value="1"),
 | 
						|
                ],
 | 
						|
            ),
 | 
						|
        ),
 | 
						|
        pytest.param(
 | 
						|
            None,
 | 
						|
            None,
 | 
						|
            marks=pytest.mark.skip(
 | 
						|
                reason="TODO: implement test - table2 does not exist"
 | 
						|
            ),
 | 
						|
        ),
 | 
						|
        pytest.param(
 | 
						|
            None,
 | 
						|
            None,
 | 
						|
            marks=pytest.mark.skip(
 | 
						|
                reason="TODO: implement test - where clause is invalid"
 | 
						|
            ),
 | 
						|
        ),
 | 
						|
    ],
 | 
						|
)
 | 
						|
def test_error_paths(
 | 
						|
    parameters: TestCaseDefinition,
 | 
						|
    expected: TestCaseResult,
 | 
						|
    prepare_data: None,
 | 
						|
    ingest_postgres_metadata,
 | 
						|
    ingest_mysql_service: DatabaseService,
 | 
						|
    postgres_service: DatabaseService,
 | 
						|
    patched_metadata: OpenMetadata,
 | 
						|
    sink_config,
 | 
						|
    workflow_config,
 | 
						|
    run_workflow,
 | 
						|
    cleanup_fqns,
 | 
						|
):
 | 
						|
    metadata = patched_metadata
 | 
						|
    table1 = metadata.get_by_name(
 | 
						|
        Table,
 | 
						|
        f"{postgres_service.fullyQualifiedName.root}.dvdrental.public.customer",
 | 
						|
        nullable=False,
 | 
						|
    )
 | 
						|
    cleanup_fqns(TestCase, f"{table1.fullyQualifiedName.root}.{parameters.name}")
 | 
						|
    for parameter in parameters.parameterValues:
 | 
						|
        if parameter.name == "table2":
 | 
						|
            parameter.value = parameter.value.replace(
 | 
						|
                "POSTGRES_SERVICE", postgres_service.fullyQualifiedName.root
 | 
						|
            )
 | 
						|
    workflow_config = {
 | 
						|
        "source": {
 | 
						|
            "type": TestSuiteConfigType.TestSuite.value,
 | 
						|
            "serviceName": "MyTestSuite",
 | 
						|
            "sourceConfig": {
 | 
						|
                "config": {
 | 
						|
                    "type": TestSuiteConfigType.TestSuite.value,
 | 
						|
                    "entityFullyQualifiedName": table1.fullyQualifiedName.root,
 | 
						|
                }
 | 
						|
            },
 | 
						|
        },
 | 
						|
        "processor": {
 | 
						|
            "type": "orm-test-runner",
 | 
						|
            "config": {"testCases": [parameters.dict()]},
 | 
						|
        },
 | 
						|
        "sink": sink_config,
 | 
						|
        "workflowConfig": workflow_config,
 | 
						|
    }
 | 
						|
    run_workflow(TestSuiteWorkflow, workflow_config)
 | 
						|
    test_case_entity: TestCase = metadata.get_or_create_test_case(
 | 
						|
        f"{table1.fullyQualifiedName.root}.{parameters.name}"
 | 
						|
    )
 | 
						|
    assert_equal_pydantic_objects(expected, test_case_entity.testCaseResult)
 | 
						|
 | 
						|
 | 
						|
def add_changed_tables(connection: Connection):
 | 
						|
    connection.execute("CREATE TABLE customer_200 AS SELECT * FROM customer LIMIT 200;")
 | 
						|
    connection.execute("CREATE TABLE changed_customer AS SELECT * FROM customer;")
 | 
						|
    connection.execute(
 | 
						|
        "UPDATE changed_customer SET first_name = 'John' WHERE MOD(customer_id, 2) = 0;"
 | 
						|
    )
 | 
						|
    connection.execute("DELETE FROM changed_customer WHERE MOD(customer_id, 13) = 0;")
 | 
						|
    connection.execute(
 | 
						|
        "CREATE TABLE customer_without_first_name AS SELECT * FROM customer;"
 | 
						|
    )
 | 
						|
    connection.execute(
 | 
						|
        "ALTER TABLE customer_without_first_name DROP COLUMN first_name;"
 | 
						|
    )
 | 
						|
    connection.execute(
 | 
						|
        "CREATE TABLE customer_int_first_name AS SELECT * FROM customer;"
 | 
						|
    )
 | 
						|
    connection.execute("ALTER TABLE customer_int_first_name DROP COLUMN first_name;")
 | 
						|
    connection.execute("ALTER TABLE customer_int_first_name ADD COLUMN first_name INT;")
 | 
						|
    connection.execute("UPDATE customer_int_first_name SET first_name = 1;")
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture(scope="module")
 | 
						|
def prepare_data(postgres_container, mysql_container):
 | 
						|
    postgres_engine = create_engine(
 | 
						|
        make_url(postgres_container.get_connection_url()).set(database="dvdrental")
 | 
						|
    )
 | 
						|
    with postgres_engine.connect() as conn:
 | 
						|
        add_changed_tables(conn)
 | 
						|
    mysql_container = create_engine(
 | 
						|
        make_url(mysql_container.get_connection_url()).set(
 | 
						|
            database=mysql_container.dbname
 | 
						|
        )
 | 
						|
    )
 | 
						|
    postgres_engine = create_engine(
 | 
						|
        make_url(postgres_container.get_connection_url()).set(database="dvdrental")
 | 
						|
    )
 | 
						|
    copy_table(postgres_engine, mysql_container, "customer")
 | 
						|
    copy_table(postgres_engine, mysql_container, "changed_customer")
 | 
						|
 | 
						|
 | 
						|
def copy_table(source_engine, destination_engine, table_name):
 | 
						|
    source_metadata = MetaData()
 | 
						|
    source_table = SQATable(table_name, source_metadata, autoload_with=source_engine)
 | 
						|
    destination_metadata = MetaData()
 | 
						|
    destination_table = SQATable(table_name, destination_metadata)
 | 
						|
 | 
						|
    for column in source_table.columns:
 | 
						|
        # we copy all the columns without constraints, indexes or defaults
 | 
						|
        # as we are only interested in the data
 | 
						|
        if (
 | 
						|
            isinstance(column.type, postgresql.base.BYTEA)
 | 
						|
            and destination_engine.dialect.name == "mssql"
 | 
						|
        ):
 | 
						|
            column_copy = SQAColumn(column.name, VARBINARY)
 | 
						|
        elif (
 | 
						|
            isinstance(column.type, sqltypes.BOOLEAN)
 | 
						|
            and destination_engine.dialect.name == "mssql"
 | 
						|
        ):
 | 
						|
            column_copy = SQAColumn(column.name, sqltypes.Boolean)
 | 
						|
        elif (
 | 
						|
            isinstance(column.type, sqltypes.TIMESTAMP)
 | 
						|
            and destination_engine.dialect.name == "mssql"
 | 
						|
        ):
 | 
						|
            column_copy = SQAColumn(column.name, sqltypes.DateTime)
 | 
						|
        elif (
 | 
						|
            isinstance(column.type, sqltypes.DATE)
 | 
						|
            and destination_engine.dialect.name == "mssql"
 | 
						|
        ):
 | 
						|
            column_copy = SQAColumn(column.name, sqltypes.DateTime)
 | 
						|
        elif isinstance(column.type, postgresql.json.JSONB):
 | 
						|
            column_copy = SQAColumn(column.name, sqltypes.JSON)
 | 
						|
        else:
 | 
						|
            column_copy = SQAColumn(column.name, column.type)
 | 
						|
        destination_table.append_column(column_copy)
 | 
						|
    destination_metadata.create_all(destination_engine)
 | 
						|
    with source_engine.connect() as source_connection, destination_engine.connect() as destination_connection:
 | 
						|
        data = source_connection.execute(source_table.select()).fetchall()
 | 
						|
        batch_size = 1000
 | 
						|
        for i in range(0, len(data), batch_size):
 | 
						|
            batch = data[i : i + batch_size]
 | 
						|
            destination_connection.execute(
 | 
						|
                source_table.insert(), [dict(row) for row in batch]
 | 
						|
            )
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture
 | 
						|
def patched_metadata(metadata, postgres_service, ingest_mysql_service, monkeypatch):
 | 
						|
    dbs_by_name = {
 | 
						|
        service.fullyQualifiedName.root: service
 | 
						|
        for service in [postgres_service, ingest_mysql_service]
 | 
						|
    }
 | 
						|
 | 
						|
    def override_result_by_fqn(func):
 | 
						|
        def inner(*args, **kwargs):
 | 
						|
            result = func(*args, **kwargs)
 | 
						|
            if result.fullyQualifiedName.root in dbs_by_name:
 | 
						|
                return dbs_by_name[result.fullyQualifiedName.root]
 | 
						|
            return result
 | 
						|
 | 
						|
        return inner
 | 
						|
 | 
						|
    monkeypatch.setattr(
 | 
						|
        "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_name",
 | 
						|
        override_result_by_fqn(OpenMetadata.get_by_name),
 | 
						|
    )
 | 
						|
 | 
						|
    monkeypatch.setattr(
 | 
						|
        "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_id",
 | 
						|
        override_result_by_fqn(OpenMetadata.get_by_id),
 | 
						|
    )
 | 
						|
 | 
						|
    return metadata
 |