2024-06-20 16:54:12 +02:00
|
|
|
import pytest
|
|
|
|
from testcontainers.mysql import MySqlContainer
|
|
|
|
|
2024-06-21 15:11:34 +02:00
|
|
|
from _openmetadata_testutils.postgres.conftest import postgres_container, try_bind
|
2024-07-19 12:12:34 +02:00
|
|
|
from metadata.generated.schema.api.services.createDatabaseService import (
|
|
|
|
CreateDatabaseServiceRequest,
|
|
|
|
)
|
|
|
|
from metadata.generated.schema.entity.services.connections.database.common.basicAuth import (
|
|
|
|
BasicAuth,
|
|
|
|
)
|
|
|
|
from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
|
|
|
|
PostgresConnection,
|
|
|
|
)
|
|
|
|
from metadata.generated.schema.entity.services.databaseService import (
|
|
|
|
DatabaseConnection,
|
|
|
|
DatabaseService,
|
|
|
|
DatabaseServiceType,
|
|
|
|
)
|
2024-06-20 16:54:12 +02:00
|
|
|
from metadata.generated.schema.metadataIngestion.workflow import LogLevels
|
|
|
|
from metadata.ingestion.models.custom_pydantic import CustomSecretStr
|
|
|
|
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
|
|
|
from metadata.workflow.metadata import MetadataWorkflow
|
|
|
|
|
|
|
|
__all__ = [
|
|
|
|
"postgres_container",
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
|
|
def mysql_container():
|
|
|
|
with try_bind(MySqlContainer("mysql:8"), 3306, 3307) as container:
|
|
|
|
yield container
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
|
|
def ingest_mysql_service(
|
|
|
|
mysql_container: MySqlContainer, metadata: OpenMetadata, tmp_path_factory
|
|
|
|
):
|
|
|
|
workflow_config = {
|
|
|
|
"source": {
|
|
|
|
"type": "mysql",
|
|
|
|
"serviceName": "integration_test_mysql_"
|
|
|
|
+ tmp_path_factory.mktemp("mysql").name.split("/")[-1],
|
|
|
|
"serviceConnection": {
|
|
|
|
"config": {
|
|
|
|
"type": "Mysql",
|
|
|
|
"username": mysql_container.username,
|
|
|
|
"authType": {
|
|
|
|
"password": mysql_container.password,
|
|
|
|
},
|
|
|
|
"hostPort": "localhost:" + mysql_container.get_exposed_port(3306),
|
|
|
|
"databaseSchema": mysql_container.dbname,
|
|
|
|
}
|
|
|
|
},
|
|
|
|
"sourceConfig": {
|
|
|
|
"config": {
|
|
|
|
"type": "DatabaseMetadata",
|
|
|
|
},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
"sink": {"type": "metadata-rest", "config": {}},
|
|
|
|
"workflowConfig": {
|
|
|
|
"loggerLevel": LogLevels.DEBUG.value,
|
2024-07-04 14:54:41 +02:00
|
|
|
"openMetadataServerConfig": metadata.config.model_dump(),
|
2024-06-20 16:54:12 +02:00
|
|
|
},
|
|
|
|
}
|
|
|
|
metadata_ingestion = MetadataWorkflow.create(workflow_config)
|
|
|
|
metadata_ingestion.execute()
|
|
|
|
metadata_ingestion.raise_from_status()
|
|
|
|
metadata_ingestion.stop()
|
|
|
|
db_service: DatabaseService = metadata.get_by_name(
|
|
|
|
DatabaseService, workflow_config["source"]["serviceName"]
|
|
|
|
)
|
|
|
|
db_service.connection.config.authType.password = CustomSecretStr(
|
|
|
|
mysql_container.password
|
|
|
|
)
|
|
|
|
yield db_service
|
|
|
|
metadata.delete(DatabaseService, db_service.id, recursive=True, hard_delete=True)
|
2024-07-19 12:12:34 +02:00
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
|
|
def create_service_request(tmp_path_factory, postgres_container):
|
|
|
|
return CreateDatabaseServiceRequest(
|
|
|
|
name="docker_test_" + tmp_path_factory.mktemp("postgres").name,
|
|
|
|
serviceType=DatabaseServiceType.Postgres,
|
|
|
|
connection=DatabaseConnection(
|
|
|
|
config=PostgresConnection(
|
|
|
|
username=postgres_container.username,
|
|
|
|
authType=BasicAuth(password=postgres_container.password),
|
|
|
|
hostPort="localhost:"
|
|
|
|
+ postgres_container.get_exposed_port(postgres_container.port),
|
|
|
|
database="dvdrental",
|
|
|
|
)
|
|
|
|
),
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
|
|
def postgres_service(db_service):
|
|
|
|
return db_service
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture()
|
|
|
|
def ingest_postgres_metadata(
|
|
|
|
postgres_service, metadata: OpenMetadata, sink_config, workflow_config, run_workflow
|
|
|
|
):
|
|
|
|
workflow_config = {
|
|
|
|
"source": {
|
|
|
|
"type": postgres_service.connection.config.type.value.lower(),
|
|
|
|
"serviceName": postgres_service.fullyQualifiedName.root,
|
|
|
|
"serviceConnection": postgres_service.connection,
|
|
|
|
"sourceConfig": {"config": {}},
|
|
|
|
},
|
|
|
|
"sink": sink_config,
|
|
|
|
"workflowConfig": workflow_config,
|
|
|
|
}
|
|
|
|
run_workflow(MetadataWorkflow, workflow_config)
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
|
|
def patch_password(postgres_container):
|
|
|
|
def inner(service: DatabaseService):
|
|
|
|
service.connection.config = cast(PostgresConnection, service.connection.config)
|
|
|
|
service.connection.config.authType.password = type(
|
|
|
|
service.connection.config.authType.password
|
|
|
|
)(postgres_container.password)
|
|
|
|
return service
|
|
|
|
|
|
|
|
return inner
|