diff --git a/ingestion/src/metadata/ingestion/source/database/sample_data.py b/ingestion/src/metadata/ingestion/source/database/sample_data.py index 24771256b94..013f8af36c1 100644 --- a/ingestion/src/metadata/ingestion/source/database/sample_data.py +++ b/ingestion/src/metadata/ingestion/source/database/sample_data.py @@ -141,7 +141,7 @@ from metadata.utils.constants import UTF_8 from metadata.utils.fqn import FQN_SEPARATOR from metadata.utils.helpers import get_standard_chart_type from metadata.utils.logger import ingestion_logger -from metadata.utils.time_utils import convert_timestamp_to_milliseconds +from metadata.utils.time_utils import datetime_to_timestamp logger = ingestion_logger() @@ -1646,11 +1646,10 @@ class SampleDataSource( life_cycle_data.created = AccessDetails( timestamp=Timestamp( int( - convert_timestamp_to_milliseconds( - ( - datetime.now() - - timedelta(days=life_cycle["created"]["days"]) - ).timestamp() + datetime_to_timestamp( + datetime_value=datetime.now() + - timedelta(days=life_cycle["created"]["days"]), + milliseconds=True, ) ) ), @@ -1660,11 +1659,10 @@ class SampleDataSource( life_cycle_data.updated = AccessDetails( timestamp=Timestamp( int( - convert_timestamp_to_milliseconds( - ( - datetime.now() - - timedelta(days=life_cycle["updated"]["days"]) - ).timestamp() + datetime_to_timestamp( + datetime_value=datetime.now() + - timedelta(days=life_cycle["updated"]["days"]), + milliseconds=True, ) ), ), @@ -1674,11 +1672,10 @@ class SampleDataSource( life_cycle_data.accessed = AccessDetails( timestamp=Timestamp( int( - convert_timestamp_to_milliseconds( - ( - datetime.now() - - timedelta(days=life_cycle["accessed"]["days"]) - ).timestamp() + datetime_to_timestamp( + datetime_value=datetime.now() + - timedelta(days=life_cycle["accessed"]["days"]), + milliseconds=True, ) ), ), diff --git a/ingestion/tests/integration/automations/conftest.py b/ingestion/tests/integration/automations/conftest.py new file mode 100644 index 00000000000..174847ecdb7 --- /dev/null +++ b/ingestion/tests/integration/automations/conftest.py @@ -0,0 +1,25 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Automations integration tests""" +import uuid + +import pytest + +from ..containers import MySqlContainerConfigs, get_mysql_container + + +@pytest.fixture(scope="session") +def mysql_container(): + with get_mysql_container( + MySqlContainerConfigs(container_name=str(uuid.uuid4())) + ) as container: + yield container diff --git a/ingestion/tests/integration/automations/test_connection_automation.py b/ingestion/tests/integration/automations/test_connection_automation.py index 79ef38da4c2..6de6597ef89 100644 --- a/ingestion/tests/integration/automations/test_connection_automation.py +++ b/ingestion/tests/integration/automations/test_connection_automation.py @@ -12,8 +12,9 @@ """ OpenMetadata high-level API Workflow test """ -from unittest import TestCase +import sys +import pytest from sqlalchemy.engine import Engine from metadata.generated.schema.api.automations.createWorkflow import ( @@ -34,129 +35,100 @@ from metadata.generated.schema.entity.services.connections.database.mysqlConnect MysqlConnection, MySQLType, ) -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - OpenMetadataConnection, -) from metadata.generated.schema.entity.services.connections.testConnectionResult import ( StatusType, ) from metadata.generated.schema.entity.services.databaseService import DatabaseConnection from metadata.generated.schema.entity.services.serviceType import ServiceType -from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( - OpenMetadataJWTClientConfig, -) -from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, get_test_connection_fn -SERVICE_CONNECTION = MysqlConnection( - username="openmetadata_user", - authType=BasicAuth(password="openmetadata_password"), - hostPort="localhost:3306", -) +if sys.version_info < (3, 9): + pytest.skip("requires python 3.9+", allow_module_level=True) -class TestConnectionAutomationTest(TestCase): +def test_connection_workflow(metadata, mysql_container): """ - Run this integration test with the local API available - Install the ingestion package before running the tests + Test all the steps related to the test connection automation workflow """ - service_entity_id = None + service_connection = MysqlConnection( + username=mysql_container.username, + authType=BasicAuth(password=mysql_container.password), + hostPort=f"localhost:{mysql_container.get_exposed_port(3306)}", + ) - server_config = OpenMetadataConnection( - hostPort="http://localhost:8585/api", - authProvider="openmetadata", - securityConfig=OpenMetadataJWTClientConfig( - jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + new_workflow_request = CreateWorkflowRequest( + name="test-connection-mysql", + description="description", + workflowType=WorkflowType.TEST_CONNECTION, + request=TestServiceConnectionRequest( + serviceType=ServiceType.Database, + connectionType=MySQLType.Mysql.value, + connection=DatabaseConnection( + 
config=service_connection, + ), ), ) - metadata = OpenMetadata(server_config) - assert metadata.health_check() + automation_workflow: Workflow = metadata.create_or_update(data=new_workflow_request) + engine: Engine = get_connection(service_connection) - def test_connection_workflow(self): - """ - Test all the steps related to the test connection automation workflow - """ - new_workflow_request = CreateWorkflowRequest( - name="test-connection-mysql", - description="description", - workflowType=WorkflowType.TEST_CONNECTION, - request=TestServiceConnectionRequest( - serviceType=ServiceType.Database, - connectionType=MySQLType.Mysql.value, - connection=DatabaseConnection( - config=SERVICE_CONNECTION, - ), + test_connection_fn = get_test_connection_fn(service_connection) + test_connection_fn(metadata, engine, service_connection, automation_workflow) + + final_workflow: Workflow = metadata.get_by_name( + entity=Workflow, fqn="test-connection-mysql" + ) + + assert final_workflow.status == WorkflowStatus.Successful + assert len(final_workflow.response.steps) == 4 + assert final_workflow.response.status.value == StatusType.Successful.value + + metadata.delete( + entity=Workflow, + entity_id=str(automation_workflow.id.root), + hard_delete=True, + ) + + +def test_connection_workflow_ko(metadata): + """Test connection that will fail""" + wrong_service_connection = MysqlConnection( + username="openmetadata_user", + authType=BasicAuth(password="openmetadata_password"), + hostPort="localhost:8585", # There's something running there, but it's not MySQL + databaseSchema="openmetadata_db", + ) + + wrong_workflow_request = CreateWorkflowRequest( + name="test-connection-mysql-bad", + description="description", + workflowType=WorkflowType.TEST_CONNECTION, + request=TestServiceConnectionRequest( + serviceType=ServiceType.Database, + connectionType=MySQLType.Mysql.value, + connection=DatabaseConnection( + config=wrong_service_connection, ), - ) + ), + ) - automation_workflow: Workflow = self.metadata.create_or_update( - data=new_workflow_request - ) - engine: Engine = get_connection(SERVICE_CONNECTION) + automation_workflow: Workflow = metadata.create_or_update( + data=wrong_workflow_request + ) + engine: Engine = get_connection(wrong_service_connection) - test_connection_fn = get_test_connection_fn(SERVICE_CONNECTION) - test_connection_fn( - self.metadata, engine, SERVICE_CONNECTION, automation_workflow - ) + test_connection_fn = get_test_connection_fn(wrong_service_connection) + test_connection_fn(metadata, engine, wrong_service_connection, automation_workflow) - final_workflow: Workflow = self.metadata.get_by_name( - entity=Workflow, fqn="test-connection-mysql" - ) + final_workflow: Workflow = metadata.get_by_name( + entity=Workflow, fqn="test-connection-mysql-bad" + ) - self.assertEqual(final_workflow.status, WorkflowStatus.Successful) - self.assertEqual(len(final_workflow.response.steps), 4) - self.assertEqual( - final_workflow.response.status.value, StatusType.Successful.value - ) + assert final_workflow.response.status == StatusType.Failed - self.metadata.delete( - entity=Workflow, - entity_id=str(automation_workflow.id.root), - hard_delete=True, - ) - - def test_connection_workflow_ko(self): - """Test connection that will fail""" - wrong_service_connection = MysqlConnection( - username="openmetadata_user", - authType=BasicAuth(password="openmetadata_password"), - hostPort="localhost:8585", # There's something running there, but it's not MySQL - databaseSchema="openmetadata_db", - ) - - 
wrong_workflow_request = CreateWorkflowRequest( - name="test-connection-mysql-bad", - description="description", - workflowType=WorkflowType.TEST_CONNECTION, - request=TestServiceConnectionRequest( - serviceType=ServiceType.Database, - connectionType=MySQLType.Mysql.value, - connection=DatabaseConnection( - config=wrong_service_connection, - ), - ), - ) - - automation_workflow: Workflow = self.metadata.create_or_update( - data=wrong_workflow_request - ) - engine: Engine = get_connection(wrong_service_connection) - - test_connection_fn = get_test_connection_fn(wrong_service_connection) - test_connection_fn( - self.metadata, engine, wrong_service_connection, automation_workflow - ) - - final_workflow: Workflow = self.metadata.get_by_name( - entity=Workflow, fqn="test-connection-mysql-bad" - ) - - self.assertEqual(final_workflow.response.status, StatusType.Failed) - - self.metadata.delete( - entity=Workflow, - entity_id=str(automation_workflow.id.root), - hard_delete=True, - ) + metadata.delete( + entity=Workflow, + entity_id=str(automation_workflow.id.root), + hard_delete=True, + ) diff --git a/ingestion/tests/integration/connections/conftest.py b/ingestion/tests/integration/connections/conftest.py new file mode 100644 index 00000000000..4a20fcab40c --- /dev/null +++ b/ingestion/tests/integration/connections/conftest.py @@ -0,0 +1,25 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Connections integration tests""" +import uuid + +import pytest + +from ..containers import MySqlContainerConfigs, get_mysql_container + + +@pytest.fixture(scope="session") +def mysql_container(): + with get_mysql_container( + MySqlContainerConfigs(container_name=str(uuid.uuid4())) + ) as container: + yield container diff --git a/ingestion/tests/integration/connections/test_mysql_connection.py b/ingestion/tests/integration/connections/test_mysql_connection.py index 96b445ac3fb..13bbabc14f0 100644 --- a/ingestion/tests/integration/connections/test_mysql_connection.py +++ b/ingestion/tests/integration/connections/test_mysql_connection.py @@ -8,12 +8,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- """ MySQL connection test """ -from unittest import TestCase +import sys +import pytest from sqlalchemy.engine import Engine from metadata.generated.schema.entity.services.connections.database.common.basicAuth import ( @@ -22,51 +22,27 @@ from metadata.generated.schema.entity.services.connections.database.common.basic from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( MysqlConnection, ) -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - OpenMetadataConnection, -) -from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( - OpenMetadataJWTClientConfig, -) -from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, get_test_connection_fn +if sys.version_info < (3, 9): + pytest.skip("requires python 3.9+", allow_module_level=True) -class MySQLConnectionTest(TestCase): + +def test_test_connection(metadata, mysql_container): """ - Validate MySQL connections + Test connection function requires: + - ometa + - connection object, i.e., the engine + - the service connection """ - - server_config = OpenMetadataConnection( - hostPort="http://localhost:8585/api", - authProvider="openmetadata", - securityConfig=OpenMetadataJWTClientConfig( - jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" - ), - ) - metadata = OpenMetadata(server_config) - - assert metadata.health_check() - service_connection = MysqlConnection( - username="openmetadata_user", - authType=BasicAuth(password="openmetadata_password"), - hostPort="localhost:3306", - databaseSchema="openmetadata_db", + username=mysql_container.username, + authType=BasicAuth(password=mysql_container.password), + hostPort=f"localhost:{mysql_container.get_exposed_port(3306)}", ) - def test_get_connection(self): - engine = get_connection(self.service_connection) - self.assertTrue(isinstance(engine, Engine)) + engine = get_connection(service_connection) + assert isinstance(engine, Engine) - def test_test_connection(self): - """ - Test connection function requires: - - ometa - - connection object, i.e., the engine - - the service connection - """ - engine = get_connection(self.service_connection) - - _test_connection_fn = get_test_connection_fn(self.service_connection) - _test_connection_fn(self.metadata, engine, self.service_connection) + _test_connection_fn = get_test_connection_fn(service_connection) + _test_connection_fn(metadata, engine, service_connection) diff --git a/ingestion/tests/integration/ometa/conftest.py b/ingestion/tests/integration/ometa/conftest.py new file mode 100644 index 00000000000..f128dd148ba --- /dev/null +++ b/ingestion/tests/integration/ometa/conftest.py @@ -0,0 +1,80 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Automations integration tests""" +import json +import uuid + +import pytest + +from metadata.generated.schema.entity.services.connections.database.common.basicAuth import ( + BasicAuth, +) +from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( + MysqlConnection, +) +from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.workflow.metadata import MetadataWorkflow + +from ..containers import MySqlContainerConfigs, get_mysql_container +from ..integration_base import ( + METADATA_INGESTION_CONFIG_TEMPLATE, + generate_name, + get_create_service, +) + + +@pytest.fixture(scope="module") +def mysql_container(): + with get_mysql_container( + MySqlContainerConfigs(container_name=str(uuid.uuid4())) + ) as container: + yield container + + +@pytest.fixture(scope="module") +def service(metadata): + service_name = generate_name() + create_service = get_create_service(entity=DatabaseService, name=service_name) + yield metadata.create_or_update(data=create_service) + + service_id = str( + metadata.get_by_name(entity=DatabaseService, fqn=service_name).id.root + ) + + metadata.delete( + entity=DatabaseService, + entity_id=service_id, + recursive=True, + hard_delete=True, + ) + + +@pytest.fixture(scope="module") +def workflow(metadata, service, mysql_container): + service_name = service.name.root + + workflow_config = json.loads( + METADATA_INGESTION_CONFIG_TEMPLATE.format( + type="mysql", + service_name=service_name, + service_config=MysqlConnection( + username=mysql_container.username, + authType=BasicAuth( + password=mysql_container.password, + ), + hostPort=f"localhost:{mysql_container.get_exposed_port(3306)}", + ).model_dump_json(), + source_config={}, + ) + ) + workflow_config["ingestionPipelineFQN"] = f"{service_name}.ingestion" + return MetadataWorkflow.create(workflow_config) diff --git a/ingestion/tests/integration/ometa/test_ometa_ingestion_pipeline.py b/ingestion/tests/integration/ometa/test_ometa_ingestion_pipeline.py index fa821109d53..dd8023bbde4 100644 --- a/ingestion/tests/integration/ometa/test_ometa_ingestion_pipeline.py +++ b/ingestion/tests/integration/ometa/test_ometa_ingestion_pipeline.py @@ -12,19 +12,10 @@ """ Test how we create and update status in Ingestion Pipelines """ -import json -from unittest import TestCase +import sys import pytest -from _openmetadata_testutils.ometa import int_admin_ometa -from metadata.generated.schema.entity.services.connections.database.common.basicAuth import ( - BasicAuth, -) -from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( - MysqlConnection, -) -from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( IngestionPipeline, PipelineState, @@ -36,178 +27,110 @@ from metadata.generated.schema.entity.services.ingestionPipelines.status import StepSummary, ) from metadata.ingestion.api.status import TruncatedStackTraceError -from metadata.workflow.metadata import MetadataWorkflow -from ..integration_base 
import ( - METADATA_INGESTION_CONFIG_TEMPLATE, - generate_name, - get_create_service, -) +if sys.version_info < (3, 9): + pytest.skip("requires python 3.9+", allow_module_level=True) -class OMetaTableTest(TestCase): - """ - Run this integration test with the local API available - Install the ingestion package before running the tests - """ +def test_create_ingestion_pipeline(workflow) -> None: + """We can create an ingestion pipeline""" - metadata = int_admin_ometa() - service_name = generate_name() + ingestion_pipeline: IngestionPipeline = workflow.ingestion_pipeline + assert ingestion_pipeline is not None + assert ingestion_pipeline.name.root == "ingestion" - @classmethod - def setUpClass(cls) -> None: - """ - Prepare ingredients - """ - # Create the service entity - create_service = get_create_service( - entity=DatabaseService, name=cls.service_name - ) - cls.service_entity = cls.metadata.create_or_update(data=create_service) - workflow_config = json.loads( - METADATA_INGESTION_CONFIG_TEMPLATE.format( - type="mysql", - service_name=cls.service_name, - service_config=MysqlConnection( - username="openmetadata_user", - authType=BasicAuth( - password="openmetadata_password", - ), - hostPort="localhost:3306", - ).model_dump_json(), - source_config={}, +def test_add_status(metadata, workflow) -> None: + """We can add status to the ingestion pipeline""" + + ingestion_pipeline: IngestionPipeline = workflow.ingestion_pipeline + assert ingestion_pipeline is not None + + # We can send a status to the ingestion pipeline + ingestion_status = IngestionStatus( + [ + StepSummary( + name="source", + failures=[ + StackTraceError( + name="error", + error="error", + stackTrace="stackTrace", + ) + ], ) - ) - workflow_config["ingestionPipelineFQN"] = f"{cls.service_name}.ingestion" - cls.workflow: MetadataWorkflow = MetadataWorkflow.create(workflow_config) + ] + ) - # Since we won't run the full workflow, let's create the service first - # which is needed to create the ingestion - cls.metadata.get_service_or_create( - entity=DatabaseService, config=cls.workflow.config.source - ) + pipeline_status: PipelineStatus = workflow._new_pipeline_status( + PipelineState.success + ) + pipeline_status.status = ingestion_status - @classmethod - def tearDownClass(cls) -> None: - """ - Clean up - """ + # Gets properly created + metadata.create_or_update_pipeline_status( + ingestion_pipeline.fullyQualifiedName.root, pipeline_status + ) - service_id = str( - cls.metadata.get_by_name( - entity=DatabaseService, fqn=cls.service_name - ).id.root - ) + real_pipeline_status: PipelineStatus = metadata.get_pipeline_status( + ingestion_pipeline.fullyQualifiedName.root, workflow.run_id + ) + assert real_pipeline_status.pipelineState == PipelineState.success - cls.metadata.delete( - entity=DatabaseService, - entity_id=service_id, - recursive=True, - hard_delete=True, - ) + # If the status has too long names/errors it will fail + too_long_status = IngestionStatus( + [ + StepSummary( + name="source", + failures=[ + StackTraceError( + name="error", + error="error" * 20_000_000, + stackTrace="stackTrace", + ) + ], + ) + ] + ) - def test_create_ingestion_pipeline(self) -> None: - """We can create an ingestion pipeline""" + pipeline_status: PipelineStatus = workflow._new_pipeline_status( + PipelineState.success + ) + pipeline_status.status = too_long_status - ingestion_pipeline: IngestionPipeline = self.workflow.ingestion_pipeline - assert ingestion_pipeline is not None - assert ingestion_pipeline.name.root == "ingestion" - - def 
test_add_status(self) -> None: - """We can add status to the ingestion pipeline""" - - ingestion_pipeline: IngestionPipeline = self.workflow.ingestion_pipeline - assert ingestion_pipeline is not None - - # We can send a status to the ingestion pipeline - ingestion_status = IngestionStatus( - [ - StepSummary( - name="source", - failures=[ - StackTraceError( - name="error", - error="error", - stackTrace="stackTrace", - ) - ], - ) - ] - ) - - pipeline_status: PipelineStatus = self.workflow._new_pipeline_status( - PipelineState.success - ) - pipeline_status.status = ingestion_status - - # Gets properly created - self.metadata.create_or_update_pipeline_status( + # We get a bad request error + with pytest.raises(Exception) as exc: + metadata.create_or_update_pipeline_status( ingestion_pipeline.fullyQualifiedName.root, pipeline_status ) - real_pipeline_status: PipelineStatus = self.metadata.get_pipeline_status( - ingestion_pipeline.fullyQualifiedName.root, self.workflow.run_id - ) - assert real_pipeline_status.pipelineState == PipelineState.success + assert ("exceeds the maximum allowed" in str(exc.value)) or ( + "Connection aborted." in str(exc.value) + ) - # If the status has too long names/errors it will fail - too_long_status = IngestionStatus( - [ - StepSummary( - name="source", - failures=[ - StackTraceError( - name="error", - error="error" * 20_000_000, - stackTrace="stackTrace", - ) - ], - ) - ] - ) - - pipeline_status: PipelineStatus = self.workflow._new_pipeline_status( - PipelineState.success - ) - pipeline_status.status = too_long_status - - # We get a bad request error - with pytest.raises(Exception) as exc: - self.metadata.create_or_update_pipeline_status( - ingestion_pipeline.fullyQualifiedName.root, pipeline_status + # If we truncate the status it all runs good + truncated_long_status = IngestionStatus( + [ + StepSummary( + name="source", + failures=[ + TruncatedStackTraceError( + name="error", + error="error" * 20_000_000, + stackTrace="stackTrace", + ) + ], ) + ] + ) - assert ("exceeds the maximum allowed" in str(exc.value)) or ( - "Connection aborted." in str(exc.value) - ) + pipeline_status: PipelineStatus = workflow._new_pipeline_status( + PipelineState.success + ) + pipeline_status.status = truncated_long_status - # If we truncate the status it all runs good - truncated_long_status = IngestionStatus( - [ - StepSummary( - name="source", - failures=[ - TruncatedStackTraceError( - name="error", - error="error" * 20_000_000, - stackTrace="stackTrace", - ) - ], - ) - ] - ) + res = metadata.create_or_update_pipeline_status( + ingestion_pipeline.fullyQualifiedName.root, pipeline_status + ) - pipeline_status: PipelineStatus = self.workflow._new_pipeline_status( - PipelineState.success - ) - pipeline_status.status = truncated_long_status - - res = self.metadata.create_or_update_pipeline_status( - ingestion_pipeline.fullyQualifiedName.root, pipeline_status - ) - - assert ( - res["entityFullyQualifiedName"] - == ingestion_pipeline.fullyQualifiedName.root - ) + assert res["entityFullyQualifiedName"] == ingestion_pipeline.fullyQualifiedName.root diff --git a/ingestion/tests/integration/workflow/conftest.py b/ingestion/tests/integration/workflow/conftest.py new file mode 100644 index 00000000000..c0f493ea75c --- /dev/null +++ b/ingestion/tests/integration/workflow/conftest.py @@ -0,0 +1,70 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Automations integration tests""" +import uuid + +import pytest + +from ..containers import MySqlContainerConfigs, get_mysql_container + +MYSQL_CONFIG = """ +source: + type: mysql + serviceName: local_mysql_test + serviceConnection: + config: + type: Mysql + username: {user} + authType: + password: {password} + hostPort: localhost:{port} + databaseSchema: {database} + connectionOptions: {{}} + connectionArguments: {{}} + sourceConfig: + config: + schemaFilterPattern: + excludes: + - mysql.* + - information_schema.* + - performance_schema.* + - sys.* +sink: + type: metadata-rest + config: {{}} +workflowConfig: + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: openmetadata + securityConfig: + jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" +ingestionPipelineFQN: local_mysql_test.test_metadata +pipelineRunId: 948eba5d-94ec-4fc5-b233-29038722db16 +""" + + +@pytest.fixture(scope="session") +def mysql_container(): + with get_mysql_container( + MySqlContainerConfigs(container_name=str(uuid.uuid4())) + ) as container: + yield container + + +@pytest.fixture(scope="session") +def mysql_config(mysql_container): + return MYSQL_CONFIG.format( + user=mysql_container.username, + password=mysql_container.password, + port=mysql_container.get_exposed_port(3306), + database=mysql_container.dbname, + ) diff --git a/ingestion/tests/integration/workflow/mysql_test.yaml b/ingestion/tests/integration/workflow/mysql_test.yaml index 93e6710fd30..56fed24367e 100644 --- a/ingestion/tests/integration/workflow/mysql_test.yaml +++ b/ingestion/tests/integration/workflow/mysql_test.yaml @@ -27,6 +27,6 @@ workflowConfig: hostPort: http://localhost:8585/api authProvider: openmetadata securityConfig: - jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + jwtToken: "..." 
ingestionPipelineFQN: local_mysql_test.test_metadata pipelineRunId: 948eba5d-94ec-4fc5-b233-29038722db16 \ No newline at end of file diff --git a/ingestion/tests/integration/workflow/test_workflow.py b/ingestion/tests/integration/workflow/test_workflow.py index 4e47efa2fec..86f0a079d9e 100644 --- a/ingestion/tests/integration/workflow/test_workflow.py +++ b/ingestion/tests/integration/workflow/test_workflow.py @@ -11,13 +11,13 @@ import importlib import pathlib +import sys import uuid -from unittest import TestCase + +import pytest +import yaml from metadata.config.common import ConfigurationError, load_config_file -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - OpenMetadataConnection, -) from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( IngestionPipeline, @@ -25,144 +25,98 @@ from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipel from metadata.generated.schema.entity.services.ingestionPipelines.status import ( StepSummary, ) -from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( - OpenMetadataJWTClientConfig, -) -from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.workflow.metadata import MetadataWorkflow +if sys.version_info < (3, 9): + pytest.skip("requires python 3.9+", allow_module_level=True) -class WorkflowTest(TestCase): - """ - Validate workflow methods - """ - server_config = OpenMetadataConnection( - hostPort="http://localhost:8585/api", - authProvider="openmetadata", - securityConfig=OpenMetadataJWTClientConfig( - jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" - ), +def delete_service(metadata): + service_id = str( + metadata.get_by_name(entity=DatabaseService, fqn="local_mysql_test").id.root ) - metadata = OpenMetadata(server_config) - assert metadata.health_check() + metadata.delete( + entity=DatabaseService, + entity_id=service_id, + recursive=True, + hard_delete=True, + ) - def delete_service(self): - service_id = str( - self.metadata.get_by_name( - entity=DatabaseService, fqn="local_mysql_test" - ).id.root - ) - self.metadata.delete( - entity=DatabaseService, - entity_id=service_id, - recursive=True, - hard_delete=True, - ) +def test_get_200(): + key = "metadata.ingestion.sink.metadata_rest.MetadataRestSink" + if key.find(".") >= 0: + module_name, class_name = key.rsplit(".", 1) + my_class = getattr(importlib.import_module(module_name), class_name) + assert my_class is not None - def test_get_200(self): - key = "metadata.ingestion.sink.metadata_rest.MetadataRestSink" + +def test_get_4xx(): + key = "metadata.ingestion.sink.MYSQL.mysqlSINK" + with pytest.raises(ModuleNotFoundError): if key.find(".") >= 0: module_name, class_name = key.rsplit(".", 1) - my_class = getattr(importlib.import_module(module_name), class_name) - self.assertEqual((my_class is not None), True) + 
getattr(importlib.import_module(module_name), class_name) - def test_get_4xx(self): - key = "metadata.ingestion.sink.MYSQL.mysqlSINK" - try: - if key.find(".") >= 0: - module_name, class_name = key.rsplit(".", 1) - getattr(importlib.import_module(module_name), class_name) - except ModuleNotFoundError: - self.assertRaises(ModuleNotFoundError) - def test_title_typeClassFetch(self): - is_file = True - file_type = "query-parser" - if is_file: - replace = file_type.replace("-", "_") - else: - replace = "".join( - [i.title() for i in file_type.replace("-", "_").split("_")] - ) - self.assertEqual(replace, "query_parser") +def test_execute_200(metadata, mysql_config): + workflow_config = yaml.safe_load(mysql_config) + workflow = MetadataWorkflow.create(workflow_config) + workflow.execute() + workflow.stop() - def test_title_typeClassFetch_4xx(self): - is_file = False - file_type = "query-parser" - if is_file: - replace = file_type.replace("-", "_") - else: - replace = "".join( - [i.title() for i in file_type.replace("-", "_").split("_")] - ) - self.assertEqual(replace, "QueryParser") + # Service is created + assert metadata.get_by_name(entity=DatabaseService, fqn="local_mysql_test") - def test_execute_200(self): - current_dir = pathlib.Path(__file__).resolve().parent - config_file = current_dir.joinpath("mysql_test.yaml") - workflow_config = load_config_file(config_file) - workflow = MetadataWorkflow.create(workflow_config) - workflow.execute() - workflow.stop() + # The service has an ingestion pipeline (since it has the ingestionPipelineFQN inside and the runId) + assert metadata.get_by_name( + entity=IngestionPipeline, fqn=workflow_config["ingestionPipelineFQN"] + ) - # Service is created - self.assertIsNotNone( - self.metadata.get_by_name(entity=DatabaseService, fqn="local_mysql_test") - ) + # The pipeline has the right status + pipeline_status = metadata.get_pipeline_status( + workflow_config["ingestionPipelineFQN"], workflow_config["pipelineRunId"] + ) - # The service has an ingestion pipeline (since it has the ingestionPipelineFQN inside and the runId) - self.assertIsNotNone( - self.metadata.get_by_name( - entity=IngestionPipeline, fqn=workflow_config["ingestionPipelineFQN"] - ) - ) + # We have status for the source and sink + assert len(pipeline_status.status.root) == 2 + assert isinstance(pipeline_status.status.root[0], StepSummary) - # The pipeline has the right status - pipeline_status = self.metadata.get_pipeline_status( - workflow_config["ingestionPipelineFQN"], workflow_config["pipelineRunId"] - ) + # Rerunning with a different Run ID still generates the correct status + new_run_id = str(uuid.uuid4()) + workflow_config["pipelineRunId"] = new_run_id - # We have status for the source and sink - self.assertEqual(len(pipeline_status.status.root), 2) - self.assertTrue(isinstance(pipeline_status.status.root[0], StepSummary)) + workflow = MetadataWorkflow.create(workflow_config) + workflow.execute() + workflow.stop() - # Rerunning with a different Run ID still generates the correct status - new_run_id = str(uuid.uuid4()) - workflow_config["pipelineRunId"] = new_run_id + pipeline_status = metadata.get_pipeline_status( + workflow_config["ingestionPipelineFQN"], new_run_id + ) - workflow = MetadataWorkflow.create(workflow_config) - workflow.execute() - workflow.stop() + # We have status for the source and sink + assert len(pipeline_status.status.root) == 2 + assert isinstance(pipeline_status.status.root[0], StepSummary) - pipeline_status = self.metadata.get_pipeline_status( - 
workflow_config["ingestionPipelineFQN"], new_run_id - ) + delete_service(metadata) - # We have status for the source and sink - self.assertEqual(len(pipeline_status.status.root), 2) - self.assertTrue(isinstance(pipeline_status.status.root[0], StepSummary)) - self.delete_service() +def test_execute_4xx(): + config_file = pathlib.Path("/tmp/mysql_test123") + with pytest.raises(ConfigurationError): + load_config_file(config_file) - def test_execute_4xx(self): - config_file = pathlib.Path("/tmp/mysql_test123") - try: - load_config_file(config_file) - except ConfigurationError: - self.assertRaises(ConfigurationError) - def test_fail_no_service_connection_and_overwrite(self): - current_dir = pathlib.Path(__file__).resolve().parent - config_file = current_dir.joinpath("mysql_test.yaml") - workflow_config = load_config_file(config_file) +def test_fail_no_service_connection_and_overwrite(): + current_dir = pathlib.Path(__file__).resolve().parent + config_file = current_dir.joinpath("mysql_test.yaml") + workflow_config = load_config_file(config_file) - del workflow_config["source"]["serviceConnection"] - workflow_config["workflowConfig"]["openMetadataServerConfig"][ - "forceEntityOverwriting" - ] = True + del workflow_config["source"]["serviceConnection"] + workflow_config["workflowConfig"]["openMetadataServerConfig"][ + "forceEntityOverwriting" + ] = True - with self.assertRaises(AttributeError): - MetadataWorkflow.create(workflow_config) + with pytest.raises(AttributeError): + MetadataWorkflow.create(workflow_config)
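
Note on fixtures assumed by the diff above: the rewritten tests receive a `metadata` client and a `mysql_container` from pytest fixtures defined outside this patch (the shared integration `conftest.py` and the `..containers` module). The sketch below is illustrative only — it mirrors the OpenMetadata client setup that the removed `TestCase` classes built inline, and it assumes the `testcontainers` `MySqlContainer` API; the real `MySqlContainerConfigs` and `get_mysql_container` signatures may differ.

# Illustrative sketch, not part of the patch.
from dataclasses import dataclass

import pytest
from testcontainers.mysql import MySqlContainer

from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
    OpenMetadataJWTClientConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata


@dataclass
class MySqlContainerConfigs:
    # Stand-in for the real config object exported by ..containers
    container_name: str
    image: str = "mysql:8"


def get_mysql_container(configs: MySqlContainerConfigs) -> MySqlContainer:
    # testcontainers' MySqlContainer defaults username/password/dbname to "test";
    # the tests read them back via container.username / .password / .dbname and
    # resolve the random host port with container.get_exposed_port(3306).
    return MySqlContainer(configs.image).with_name(configs.container_name)


@pytest.fixture(scope="session")
def metadata():
    # Same client construction the removed TestCase classes performed inline,
    # pointing at a locally running OpenMetadata server.
    server_config = OpenMetadataConnection(
        hostPort="http://localhost:8585/api",
        authProvider="openmetadata",
        securityConfig=OpenMetadataJWTClientConfig(jwtToken="<admin-jwt>"),
    )
    client = OpenMetadata(server_config)
    assert client.health_check()
    return client

With fixtures shaped like these, each test module only declares `metadata` and `mysql_container` as parameters, and pytest manages the container and client lifecycle per session or module instead of the old setUpClass/tearDownClass hooks.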