mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2026-01-05 20:17:07 +00:00
FIX - Postgres Python CI (#17859)
* fix deprecation * remove print * tests use testcontainers for mysql * testcontainers for 3.8 dont work
This commit is contained in:
parent
d5507ca44f
commit
91ddfcf07a
@ -141,7 +141,7 @@ from metadata.utils.constants import UTF_8
|
||||
from metadata.utils.fqn import FQN_SEPARATOR
|
||||
from metadata.utils.helpers import get_standard_chart_type
|
||||
from metadata.utils.logger import ingestion_logger
|
||||
from metadata.utils.time_utils import convert_timestamp_to_milliseconds
|
||||
from metadata.utils.time_utils import datetime_to_timestamp
|
||||
|
||||
logger = ingestion_logger()
|
||||
|
||||
@ -1646,11 +1646,10 @@ class SampleDataSource(
|
||||
life_cycle_data.created = AccessDetails(
|
||||
timestamp=Timestamp(
|
||||
int(
|
||||
convert_timestamp_to_milliseconds(
|
||||
(
|
||||
datetime.now()
|
||||
- timedelta(days=life_cycle["created"]["days"])
|
||||
).timestamp()
|
||||
datetime_to_timestamp(
|
||||
datetime_value=datetime.now()
|
||||
- timedelta(days=life_cycle["created"]["days"]),
|
||||
milliseconds=True,
|
||||
)
|
||||
)
|
||||
),
|
||||
@ -1660,11 +1659,10 @@ class SampleDataSource(
|
||||
life_cycle_data.updated = AccessDetails(
|
||||
timestamp=Timestamp(
|
||||
int(
|
||||
convert_timestamp_to_milliseconds(
|
||||
(
|
||||
datetime.now()
|
||||
- timedelta(days=life_cycle["updated"]["days"])
|
||||
).timestamp()
|
||||
datetime_to_timestamp(
|
||||
datetime_value=datetime.now()
|
||||
- timedelta(days=life_cycle["updated"]["days"]),
|
||||
milliseconds=True,
|
||||
)
|
||||
),
|
||||
),
|
||||
@ -1674,11 +1672,10 @@ class SampleDataSource(
|
||||
life_cycle_data.accessed = AccessDetails(
|
||||
timestamp=Timestamp(
|
||||
int(
|
||||
convert_timestamp_to_milliseconds(
|
||||
(
|
||||
datetime.now()
|
||||
- timedelta(days=life_cycle["accessed"]["days"])
|
||||
).timestamp()
|
||||
datetime_to_timestamp(
|
||||
datetime_value=datetime.now()
|
||||
- timedelta(days=life_cycle["accessed"]["days"]),
|
||||
milliseconds=True,
|
||||
)
|
||||
),
|
||||
),
|
||||
|
||||
25
ingestion/tests/integration/automations/conftest.py
Normal file
25
ingestion/tests/integration/automations/conftest.py
Normal file
@ -0,0 +1,25 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Automations integration tests"""
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
|
||||
from ..containers import MySqlContainerConfigs, get_mysql_container
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def mysql_container():
|
||||
with get_mysql_container(
|
||||
MySqlContainerConfigs(container_name=str(uuid.uuid4()))
|
||||
) as container:
|
||||
yield container
|
||||
@ -12,8 +12,9 @@
|
||||
"""
|
||||
OpenMetadata high-level API Workflow test
|
||||
"""
|
||||
from unittest import TestCase
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
from sqlalchemy.engine import Engine
|
||||
|
||||
from metadata.generated.schema.api.automations.createWorkflow import (
|
||||
@ -34,129 +35,100 @@ from metadata.generated.schema.entity.services.connections.database.mysqlConnect
|
||||
MysqlConnection,
|
||||
MySQLType,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||
OpenMetadataConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.testConnectionResult import (
|
||||
StatusType,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
|
||||
from metadata.generated.schema.entity.services.serviceType import ServiceType
|
||||
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
|
||||
OpenMetadataJWTClientConfig,
|
||||
)
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
|
||||
|
||||
SERVICE_CONNECTION = MysqlConnection(
|
||||
username="openmetadata_user",
|
||||
authType=BasicAuth(password="openmetadata_password"),
|
||||
hostPort="localhost:3306",
|
||||
)
|
||||
if sys.version_info < (3, 9):
|
||||
pytest.skip("requires python 3.9+", allow_module_level=True)
|
||||
|
||||
|
||||
class TestConnectionAutomationTest(TestCase):
|
||||
def test_connection_workflow(metadata, mysql_container):
|
||||
"""
|
||||
Run this integration test with the local API available
|
||||
Install the ingestion package before running the tests
|
||||
Test all the steps related to the test connection automation workflow
|
||||
"""
|
||||
|
||||
service_entity_id = None
|
||||
service_connection = MysqlConnection(
|
||||
username=mysql_container.username,
|
||||
authType=BasicAuth(password=mysql_container.password),
|
||||
hostPort=f"localhost:{mysql_container.get_exposed_port(3306)}",
|
||||
)
|
||||
|
||||
server_config = OpenMetadataConnection(
|
||||
hostPort="http://localhost:8585/api",
|
||||
authProvider="openmetadata",
|
||||
securityConfig=OpenMetadataJWTClientConfig(
|
||||
jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
|
||||
new_workflow_request = CreateWorkflowRequest(
|
||||
name="test-connection-mysql",
|
||||
description="description",
|
||||
workflowType=WorkflowType.TEST_CONNECTION,
|
||||
request=TestServiceConnectionRequest(
|
||||
serviceType=ServiceType.Database,
|
||||
connectionType=MySQLType.Mysql.value,
|
||||
connection=DatabaseConnection(
|
||||
config=service_connection,
|
||||
),
|
||||
),
|
||||
)
|
||||
metadata = OpenMetadata(server_config)
|
||||
|
||||
assert metadata.health_check()
|
||||
automation_workflow: Workflow = metadata.create_or_update(data=new_workflow_request)
|
||||
engine: Engine = get_connection(service_connection)
|
||||
|
||||
def test_connection_workflow(self):
|
||||
"""
|
||||
Test all the steps related to the test connection automation workflow
|
||||
"""
|
||||
new_workflow_request = CreateWorkflowRequest(
|
||||
name="test-connection-mysql",
|
||||
description="description",
|
||||
workflowType=WorkflowType.TEST_CONNECTION,
|
||||
request=TestServiceConnectionRequest(
|
||||
serviceType=ServiceType.Database,
|
||||
connectionType=MySQLType.Mysql.value,
|
||||
connection=DatabaseConnection(
|
||||
config=SERVICE_CONNECTION,
|
||||
),
|
||||
test_connection_fn = get_test_connection_fn(service_connection)
|
||||
test_connection_fn(metadata, engine, service_connection, automation_workflow)
|
||||
|
||||
final_workflow: Workflow = metadata.get_by_name(
|
||||
entity=Workflow, fqn="test-connection-mysql"
|
||||
)
|
||||
|
||||
assert final_workflow.status == WorkflowStatus.Successful
|
||||
assert len(final_workflow.response.steps) == 4
|
||||
assert final_workflow.response.status.value == StatusType.Successful.value
|
||||
|
||||
metadata.delete(
|
||||
entity=Workflow,
|
||||
entity_id=str(automation_workflow.id.root),
|
||||
hard_delete=True,
|
||||
)
|
||||
|
||||
|
||||
def test_connection_workflow_ko(metadata):
|
||||
"""Test connection that will fail"""
|
||||
wrong_service_connection = MysqlConnection(
|
||||
username="openmetadata_user",
|
||||
authType=BasicAuth(password="openmetadata_password"),
|
||||
hostPort="localhost:8585", # There's something running there, but it's not MySQL
|
||||
databaseSchema="openmetadata_db",
|
||||
)
|
||||
|
||||
wrong_workflow_request = CreateWorkflowRequest(
|
||||
name="test-connection-mysql-bad",
|
||||
description="description",
|
||||
workflowType=WorkflowType.TEST_CONNECTION,
|
||||
request=TestServiceConnectionRequest(
|
||||
serviceType=ServiceType.Database,
|
||||
connectionType=MySQLType.Mysql.value,
|
||||
connection=DatabaseConnection(
|
||||
config=wrong_service_connection,
|
||||
),
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
automation_workflow: Workflow = self.metadata.create_or_update(
|
||||
data=new_workflow_request
|
||||
)
|
||||
engine: Engine = get_connection(SERVICE_CONNECTION)
|
||||
automation_workflow: Workflow = metadata.create_or_update(
|
||||
data=wrong_workflow_request
|
||||
)
|
||||
engine: Engine = get_connection(wrong_service_connection)
|
||||
|
||||
test_connection_fn = get_test_connection_fn(SERVICE_CONNECTION)
|
||||
test_connection_fn(
|
||||
self.metadata, engine, SERVICE_CONNECTION, automation_workflow
|
||||
)
|
||||
test_connection_fn = get_test_connection_fn(wrong_service_connection)
|
||||
test_connection_fn(metadata, engine, wrong_service_connection, automation_workflow)
|
||||
|
||||
final_workflow: Workflow = self.metadata.get_by_name(
|
||||
entity=Workflow, fqn="test-connection-mysql"
|
||||
)
|
||||
final_workflow: Workflow = metadata.get_by_name(
|
||||
entity=Workflow, fqn="test-connection-mysql-bad"
|
||||
)
|
||||
|
||||
self.assertEqual(final_workflow.status, WorkflowStatus.Successful)
|
||||
self.assertEqual(len(final_workflow.response.steps), 4)
|
||||
self.assertEqual(
|
||||
final_workflow.response.status.value, StatusType.Successful.value
|
||||
)
|
||||
assert final_workflow.response.status == StatusType.Failed
|
||||
|
||||
self.metadata.delete(
|
||||
entity=Workflow,
|
||||
entity_id=str(automation_workflow.id.root),
|
||||
hard_delete=True,
|
||||
)
|
||||
|
||||
def test_connection_workflow_ko(self):
|
||||
"""Test connection that will fail"""
|
||||
wrong_service_connection = MysqlConnection(
|
||||
username="openmetadata_user",
|
||||
authType=BasicAuth(password="openmetadata_password"),
|
||||
hostPort="localhost:8585", # There's something running there, but it's not MySQL
|
||||
databaseSchema="openmetadata_db",
|
||||
)
|
||||
|
||||
wrong_workflow_request = CreateWorkflowRequest(
|
||||
name="test-connection-mysql-bad",
|
||||
description="description",
|
||||
workflowType=WorkflowType.TEST_CONNECTION,
|
||||
request=TestServiceConnectionRequest(
|
||||
serviceType=ServiceType.Database,
|
||||
connectionType=MySQLType.Mysql.value,
|
||||
connection=DatabaseConnection(
|
||||
config=wrong_service_connection,
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
automation_workflow: Workflow = self.metadata.create_or_update(
|
||||
data=wrong_workflow_request
|
||||
)
|
||||
engine: Engine = get_connection(wrong_service_connection)
|
||||
|
||||
test_connection_fn = get_test_connection_fn(wrong_service_connection)
|
||||
test_connection_fn(
|
||||
self.metadata, engine, wrong_service_connection, automation_workflow
|
||||
)
|
||||
|
||||
final_workflow: Workflow = self.metadata.get_by_name(
|
||||
entity=Workflow, fqn="test-connection-mysql-bad"
|
||||
)
|
||||
|
||||
self.assertEqual(final_workflow.response.status, StatusType.Failed)
|
||||
|
||||
self.metadata.delete(
|
||||
entity=Workflow,
|
||||
entity_id=str(automation_workflow.id.root),
|
||||
hard_delete=True,
|
||||
)
|
||||
metadata.delete(
|
||||
entity=Workflow,
|
||||
entity_id=str(automation_workflow.id.root),
|
||||
hard_delete=True,
|
||||
)
|
||||
|
||||
25
ingestion/tests/integration/connections/conftest.py
Normal file
25
ingestion/tests/integration/connections/conftest.py
Normal file
@ -0,0 +1,25 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Connections integration tests"""
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
|
||||
from ..containers import MySqlContainerConfigs, get_mysql_container
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def mysql_container():
|
||||
with get_mysql_container(
|
||||
MySqlContainerConfigs(container_name=str(uuid.uuid4()))
|
||||
) as container:
|
||||
yield container
|
||||
@ -8,12 +8,12 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
MySQL connection test
|
||||
"""
|
||||
from unittest import TestCase
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
from sqlalchemy.engine import Engine
|
||||
|
||||
from metadata.generated.schema.entity.services.connections.database.common.basicAuth import (
|
||||
@ -22,51 +22,27 @@ from metadata.generated.schema.entity.services.connections.database.common.basic
|
||||
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
|
||||
MysqlConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||
OpenMetadataConnection,
|
||||
)
|
||||
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
|
||||
OpenMetadataJWTClientConfig,
|
||||
)
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
|
||||
|
||||
if sys.version_info < (3, 9):
|
||||
pytest.skip("requires python 3.9+", allow_module_level=True)
|
||||
|
||||
class MySQLConnectionTest(TestCase):
|
||||
|
||||
def test_test_connection(metadata, mysql_container):
|
||||
"""
|
||||
Validate MySQL connections
|
||||
Test connection function requires:
|
||||
- ometa
|
||||
- connection object, i.e., the engine
|
||||
- the service connection
|
||||
"""
|
||||
|
||||
server_config = OpenMetadataConnection(
|
||||
hostPort="http://localhost:8585/api",
|
||||
authProvider="openmetadata",
|
||||
securityConfig=OpenMetadataJWTClientConfig(
|
||||
jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
|
||||
),
|
||||
)
|
||||
metadata = OpenMetadata(server_config)
|
||||
|
||||
assert metadata.health_check()
|
||||
|
||||
service_connection = MysqlConnection(
|
||||
username="openmetadata_user",
|
||||
authType=BasicAuth(password="openmetadata_password"),
|
||||
hostPort="localhost:3306",
|
||||
databaseSchema="openmetadata_db",
|
||||
username=mysql_container.username,
|
||||
authType=BasicAuth(password=mysql_container.password),
|
||||
hostPort=f"localhost:{mysql_container.get_exposed_port(3306)}",
|
||||
)
|
||||
|
||||
def test_get_connection(self):
|
||||
engine = get_connection(self.service_connection)
|
||||
self.assertTrue(isinstance(engine, Engine))
|
||||
engine = get_connection(service_connection)
|
||||
assert isinstance(engine, Engine)
|
||||
|
||||
def test_test_connection(self):
|
||||
"""
|
||||
Test connection function requires:
|
||||
- ometa
|
||||
- connection object, i.e., the engine
|
||||
- the service connection
|
||||
"""
|
||||
engine = get_connection(self.service_connection)
|
||||
|
||||
_test_connection_fn = get_test_connection_fn(self.service_connection)
|
||||
_test_connection_fn(self.metadata, engine, self.service_connection)
|
||||
_test_connection_fn = get_test_connection_fn(service_connection)
|
||||
_test_connection_fn(metadata, engine, service_connection)
|
||||
|
||||
80
ingestion/tests/integration/ometa/conftest.py
Normal file
80
ingestion/tests/integration/ometa/conftest.py
Normal file
@ -0,0 +1,80 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Automations integration tests"""
|
||||
import json
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
|
||||
from metadata.generated.schema.entity.services.connections.database.common.basicAuth import (
|
||||
BasicAuth,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
|
||||
MysqlConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.databaseService import DatabaseService
|
||||
from metadata.workflow.metadata import MetadataWorkflow
|
||||
|
||||
from ..containers import MySqlContainerConfigs, get_mysql_container
|
||||
from ..integration_base import (
|
||||
METADATA_INGESTION_CONFIG_TEMPLATE,
|
||||
generate_name,
|
||||
get_create_service,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def mysql_container():
|
||||
with get_mysql_container(
|
||||
MySqlContainerConfigs(container_name=str(uuid.uuid4()))
|
||||
) as container:
|
||||
yield container
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def service(metadata):
|
||||
service_name = generate_name()
|
||||
create_service = get_create_service(entity=DatabaseService, name=service_name)
|
||||
yield metadata.create_or_update(data=create_service)
|
||||
|
||||
service_id = str(
|
||||
metadata.get_by_name(entity=DatabaseService, fqn=service_name).id.root
|
||||
)
|
||||
|
||||
metadata.delete(
|
||||
entity=DatabaseService,
|
||||
entity_id=service_id,
|
||||
recursive=True,
|
||||
hard_delete=True,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def workflow(metadata, service, mysql_container):
|
||||
service_name = service.name.root
|
||||
|
||||
workflow_config = json.loads(
|
||||
METADATA_INGESTION_CONFIG_TEMPLATE.format(
|
||||
type="mysql",
|
||||
service_name=service_name,
|
||||
service_config=MysqlConnection(
|
||||
username=mysql_container.username,
|
||||
authType=BasicAuth(
|
||||
password=mysql_container.password,
|
||||
),
|
||||
hostPort=f"localhost:{mysql_container.get_exposed_port(3306)}",
|
||||
).model_dump_json(),
|
||||
source_config={},
|
||||
)
|
||||
)
|
||||
workflow_config["ingestionPipelineFQN"] = f"{service_name}.ingestion"
|
||||
return MetadataWorkflow.create(workflow_config)
|
||||
@ -12,19 +12,10 @@
|
||||
"""
|
||||
Test how we create and update status in Ingestion Pipelines
|
||||
"""
|
||||
import json
|
||||
from unittest import TestCase
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
|
||||
from _openmetadata_testutils.ometa import int_admin_ometa
|
||||
from metadata.generated.schema.entity.services.connections.database.common.basicAuth import (
|
||||
BasicAuth,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
|
||||
MysqlConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.databaseService import DatabaseService
|
||||
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
|
||||
IngestionPipeline,
|
||||
PipelineState,
|
||||
@ -36,178 +27,110 @@ from metadata.generated.schema.entity.services.ingestionPipelines.status import
|
||||
StepSummary,
|
||||
)
|
||||
from metadata.ingestion.api.status import TruncatedStackTraceError
|
||||
from metadata.workflow.metadata import MetadataWorkflow
|
||||
|
||||
from ..integration_base import (
|
||||
METADATA_INGESTION_CONFIG_TEMPLATE,
|
||||
generate_name,
|
||||
get_create_service,
|
||||
)
|
||||
if sys.version_info < (3, 9):
|
||||
pytest.skip("requires python 3.9+", allow_module_level=True)
|
||||
|
||||
|
||||
class OMetaTableTest(TestCase):
|
||||
"""
|
||||
Run this integration test with the local API available
|
||||
Install the ingestion package before running the tests
|
||||
"""
|
||||
def test_create_ingestion_pipeline(workflow) -> None:
|
||||
"""We can create an ingestion pipeline"""
|
||||
|
||||
metadata = int_admin_ometa()
|
||||
service_name = generate_name()
|
||||
ingestion_pipeline: IngestionPipeline = workflow.ingestion_pipeline
|
||||
assert ingestion_pipeline is not None
|
||||
assert ingestion_pipeline.name.root == "ingestion"
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls) -> None:
|
||||
"""
|
||||
Prepare ingredients
|
||||
"""
|
||||
# Create the service entity
|
||||
create_service = get_create_service(
|
||||
entity=DatabaseService, name=cls.service_name
|
||||
)
|
||||
cls.service_entity = cls.metadata.create_or_update(data=create_service)
|
||||
|
||||
workflow_config = json.loads(
|
||||
METADATA_INGESTION_CONFIG_TEMPLATE.format(
|
||||
type="mysql",
|
||||
service_name=cls.service_name,
|
||||
service_config=MysqlConnection(
|
||||
username="openmetadata_user",
|
||||
authType=BasicAuth(
|
||||
password="openmetadata_password",
|
||||
),
|
||||
hostPort="localhost:3306",
|
||||
).model_dump_json(),
|
||||
source_config={},
|
||||
def test_add_status(metadata, workflow) -> None:
|
||||
"""We can add status to the ingestion pipeline"""
|
||||
|
||||
ingestion_pipeline: IngestionPipeline = workflow.ingestion_pipeline
|
||||
assert ingestion_pipeline is not None
|
||||
|
||||
# We can send a status to the ingestion pipeline
|
||||
ingestion_status = IngestionStatus(
|
||||
[
|
||||
StepSummary(
|
||||
name="source",
|
||||
failures=[
|
||||
StackTraceError(
|
||||
name="error",
|
||||
error="error",
|
||||
stackTrace="stackTrace",
|
||||
)
|
||||
],
|
||||
)
|
||||
)
|
||||
workflow_config["ingestionPipelineFQN"] = f"{cls.service_name}.ingestion"
|
||||
cls.workflow: MetadataWorkflow = MetadataWorkflow.create(workflow_config)
|
||||
]
|
||||
)
|
||||
|
||||
# Since we won't run the full workflow, let's create the service first
|
||||
# which is needed to create the ingestion
|
||||
cls.metadata.get_service_or_create(
|
||||
entity=DatabaseService, config=cls.workflow.config.source
|
||||
)
|
||||
pipeline_status: PipelineStatus = workflow._new_pipeline_status(
|
||||
PipelineState.success
|
||||
)
|
||||
pipeline_status.status = ingestion_status
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls) -> None:
|
||||
"""
|
||||
Clean up
|
||||
"""
|
||||
# Gets properly created
|
||||
metadata.create_or_update_pipeline_status(
|
||||
ingestion_pipeline.fullyQualifiedName.root, pipeline_status
|
||||
)
|
||||
|
||||
service_id = str(
|
||||
cls.metadata.get_by_name(
|
||||
entity=DatabaseService, fqn=cls.service_name
|
||||
).id.root
|
||||
)
|
||||
real_pipeline_status: PipelineStatus = metadata.get_pipeline_status(
|
||||
ingestion_pipeline.fullyQualifiedName.root, workflow.run_id
|
||||
)
|
||||
assert real_pipeline_status.pipelineState == PipelineState.success
|
||||
|
||||
cls.metadata.delete(
|
||||
entity=DatabaseService,
|
||||
entity_id=service_id,
|
||||
recursive=True,
|
||||
hard_delete=True,
|
||||
)
|
||||
# If the status has too long names/errors it will fail
|
||||
too_long_status = IngestionStatus(
|
||||
[
|
||||
StepSummary(
|
||||
name="source",
|
||||
failures=[
|
||||
StackTraceError(
|
||||
name="error",
|
||||
error="error" * 20_000_000,
|
||||
stackTrace="stackTrace",
|
||||
)
|
||||
],
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
def test_create_ingestion_pipeline(self) -> None:
|
||||
"""We can create an ingestion pipeline"""
|
||||
pipeline_status: PipelineStatus = workflow._new_pipeline_status(
|
||||
PipelineState.success
|
||||
)
|
||||
pipeline_status.status = too_long_status
|
||||
|
||||
ingestion_pipeline: IngestionPipeline = self.workflow.ingestion_pipeline
|
||||
assert ingestion_pipeline is not None
|
||||
assert ingestion_pipeline.name.root == "ingestion"
|
||||
|
||||
def test_add_status(self) -> None:
|
||||
"""We can add status to the ingestion pipeline"""
|
||||
|
||||
ingestion_pipeline: IngestionPipeline = self.workflow.ingestion_pipeline
|
||||
assert ingestion_pipeline is not None
|
||||
|
||||
# We can send a status to the ingestion pipeline
|
||||
ingestion_status = IngestionStatus(
|
||||
[
|
||||
StepSummary(
|
||||
name="source",
|
||||
failures=[
|
||||
StackTraceError(
|
||||
name="error",
|
||||
error="error",
|
||||
stackTrace="stackTrace",
|
||||
)
|
||||
],
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
pipeline_status: PipelineStatus = self.workflow._new_pipeline_status(
|
||||
PipelineState.success
|
||||
)
|
||||
pipeline_status.status = ingestion_status
|
||||
|
||||
# Gets properly created
|
||||
self.metadata.create_or_update_pipeline_status(
|
||||
# We get a bad request error
|
||||
with pytest.raises(Exception) as exc:
|
||||
metadata.create_or_update_pipeline_status(
|
||||
ingestion_pipeline.fullyQualifiedName.root, pipeline_status
|
||||
)
|
||||
|
||||
real_pipeline_status: PipelineStatus = self.metadata.get_pipeline_status(
|
||||
ingestion_pipeline.fullyQualifiedName.root, self.workflow.run_id
|
||||
)
|
||||
assert real_pipeline_status.pipelineState == PipelineState.success
|
||||
assert ("exceeds the maximum allowed" in str(exc.value)) or (
|
||||
"Connection aborted." in str(exc.value)
|
||||
)
|
||||
|
||||
# If the status has too long names/errors it will fail
|
||||
too_long_status = IngestionStatus(
|
||||
[
|
||||
StepSummary(
|
||||
name="source",
|
||||
failures=[
|
||||
StackTraceError(
|
||||
name="error",
|
||||
error="error" * 20_000_000,
|
||||
stackTrace="stackTrace",
|
||||
)
|
||||
],
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
pipeline_status: PipelineStatus = self.workflow._new_pipeline_status(
|
||||
PipelineState.success
|
||||
)
|
||||
pipeline_status.status = too_long_status
|
||||
|
||||
# We get a bad request error
|
||||
with pytest.raises(Exception) as exc:
|
||||
self.metadata.create_or_update_pipeline_status(
|
||||
ingestion_pipeline.fullyQualifiedName.root, pipeline_status
|
||||
# If we truncate the status it all runs good
|
||||
truncated_long_status = IngestionStatus(
|
||||
[
|
||||
StepSummary(
|
||||
name="source",
|
||||
failures=[
|
||||
TruncatedStackTraceError(
|
||||
name="error",
|
||||
error="error" * 20_000_000,
|
||||
stackTrace="stackTrace",
|
||||
)
|
||||
],
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
assert ("exceeds the maximum allowed" in str(exc.value)) or (
|
||||
"Connection aborted." in str(exc.value)
|
||||
)
|
||||
pipeline_status: PipelineStatus = workflow._new_pipeline_status(
|
||||
PipelineState.success
|
||||
)
|
||||
pipeline_status.status = truncated_long_status
|
||||
|
||||
# If we truncate the status it all runs good
|
||||
truncated_long_status = IngestionStatus(
|
||||
[
|
||||
StepSummary(
|
||||
name="source",
|
||||
failures=[
|
||||
TruncatedStackTraceError(
|
||||
name="error",
|
||||
error="error" * 20_000_000,
|
||||
stackTrace="stackTrace",
|
||||
)
|
||||
],
|
||||
)
|
||||
]
|
||||
)
|
||||
res = metadata.create_or_update_pipeline_status(
|
||||
ingestion_pipeline.fullyQualifiedName.root, pipeline_status
|
||||
)
|
||||
|
||||
pipeline_status: PipelineStatus = self.workflow._new_pipeline_status(
|
||||
PipelineState.success
|
||||
)
|
||||
pipeline_status.status = truncated_long_status
|
||||
|
||||
res = self.metadata.create_or_update_pipeline_status(
|
||||
ingestion_pipeline.fullyQualifiedName.root, pipeline_status
|
||||
)
|
||||
|
||||
assert (
|
||||
res["entityFullyQualifiedName"]
|
||||
== ingestion_pipeline.fullyQualifiedName.root
|
||||
)
|
||||
assert res["entityFullyQualifiedName"] == ingestion_pipeline.fullyQualifiedName.root
|
||||
|
||||
70
ingestion/tests/integration/workflow/conftest.py
Normal file
70
ingestion/tests/integration/workflow/conftest.py
Normal file
@ -0,0 +1,70 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Automations integration tests"""
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
|
||||
from ..containers import MySqlContainerConfigs, get_mysql_container
|
||||
|
||||
MYSQL_CONFIG = """
|
||||
source:
|
||||
type: mysql
|
||||
serviceName: local_mysql_test
|
||||
serviceConnection:
|
||||
config:
|
||||
type: Mysql
|
||||
username: {user}
|
||||
authType:
|
||||
password: {password}
|
||||
hostPort: localhost:{port}
|
||||
databaseSchema: {database}
|
||||
connectionOptions: {{}}
|
||||
connectionArguments: {{}}
|
||||
sourceConfig:
|
||||
config:
|
||||
schemaFilterPattern:
|
||||
excludes:
|
||||
- mysql.*
|
||||
- information_schema.*
|
||||
- performance_schema.*
|
||||
- sys.*
|
||||
sink:
|
||||
type: metadata-rest
|
||||
config: {{}}
|
||||
workflowConfig:
|
||||
openMetadataServerConfig:
|
||||
hostPort: http://localhost:8585/api
|
||||
authProvider: openmetadata
|
||||
securityConfig:
|
||||
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
|
||||
ingestionPipelineFQN: local_mysql_test.test_metadata
|
||||
pipelineRunId: 948eba5d-94ec-4fc5-b233-29038722db16
|
||||
"""
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def mysql_container():
|
||||
with get_mysql_container(
|
||||
MySqlContainerConfigs(container_name=str(uuid.uuid4()))
|
||||
) as container:
|
||||
yield container
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def mysql_config(mysql_container):
|
||||
return MYSQL_CONFIG.format(
|
||||
user=mysql_container.username,
|
||||
password=mysql_container.password,
|
||||
port=mysql_container.get_exposed_port(3306),
|
||||
database=mysql_container.dbname,
|
||||
)
|
||||
@ -27,6 +27,6 @@ workflowConfig:
|
||||
hostPort: http://localhost:8585/api
|
||||
authProvider: openmetadata
|
||||
securityConfig:
|
||||
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
|
||||
jwtToken: "..."
|
||||
ingestionPipelineFQN: local_mysql_test.test_metadata
|
||||
pipelineRunId: 948eba5d-94ec-4fc5-b233-29038722db16
|
||||
@ -11,13 +11,13 @@
|
||||
|
||||
import importlib
|
||||
import pathlib
|
||||
import sys
|
||||
import uuid
|
||||
from unittest import TestCase
|
||||
|
||||
import pytest
|
||||
import yaml
|
||||
|
||||
from metadata.config.common import ConfigurationError, load_config_file
|
||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||
OpenMetadataConnection,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.databaseService import DatabaseService
|
||||
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
|
||||
IngestionPipeline,
|
||||
@ -25,144 +25,98 @@ from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipel
|
||||
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
|
||||
StepSummary,
|
||||
)
|
||||
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
|
||||
OpenMetadataJWTClientConfig,
|
||||
)
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.workflow.metadata import MetadataWorkflow
|
||||
|
||||
if sys.version_info < (3, 9):
|
||||
pytest.skip("requires python 3.9+", allow_module_level=True)
|
||||
|
||||
class WorkflowTest(TestCase):
|
||||
"""
|
||||
Validate workflow methods
|
||||
"""
|
||||
|
||||
server_config = OpenMetadataConnection(
|
||||
hostPort="http://localhost:8585/api",
|
||||
authProvider="openmetadata",
|
||||
securityConfig=OpenMetadataJWTClientConfig(
|
||||
jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
|
||||
),
|
||||
def delete_service(metadata):
|
||||
service_id = str(
|
||||
metadata.get_by_name(entity=DatabaseService, fqn="local_mysql_test").id.root
|
||||
)
|
||||
metadata = OpenMetadata(server_config)
|
||||
|
||||
assert metadata.health_check()
|
||||
metadata.delete(
|
||||
entity=DatabaseService,
|
||||
entity_id=service_id,
|
||||
recursive=True,
|
||||
hard_delete=True,
|
||||
)
|
||||
|
||||
def delete_service(self):
|
||||
service_id = str(
|
||||
self.metadata.get_by_name(
|
||||
entity=DatabaseService, fqn="local_mysql_test"
|
||||
).id.root
|
||||
)
|
||||
|
||||
self.metadata.delete(
|
||||
entity=DatabaseService,
|
||||
entity_id=service_id,
|
||||
recursive=True,
|
||||
hard_delete=True,
|
||||
)
|
||||
def test_get_200():
|
||||
key = "metadata.ingestion.sink.metadata_rest.MetadataRestSink"
|
||||
if key.find(".") >= 0:
|
||||
module_name, class_name = key.rsplit(".", 1)
|
||||
my_class = getattr(importlib.import_module(module_name), class_name)
|
||||
assert my_class is not None
|
||||
|
||||
def test_get_200(self):
|
||||
key = "metadata.ingestion.sink.metadata_rest.MetadataRestSink"
|
||||
|
||||
def test_get_4xx():
|
||||
key = "metadata.ingestion.sink.MYSQL.mysqlSINK"
|
||||
with pytest.raises(ModuleNotFoundError):
|
||||
if key.find(".") >= 0:
|
||||
module_name, class_name = key.rsplit(".", 1)
|
||||
my_class = getattr(importlib.import_module(module_name), class_name)
|
||||
self.assertEqual((my_class is not None), True)
|
||||
getattr(importlib.import_module(module_name), class_name)
|
||||
|
||||
def test_get_4xx(self):
|
||||
key = "metadata.ingestion.sink.MYSQL.mysqlSINK"
|
||||
try:
|
||||
if key.find(".") >= 0:
|
||||
module_name, class_name = key.rsplit(".", 1)
|
||||
getattr(importlib.import_module(module_name), class_name)
|
||||
except ModuleNotFoundError:
|
||||
self.assertRaises(ModuleNotFoundError)
|
||||
|
||||
def test_title_typeClassFetch(self):
|
||||
is_file = True
|
||||
file_type = "query-parser"
|
||||
if is_file:
|
||||
replace = file_type.replace("-", "_")
|
||||
else:
|
||||
replace = "".join(
|
||||
[i.title() for i in file_type.replace("-", "_").split("_")]
|
||||
)
|
||||
self.assertEqual(replace, "query_parser")
|
||||
def test_execute_200(metadata, mysql_config):
|
||||
workflow_config = yaml.safe_load(mysql_config)
|
||||
workflow = MetadataWorkflow.create(workflow_config)
|
||||
workflow.execute()
|
||||
workflow.stop()
|
||||
|
||||
def test_title_typeClassFetch_4xx(self):
|
||||
is_file = False
|
||||
file_type = "query-parser"
|
||||
if is_file:
|
||||
replace = file_type.replace("-", "_")
|
||||
else:
|
||||
replace = "".join(
|
||||
[i.title() for i in file_type.replace("-", "_").split("_")]
|
||||
)
|
||||
self.assertEqual(replace, "QueryParser")
|
||||
# Service is created
|
||||
assert metadata.get_by_name(entity=DatabaseService, fqn="local_mysql_test")
|
||||
|
||||
def test_execute_200(self):
|
||||
current_dir = pathlib.Path(__file__).resolve().parent
|
||||
config_file = current_dir.joinpath("mysql_test.yaml")
|
||||
workflow_config = load_config_file(config_file)
|
||||
workflow = MetadataWorkflow.create(workflow_config)
|
||||
workflow.execute()
|
||||
workflow.stop()
|
||||
# The service has an ingestion pipeline (since it has the ingestionPipelineFQN inside and the runId)
|
||||
assert metadata.get_by_name(
|
||||
entity=IngestionPipeline, fqn=workflow_config["ingestionPipelineFQN"]
|
||||
)
|
||||
|
||||
# Service is created
|
||||
self.assertIsNotNone(
|
||||
self.metadata.get_by_name(entity=DatabaseService, fqn="local_mysql_test")
|
||||
)
|
||||
# The pipeline has the right status
|
||||
pipeline_status = metadata.get_pipeline_status(
|
||||
workflow_config["ingestionPipelineFQN"], workflow_config["pipelineRunId"]
|
||||
)
|
||||
|
||||
# The service has an ingestion pipeline (since it has the ingestionPipelineFQN inside and the runId)
|
||||
self.assertIsNotNone(
|
||||
self.metadata.get_by_name(
|
||||
entity=IngestionPipeline, fqn=workflow_config["ingestionPipelineFQN"]
|
||||
)
|
||||
)
|
||||
# We have status for the source and sink
|
||||
assert len(pipeline_status.status.root) == 2
|
||||
assert isinstance(pipeline_status.status.root[0], StepSummary)
|
||||
|
||||
# The pipeline has the right status
|
||||
pipeline_status = self.metadata.get_pipeline_status(
|
||||
workflow_config["ingestionPipelineFQN"], workflow_config["pipelineRunId"]
|
||||
)
|
||||
# Rerunning with a different Run ID still generates the correct status
|
||||
new_run_id = str(uuid.uuid4())
|
||||
workflow_config["pipelineRunId"] = new_run_id
|
||||
|
||||
# We have status for the source and sink
|
||||
self.assertEqual(len(pipeline_status.status.root), 2)
|
||||
self.assertTrue(isinstance(pipeline_status.status.root[0], StepSummary))
|
||||
workflow = MetadataWorkflow.create(workflow_config)
|
||||
workflow.execute()
|
||||
workflow.stop()
|
||||
|
||||
# Rerunning with a different Run ID still generates the correct status
|
||||
new_run_id = str(uuid.uuid4())
|
||||
workflow_config["pipelineRunId"] = new_run_id
|
||||
pipeline_status = metadata.get_pipeline_status(
|
||||
workflow_config["ingestionPipelineFQN"], new_run_id
|
||||
)
|
||||
|
||||
workflow = MetadataWorkflow.create(workflow_config)
|
||||
workflow.execute()
|
||||
workflow.stop()
|
||||
# We have status for the source and sink
|
||||
assert len(pipeline_status.status.root) == 2
|
||||
assert isinstance(pipeline_status.status.root[0], StepSummary)
|
||||
|
||||
pipeline_status = self.metadata.get_pipeline_status(
|
||||
workflow_config["ingestionPipelineFQN"], new_run_id
|
||||
)
|
||||
delete_service(metadata)
|
||||
|
||||
# We have status for the source and sink
|
||||
self.assertEqual(len(pipeline_status.status.root), 2)
|
||||
self.assertTrue(isinstance(pipeline_status.status.root[0], StepSummary))
|
||||
|
||||
self.delete_service()
|
||||
def test_execute_4xx():
|
||||
config_file = pathlib.Path("/tmp/mysql_test123")
|
||||
with pytest.raises(ConfigurationError):
|
||||
load_config_file(config_file)
|
||||
|
||||
def test_execute_4xx(self):
|
||||
config_file = pathlib.Path("/tmp/mysql_test123")
|
||||
try:
|
||||
load_config_file(config_file)
|
||||
except ConfigurationError:
|
||||
self.assertRaises(ConfigurationError)
|
||||
|
||||
def test_fail_no_service_connection_and_overwrite(self):
|
||||
current_dir = pathlib.Path(__file__).resolve().parent
|
||||
config_file = current_dir.joinpath("mysql_test.yaml")
|
||||
workflow_config = load_config_file(config_file)
|
||||
def test_fail_no_service_connection_and_overwrite():
|
||||
current_dir = pathlib.Path(__file__).resolve().parent
|
||||
config_file = current_dir.joinpath("mysql_test.yaml")
|
||||
workflow_config = load_config_file(config_file)
|
||||
|
||||
del workflow_config["source"]["serviceConnection"]
|
||||
workflow_config["workflowConfig"]["openMetadataServerConfig"][
|
||||
"forceEntityOverwriting"
|
||||
] = True
|
||||
del workflow_config["source"]["serviceConnection"]
|
||||
workflow_config["workflowConfig"]["openMetadataServerConfig"][
|
||||
"forceEntityOverwriting"
|
||||
] = True
|
||||
|
||||
with self.assertRaises(AttributeError):
|
||||
MetadataWorkflow.create(workflow_config)
|
||||
with pytest.raises(AttributeError):
|
||||
MetadataWorkflow.create(workflow_config)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user