OpenMetadata/ingestion/tests/integration/automations/test_connection_automation.py
IceS2 891ff4184d
MINOR: Initial implementation for our Connection Class (#21581)
* Initial implementation for our Connection Class

* Implement the Initial Connection class

* Add Unit Tests

* Fix Test

* Fix Profile Test Connection

* Remove unit test

* Remove comment

* Fix tests and missing changes
2025-06-13 14:52:29 +02:00

137 lines
4.7 KiB
Python

# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
OpenMetadata high-level API Workflow test
"""
import sys
import pytest
from metadata.generated.schema.api.automations.createWorkflow import (
CreateWorkflowRequest,
)
from metadata.generated.schema.entity.automations.testServiceConnection import (
TestServiceConnectionRequest,
)
from metadata.generated.schema.entity.automations.workflow import (
Workflow,
WorkflowStatus,
WorkflowType,
)
from metadata.generated.schema.entity.services.connections.database.common.basicAuth import (
BasicAuth,
)
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
MysqlConnection,
MySQLType,
)
from metadata.generated.schema.entity.services.connections.testConnectionResult import (
StatusType,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.ingestion.source.connections import get_test_connection_fn
if sys.version_info < (3, 9):
pytest.skip("requires python 3.9+", allow_module_level=True)
def test_connection_workflow(metadata, mysql_container):
"""
Test all the steps related to the test connection automation workflow
"""
service_connection = MysqlConnection(
username=mysql_container.username,
authType=BasicAuth(password=mysql_container.password),
hostPort=f"localhost:{mysql_container.get_exposed_port(3306)}",
)
new_workflow_request = CreateWorkflowRequest(
name="test-connection-mysql",
description="description",
workflowType=WorkflowType.TEST_CONNECTION,
request=TestServiceConnectionRequest(
serviceType=ServiceType.Database,
connectionType=MySQLType.Mysql.value,
connection=DatabaseConnection(
config=service_connection,
),
),
)
automation_workflow: Workflow = metadata.create_or_update(data=new_workflow_request)
test_connection_fn = get_test_connection_fn(service_connection)
test_connection_fn(metadata, automation_workflow=automation_workflow)
final_workflow: Workflow = metadata.get_by_name(
entity=Workflow, fqn="test-connection-mysql"
)
assert final_workflow.status == WorkflowStatus.Successful
assert len(final_workflow.response.steps) == 5
# Get queries is not passing since we're not enabling the logs in the container
assert final_workflow.response.status.value == StatusType.Failed.value
steps = [
step for step in final_workflow.response.steps if step.name != "GetQueries"
]
assert all(step.passed for step in steps)
metadata.delete(
entity=Workflow,
entity_id=str(automation_workflow.id.root),
hard_delete=True,
)
def test_connection_workflow_ko(metadata):
"""Test connection that will fail"""
wrong_service_connection = MysqlConnection(
username="openmetadata_user",
authType=BasicAuth(password="openmetadata_password"),
hostPort="localhost:8585", # There's something running there, but it's not MySQL
databaseSchema="openmetadata_db",
)
wrong_workflow_request = CreateWorkflowRequest(
name="test-connection-mysql-bad",
description="description",
workflowType=WorkflowType.TEST_CONNECTION,
request=TestServiceConnectionRequest(
serviceType=ServiceType.Database,
connectionType=MySQLType.Mysql.value,
connection=DatabaseConnection(
config=wrong_service_connection,
),
),
)
automation_workflow: Workflow = metadata.create_or_update(
data=wrong_workflow_request
)
test_connection_fn = get_test_connection_fn(wrong_service_connection)
test_connection_fn(metadata, automation_workflow=automation_workflow)
final_workflow: Workflow = metadata.get_by_name(
entity=Workflow, fqn="test-connection-mysql-bad"
)
assert final_workflow.response.status == StatusType.Failed
metadata.delete(
entity=Workflow,
entity_id=str(automation_workflow.id.root),
hard_delete=True,
)