| 
									
										
										
										
											2024-06-20 16:54:12 +02:00
										 |  |  | import sys | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import pytest | 
					
						
							|  |  |  | from pydantic import BaseModel | 
					
						
							|  |  |  | from sqlalchemy import VARBINARY | 
					
						
							|  |  |  | from sqlalchemy import Column as SQAColumn | 
					
						
							|  |  |  | from sqlalchemy import MetaData | 
					
						
							|  |  |  | from sqlalchemy import Table as SQATable | 
					
						
							|  |  |  | from sqlalchemy import create_engine | 
					
						
							|  |  |  | from sqlalchemy.dialects import postgresql | 
					
						
							|  |  |  | from sqlalchemy.engine import Connection, make_url | 
					
						
							|  |  |  | from sqlalchemy.sql import sqltypes | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-21 15:11:34 +02:00
										 |  |  | from _openmetadata_testutils.postgres.conftest import postgres_container | 
					
						
							| 
									
										
										
										
											2024-07-04 06:29:46 +02:00
										 |  |  | from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects | 
					
						
							| 
									
										
										
										
											2024-06-20 16:54:12 +02:00
										 |  |  | from metadata.data_quality.api.models import TestCaseDefinition | 
					
						
							|  |  |  | from metadata.generated.schema.entity.data.table import Table | 
					
						
							| 
									
										
										
										
											2024-07-19 12:12:34 +02:00
										 |  |  | from metadata.generated.schema.entity.services.databaseService import DatabaseService | 
					
						
							| 
									
										
										
										
											2024-06-20 16:54:12 +02:00
										 |  |  | from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( | 
					
						
							|  |  |  |     TestSuiteConfigType, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2024-07-02 12:36:03 +02:00
										 |  |  | from metadata.generated.schema.tests.basic import ( | 
					
						
							|  |  |  |     TestCaseResult, | 
					
						
							|  |  |  |     TestCaseStatus, | 
					
						
							|  |  |  |     TestResultValue, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2024-06-20 16:54:12 +02:00
										 |  |  | from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue | 
					
						
							|  |  |  | from metadata.ingestion.ometa.ometa_api import OpenMetadata | 
					
						
							|  |  |  | from metadata.workflow.data_quality import TestSuiteWorkflow | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if not sys.version_info >= (3, 9): | 
					
						
							|  |  |  |     pytest.skip( | 
					
						
							|  |  |  |         "requires python 3.9+ due to incompatibility with testcontainers", | 
					
						
							|  |  |  |         allow_module_level=True, | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TestParameters(BaseModel): | 
					
						
							|  |  |  |     test_case_defintion: TestCaseDefinition | 
					
						
							|  |  |  |     table2_fqn: str | 
					
						
							|  |  |  |     expected: TestCaseResult | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self, *args, **kwargs): | 
					
						
							|  |  |  |         if args: | 
					
						
							|  |  |  |             # Map positional arguments to fields | 
					
						
							|  |  |  |             field_names = list(self.__annotations__.keys()) | 
					
						
							|  |  |  |             kwargs.update(dict(zip(field_names, args))) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         super().__init__(**kwargs) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @pytest.mark.parametrize( | 
					
						
							|  |  |  |     "parameters", | 
					
						
							|  |  |  |     [ | 
					
						
							|  |  |  |         pytest.param(TestParameters(*t), id=t[0].name) | 
					
						
							|  |  |  |         for t in [ | 
					
						
							|  |  |  |             ( | 
					
						
							|  |  |  |                 TestCaseDefinition( | 
					
						
							|  |  |  |                     name="compare_same_tables", | 
					
						
							|  |  |  |                     testDefinitionName="tableDiff", | 
					
						
							|  |  |  |                     computePassedFailedRowCount=True, | 
					
						
							|  |  |  |                     parameterValues=[ | 
					
						
							|  |  |  |                         TestCaseParameterValue( | 
					
						
							|  |  |  |                             name="keyColumns", value="['customer_id']" | 
					
						
							|  |  |  |                         ), | 
					
						
							|  |  |  |                     ], | 
					
						
							|  |  |  |                 ), | 
					
						
							|  |  |  |                 "POSTGRES_SERVICE.dvdrental.public.customer", | 
					
						
							|  |  |  |                 TestCaseResult( | 
					
						
							|  |  |  |                     testCaseStatus=TestCaseStatus.Success, | 
					
						
							|  |  |  |                     failedRows=0, | 
					
						
							|  |  |  |                     passedRows=599, | 
					
						
							|  |  |  |                 ), | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             ( | 
					
						
							|  |  |  |                 TestCaseDefinition( | 
					
						
							|  |  |  |                     name="with_explicit_key_columns", | 
					
						
							|  |  |  |                     testDefinitionName="tableDiff", | 
					
						
							|  |  |  |                     computePassedFailedRowCount=True, | 
					
						
							|  |  |  |                     parameterValues=[ | 
					
						
							|  |  |  |                         TestCaseParameterValue( | 
					
						
							|  |  |  |                             name="keyColumns", value="['customer_id']" | 
					
						
							|  |  |  |                         ), | 
					
						
							|  |  |  |                     ], | 
					
						
							|  |  |  |                 ), | 
					
						
							|  |  |  |                 "POSTGRES_SERVICE.dvdrental.public.changed_customer", | 
					
						
							|  |  |  |                 TestCaseResult( | 
					
						
							|  |  |  |                     testCaseStatus=TestCaseStatus.Failed, | 
					
						
							|  |  |  |                     failedRows=321, | 
					
						
							|  |  |  |                     passedRows=278, | 
					
						
							|  |  |  |                 ), | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             ( | 
					
						
							|  |  |  |                 TestCaseDefinition( | 
					
						
							|  |  |  |                     name="resolve_primary_keys", | 
					
						
							|  |  |  |                     testDefinitionName="tableDiff", | 
					
						
							|  |  |  |                     computePassedFailedRowCount=True, | 
					
						
							|  |  |  |                     parameterValues=[], | 
					
						
							|  |  |  |                 ), | 
					
						
							|  |  |  |                 "POSTGRES_SERVICE.dvdrental.public.changed_customer", | 
					
						
							|  |  |  |                 TestCaseResult( | 
					
						
							|  |  |  |                     testCaseStatus=TestCaseStatus.Failed, | 
					
						
							|  |  |  |                     failedRows=321, | 
					
						
							|  |  |  |                     passedRows=278, | 
					
						
							|  |  |  |                 ), | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             ( | 
					
						
							|  |  |  |                 TestCaseDefinition( | 
					
						
							|  |  |  |                     name="with_passing_threshold", | 
					
						
							|  |  |  |                     testDefinitionName="tableDiff", | 
					
						
							|  |  |  |                     parameterValues=[ | 
					
						
							|  |  |  |                         TestCaseParameterValue(name="threshold", value="322"), | 
					
						
							|  |  |  |                     ], | 
					
						
							|  |  |  |                 ), | 
					
						
							|  |  |  |                 "POSTGRES_SERVICE.dvdrental.public.changed_customer", | 
					
						
							|  |  |  |                 TestCaseResult( | 
					
						
							|  |  |  |                     testCaseStatus=TestCaseStatus.Success, | 
					
						
							|  |  |  |                     failedRows=321, | 
					
						
							|  |  |  |                 ), | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             ( | 
					
						
							|  |  |  |                 TestCaseDefinition( | 
					
						
							|  |  |  |                     name="with_failing_threshold", | 
					
						
							|  |  |  |                     testDefinitionName="tableDiff", | 
					
						
							|  |  |  |                     parameterValues=[ | 
					
						
							|  |  |  |                         TestCaseParameterValue(name="threshold", value="321"), | 
					
						
							|  |  |  |                     ], | 
					
						
							|  |  |  |                 ), | 
					
						
							|  |  |  |                 "POSTGRES_SERVICE.dvdrental.public.changed_customer", | 
					
						
							|  |  |  |                 TestCaseResult( | 
					
						
							|  |  |  |                     testCaseStatus=TestCaseStatus.Failed, | 
					
						
							|  |  |  |                     failedRows=321, | 
					
						
							|  |  |  |                 ), | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             ( | 
					
						
							|  |  |  |                 TestCaseDefinition( | 
					
						
							|  |  |  |                     name="with_where_clause", | 
					
						
							|  |  |  |                     testDefinitionName="tableDiff", | 
					
						
							|  |  |  |                     computePassedFailedRowCount=True, | 
					
						
							|  |  |  |                     parameterValues=[ | 
					
						
							|  |  |  |                         TestCaseParameterValue( | 
					
						
							|  |  |  |                             name="where", | 
					
						
							|  |  |  |                             value="MOD(customer_id, 2) != 0 AND MOD(customer_id, 13) != 0", | 
					
						
							|  |  |  |                         ), | 
					
						
							|  |  |  |                     ], | 
					
						
							|  |  |  |                 ), | 
					
						
							|  |  |  |                 "POSTGRES_SERVICE.dvdrental.public.changed_customer", | 
					
						
							|  |  |  |                 TestCaseResult( | 
					
						
							|  |  |  |                     testCaseStatus=TestCaseStatus.Success, | 
					
						
							|  |  |  |                 ), | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             ( | 
					
						
							|  |  |  |                 TestCaseDefinition( | 
					
						
							|  |  |  |                     name="without_first_name", | 
					
						
							|  |  |  |                     testDefinitionName="tableDiff", | 
					
						
							|  |  |  |                     computePassedFailedRowCount=True, | 
					
						
							|  |  |  |                     parameterValues=[], | 
					
						
							|  |  |  |                 ), | 
					
						
							|  |  |  |                 "POSTGRES_SERVICE.dvdrental.public.customer_without_first_name", | 
					
						
							|  |  |  |                 TestCaseResult( | 
					
						
							|  |  |  |                     testCaseStatus=TestCaseStatus.Failed, | 
					
						
							| 
									
										
										
										
											2024-07-02 12:36:03 +02:00
										 |  |  |                     testResultValue=[ | 
					
						
							|  |  |  |                         TestResultValue(name="removedColumns", value="1"), | 
					
						
							|  |  |  |                         TestResultValue(name="addedColumns", value="0"), | 
					
						
							|  |  |  |                         TestResultValue(name="changedColumns", value="0"), | 
					
						
							|  |  |  |                     ], | 
					
						
							| 
									
										
										
										
											2024-06-20 16:54:12 +02:00
										 |  |  |                 ), | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             ( | 
					
						
							|  |  |  |                 TestCaseDefinition( | 
					
						
							|  |  |  |                     name="without_first_name_with_extra_column", | 
					
						
							|  |  |  |                     testDefinitionName="tableDiff", | 
					
						
							|  |  |  |                     computePassedFailedRowCount=True, | 
					
						
							|  |  |  |                     parameterValues=[ | 
					
						
							|  |  |  |                         TestCaseParameterValue( | 
					
						
							|  |  |  |                             name="useColumns", value="['last_name', 'email']" | 
					
						
							|  |  |  |                         ) | 
					
						
							|  |  |  |                     ], | 
					
						
							|  |  |  |                 ), | 
					
						
							|  |  |  |                 "POSTGRES_SERVICE.dvdrental.public.customer_without_first_name", | 
					
						
							|  |  |  |                 TestCaseResult( | 
					
						
							|  |  |  |                     testCaseStatus=TestCaseStatus.Success, | 
					
						
							|  |  |  |                 ), | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             ( | 
					
						
							|  |  |  |                 TestCaseDefinition( | 
					
						
							|  |  |  |                     name="postgres_vs_mysql_success", | 
					
						
							|  |  |  |                     testDefinitionName="tableDiff", | 
					
						
							|  |  |  |                     computePassedFailedRowCount=True, | 
					
						
							|  |  |  |                     parameterValues=[ | 
					
						
							|  |  |  |                         TestCaseParameterValue( | 
					
						
							|  |  |  |                             name="useColumns", | 
					
						
							|  |  |  |                             value=str( | 
					
						
							|  |  |  |                                 [ | 
					
						
							|  |  |  |                                     "store_id", | 
					
						
							|  |  |  |                                     "first_name", | 
					
						
							|  |  |  |                                     "last_name", | 
					
						
							|  |  |  |                                     "email", | 
					
						
							|  |  |  |                                     "activebool", | 
					
						
							|  |  |  |                                     "address_id", | 
					
						
							|  |  |  |                                     "active", | 
					
						
							|  |  |  |                                     # "create_date", # date types are incomparable for mysql and postgres | 
					
						
							|  |  |  |                                     "last_update", | 
					
						
							|  |  |  |                                 ] | 
					
						
							|  |  |  |                             ), | 
					
						
							|  |  |  |                         ) | 
					
						
							|  |  |  |                     ], | 
					
						
							|  |  |  |                 ), | 
					
						
							|  |  |  |                 "MYSQL_SERVICE.default.test.customer", | 
					
						
							|  |  |  |                 TestCaseResult( | 
					
						
							|  |  |  |                     testCaseStatus=TestCaseStatus.Success, | 
					
						
							|  |  |  |                 ), | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             ( | 
					
						
							|  |  |  |                 TestCaseDefinition( | 
					
						
							|  |  |  |                     name="postgres_vs_mysql_failure", | 
					
						
							|  |  |  |                     testDefinitionName="tableDiff", | 
					
						
							|  |  |  |                     computePassedFailedRowCount=True, | 
					
						
							|  |  |  |                     parameterValues=[], | 
					
						
							|  |  |  |                 ), | 
					
						
							|  |  |  |                 "MYSQL_SERVICE.default.test.changed_customer", | 
					
						
							|  |  |  |                 TestCaseResult( | 
					
						
							|  |  |  |                     testCaseStatus=TestCaseStatus.Failed, | 
					
						
							|  |  |  |                 ), | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |     ], | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | def test_happy_paths( | 
					
						
							|  |  |  |     postgres_service: DatabaseService, | 
					
						
							|  |  |  |     prepare_data, | 
					
						
							|  |  |  |     ingest_postgres_metadata, | 
					
						
							|  |  |  |     ingest_mysql_service, | 
					
						
							|  |  |  |     patched_metadata, | 
					
						
							|  |  |  |     parameters: TestParameters, | 
					
						
							| 
									
										
										
										
											2024-07-19 12:12:34 +02:00
										 |  |  |     sink_config, | 
					
						
							|  |  |  |     profiler_config, | 
					
						
							|  |  |  |     run_workflow, | 
					
						
							|  |  |  |     workflow_config, | 
					
						
							|  |  |  |     cleanup_fqns, | 
					
						
							| 
									
										
										
										
											2024-06-20 16:54:12 +02:00
										 |  |  | ): | 
					
						
							|  |  |  |     metadata = patched_metadata | 
					
						
							|  |  |  |     table1 = metadata.get_by_name( | 
					
						
							|  |  |  |         Table, | 
					
						
							|  |  |  |         f"{postgres_service.fullyQualifiedName.root}.dvdrental.public.customer", | 
					
						
							|  |  |  |         nullable=False, | 
					
						
							|  |  |  |     ) | 
					
						
							| 
									
										
										
										
											2024-07-19 12:12:34 +02:00
										 |  |  |     cleanup_fqns( | 
					
						
							|  |  |  |         TestCase, | 
					
						
							|  |  |  |         f"{table1.fullyQualifiedName.root}.{parameters.test_case_defintion.name}", | 
					
						
							|  |  |  |     ) | 
					
						
							| 
									
										
										
										
											2024-06-20 16:54:12 +02:00
										 |  |  |     table2_service = { | 
					
						
							|  |  |  |         "POSTGRES_SERVICE": postgres_service, | 
					
						
							|  |  |  |         "MYSQL_SERVICE": ingest_mysql_service, | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  |     for k, v in table2_service.items(): | 
					
						
							|  |  |  |         parameters.table2_fqn = parameters.table2_fqn.replace( | 
					
						
							|  |  |  |             k, v.fullyQualifiedName.root | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |     parameters.test_case_defintion.parameterValues.extend( | 
					
						
							|  |  |  |         [ | 
					
						
							|  |  |  |             TestCaseParameterValue( | 
					
						
							|  |  |  |                 name="table2", | 
					
						
							|  |  |  |                 value=parameters.table2_fqn, | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |     ) | 
					
						
							| 
									
										
										
										
											2024-07-19 12:12:34 +02:00
										 |  |  |     workflow_config = { | 
					
						
							|  |  |  |         "source": { | 
					
						
							|  |  |  |             "type": TestSuiteConfigType.TestSuite.value, | 
					
						
							|  |  |  |             "serviceName": "MyTestSuite", | 
					
						
							|  |  |  |             "sourceConfig": { | 
					
						
							|  |  |  |                 "config": { | 
					
						
							|  |  |  |                     "type": TestSuiteConfigType.TestSuite.value, | 
					
						
							|  |  |  |                     "entityFullyQualifiedName": table1.fullyQualifiedName.root, | 
					
						
							|  |  |  |                 } | 
					
						
							|  |  |  |             }, | 
					
						
							|  |  |  |         }, | 
					
						
							|  |  |  |         "processor": { | 
					
						
							|  |  |  |             "type": "orm-test-runner", | 
					
						
							|  |  |  |             "config": {"testCases": [parameters.test_case_defintion.dict()]}, | 
					
						
							|  |  |  |         }, | 
					
						
							|  |  |  |         "sink": sink_config, | 
					
						
							|  |  |  |         "workflowConfig": workflow_config, | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  |     run_workflow(TestSuiteWorkflow, workflow_config) | 
					
						
							|  |  |  |     test_case_entity = metadata.get_by_name( | 
					
						
							|  |  |  |         TestCase, | 
					
						
							|  |  |  |         f"{table1.fullyQualifiedName.root}.{parameters.test_case_defintion.name}", | 
					
						
							|  |  |  |         fields=["*"], | 
					
						
							| 
									
										
										
										
											2024-06-20 16:54:12 +02:00
										 |  |  |     ) | 
					
						
							|  |  |  |     assert "ERROR: Unexpected error" not in test_case_entity.testCaseResult.result | 
					
						
							|  |  |  |     assert_equal_pydantic_objects(parameters.expected, test_case_entity.testCaseResult) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @pytest.mark.parametrize( | 
					
						
							|  |  |  |     "parameters,expected", | 
					
						
							|  |  |  |     [ | 
					
						
							|  |  |  |         pytest.param( | 
					
						
							|  |  |  |             TestCaseDefinition( | 
					
						
							|  |  |  |                 name="unsupported_dialect", | 
					
						
							|  |  |  |                 testDefinitionName="tableDiff", | 
					
						
							|  |  |  |                 computePassedFailedRowCount=True, | 
					
						
							|  |  |  |                 parameterValues=[ | 
					
						
							|  |  |  |                     TestCaseParameterValue( | 
					
						
							|  |  |  |                         name="service2Url", | 
					
						
							|  |  |  |                         value="mongodb://localhost:27017", | 
					
						
							|  |  |  |                     ), | 
					
						
							|  |  |  |                     TestCaseParameterValue( | 
					
						
							|  |  |  |                         name="table2", | 
					
						
							|  |  |  |                         value="POSTGRES_SERVICE.dvdrental.public.customer", | 
					
						
							|  |  |  |                     ), | 
					
						
							|  |  |  |                 ], | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             TestCaseResult( | 
					
						
							|  |  |  |                 testCaseStatus=TestCaseStatus.Aborted, | 
					
						
							| 
									
										
										
										
											2024-07-02 12:36:03 +02:00
										 |  |  |                 result="Unsupported dialect in param table2.serviceUrl: mongodb", | 
					
						
							| 
									
										
										
										
											2024-06-20 16:54:12 +02:00
										 |  |  |             ), | 
					
						
							|  |  |  |             id="unsupported_dialect", | 
					
						
							|  |  |  |         ), | 
					
						
							|  |  |  |         pytest.param( | 
					
						
							| 
									
										
										
										
											2024-07-02 12:36:03 +02:00
										 |  |  |             TestCaseDefinition( | 
					
						
							|  |  |  |                 name="unsupported_data_types", | 
					
						
							|  |  |  |                 testDefinitionName="tableDiff", | 
					
						
							|  |  |  |                 computePassedFailedRowCount=True, | 
					
						
							|  |  |  |                 parameterValues=[ | 
					
						
							|  |  |  |                     TestCaseParameterValue( | 
					
						
							|  |  |  |                         name="table2", | 
					
						
							|  |  |  |                         value="POSTGRES_SERVICE.dvdrental.public.customer_int_first_name", | 
					
						
							|  |  |  |                     ), | 
					
						
							|  |  |  |                 ], | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             TestCaseResult( | 
					
						
							|  |  |  |                 testCaseStatus=TestCaseStatus.Failed, | 
					
						
							|  |  |  |                 result="Tables have 1 different columns:" | 
					
						
							|  |  |  |                 "\n  Changed columns:" | 
					
						
							|  |  |  |                 "\n    first_name: VARCHAR -> INT", | 
					
						
							|  |  |  |                 testResultValue=[ | 
					
						
							|  |  |  |                     TestResultValue(name="removedColumns", value="0"), | 
					
						
							|  |  |  |                     TestResultValue(name="addedColumns", value="0"), | 
					
						
							|  |  |  |                     TestResultValue(name="changedColumns", value="1"), | 
					
						
							|  |  |  |                 ], | 
					
						
							|  |  |  |             ), | 
					
						
							| 
									
										
										
										
											2024-06-20 16:54:12 +02:00
										 |  |  |         ), | 
					
						
							|  |  |  |         pytest.param( | 
					
						
							|  |  |  |             None, | 
					
						
							|  |  |  |             None, | 
					
						
							|  |  |  |             marks=pytest.mark.skip( | 
					
						
							|  |  |  |                 reason="TODO: implement test - table2 does not exist" | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |         ), | 
					
						
							|  |  |  |         pytest.param( | 
					
						
							|  |  |  |             None, | 
					
						
							|  |  |  |             None, | 
					
						
							|  |  |  |             marks=pytest.mark.skip( | 
					
						
							|  |  |  |                 reason="TODO: implement test - where clause is invalid" | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |         ), | 
					
						
							|  |  |  |     ], | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | def test_error_paths( | 
					
						
							|  |  |  |     parameters: TestCaseDefinition, | 
					
						
							|  |  |  |     expected: TestCaseResult, | 
					
						
							|  |  |  |     prepare_data: None, | 
					
						
							|  |  |  |     ingest_postgres_metadata, | 
					
						
							|  |  |  |     ingest_mysql_service: DatabaseService, | 
					
						
							|  |  |  |     postgres_service: DatabaseService, | 
					
						
							|  |  |  |     patched_metadata: OpenMetadata, | 
					
						
							| 
									
										
										
										
											2024-07-19 12:12:34 +02:00
										 |  |  |     sink_config, | 
					
						
							|  |  |  |     workflow_config, | 
					
						
							|  |  |  |     run_workflow, | 
					
						
							|  |  |  |     cleanup_fqns, | 
					
						
							| 
									
										
										
										
											2024-06-20 16:54:12 +02:00
										 |  |  | ): | 
					
						
							|  |  |  |     metadata = patched_metadata | 
					
						
							|  |  |  |     table1 = metadata.get_by_name( | 
					
						
							|  |  |  |         Table, | 
					
						
							|  |  |  |         f"{postgres_service.fullyQualifiedName.root}.dvdrental.public.customer", | 
					
						
							|  |  |  |         nullable=False, | 
					
						
							|  |  |  |     ) | 
					
						
							| 
									
										
										
										
											2024-07-19 12:12:34 +02:00
										 |  |  |     cleanup_fqns(TestCase, f"{table1.fullyQualifiedName.root}.{parameters.name}") | 
					
						
							| 
									
										
										
										
											2024-06-20 16:54:12 +02:00
										 |  |  |     for parameter in parameters.parameterValues: | 
					
						
							|  |  |  |         if parameter.name == "table2": | 
					
						
							|  |  |  |             parameter.value = parameter.value.replace( | 
					
						
							|  |  |  |                 "POSTGRES_SERVICE", postgres_service.fullyQualifiedName.root | 
					
						
							|  |  |  |             ) | 
					
						
							| 
									
										
										
										
											2024-07-19 12:12:34 +02:00
										 |  |  |     workflow_config = { | 
					
						
							|  |  |  |         "source": { | 
					
						
							|  |  |  |             "type": TestSuiteConfigType.TestSuite.value, | 
					
						
							|  |  |  |             "serviceName": "MyTestSuite", | 
					
						
							|  |  |  |             "sourceConfig": { | 
					
						
							|  |  |  |                 "config": { | 
					
						
							|  |  |  |                     "type": TestSuiteConfigType.TestSuite.value, | 
					
						
							|  |  |  |                     "entityFullyQualifiedName": table1.fullyQualifiedName.root, | 
					
						
							|  |  |  |                 } | 
					
						
							|  |  |  |             }, | 
					
						
							|  |  |  |         }, | 
					
						
							|  |  |  |         "processor": { | 
					
						
							|  |  |  |             "type": "orm-test-runner", | 
					
						
							|  |  |  |             "config": {"testCases": [parameters.dict()]}, | 
					
						
							|  |  |  |         }, | 
					
						
							|  |  |  |         "sink": sink_config, | 
					
						
							|  |  |  |         "workflowConfig": workflow_config, | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  |     run_workflow(TestSuiteWorkflow, workflow_config) | 
					
						
							| 
									
										
										
										
											2024-06-20 16:54:12 +02:00
										 |  |  |     test_case_entity: TestCase = metadata.get_or_create_test_case( | 
					
						
							|  |  |  |         f"{table1.fullyQualifiedName.root}.{parameters.name}" | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     assert_equal_pydantic_objects(expected, test_case_entity.testCaseResult) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def add_changed_tables(connection: Connection): | 
					
						
							|  |  |  |     connection.execute("CREATE TABLE customer_200 AS SELECT * FROM customer LIMIT 200;") | 
					
						
							|  |  |  |     connection.execute("CREATE TABLE changed_customer AS SELECT * FROM customer;") | 
					
						
							|  |  |  |     connection.execute( | 
					
						
							|  |  |  |         "UPDATE changed_customer SET first_name = 'John' WHERE MOD(customer_id, 2) = 0;" | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     connection.execute("DELETE FROM changed_customer WHERE MOD(customer_id, 13) = 0;") | 
					
						
							|  |  |  |     connection.execute( | 
					
						
							|  |  |  |         "CREATE TABLE customer_without_first_name AS SELECT * FROM customer;" | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     connection.execute( | 
					
						
							|  |  |  |         "ALTER TABLE customer_without_first_name DROP COLUMN first_name;" | 
					
						
							|  |  |  |     ) | 
					
						
							| 
									
										
										
										
											2024-07-02 12:36:03 +02:00
										 |  |  |     connection.execute( | 
					
						
							|  |  |  |         "CREATE TABLE customer_int_first_name AS SELECT * FROM customer;" | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     connection.execute("ALTER TABLE customer_int_first_name DROP COLUMN first_name;") | 
					
						
							|  |  |  |     connection.execute("ALTER TABLE customer_int_first_name ADD COLUMN first_name INT;") | 
					
						
							|  |  |  |     connection.execute("UPDATE customer_int_first_name SET first_name = 1;") | 
					
						
							| 
									
										
										
										
											2024-06-20 16:54:12 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @pytest.fixture(scope="module") | 
					
						
							|  |  |  | def prepare_data(postgres_container, mysql_container): | 
					
						
							|  |  |  |     postgres_engine = create_engine( | 
					
						
							|  |  |  |         make_url(postgres_container.get_connection_url()).set(database="dvdrental") | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     with postgres_engine.connect() as conn: | 
					
						
							|  |  |  |         add_changed_tables(conn) | 
					
						
							|  |  |  |     mysql_container = create_engine( | 
					
						
							|  |  |  |         make_url(mysql_container.get_connection_url()).set( | 
					
						
							|  |  |  |             database=mysql_container.dbname | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     postgres_engine = create_engine( | 
					
						
							|  |  |  |         make_url(postgres_container.get_connection_url()).set(database="dvdrental") | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     copy_table(postgres_engine, mysql_container, "customer") | 
					
						
							|  |  |  |     copy_table(postgres_engine, mysql_container, "changed_customer") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def copy_table(source_engine, destination_engine, table_name): | 
					
						
							|  |  |  |     source_metadata = MetaData() | 
					
						
							|  |  |  |     source_table = SQATable(table_name, source_metadata, autoload_with=source_engine) | 
					
						
							|  |  |  |     destination_metadata = MetaData() | 
					
						
							|  |  |  |     destination_table = SQATable(table_name, destination_metadata) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     for column in source_table.columns: | 
					
						
							|  |  |  |         # we copy all the columns without constraints, indexes or defaults | 
					
						
							|  |  |  |         # as we are only interested in the data | 
					
						
							|  |  |  |         if ( | 
					
						
							|  |  |  |             isinstance(column.type, postgresql.base.BYTEA) | 
					
						
							|  |  |  |             and destination_engine.dialect.name == "mssql" | 
					
						
							|  |  |  |         ): | 
					
						
							|  |  |  |             column_copy = SQAColumn(column.name, VARBINARY) | 
					
						
							|  |  |  |         elif ( | 
					
						
							|  |  |  |             isinstance(column.type, sqltypes.BOOLEAN) | 
					
						
							|  |  |  |             and destination_engine.dialect.name == "mssql" | 
					
						
							|  |  |  |         ): | 
					
						
							|  |  |  |             column_copy = SQAColumn(column.name, sqltypes.Boolean) | 
					
						
							|  |  |  |         elif ( | 
					
						
							|  |  |  |             isinstance(column.type, sqltypes.TIMESTAMP) | 
					
						
							|  |  |  |             and destination_engine.dialect.name == "mssql" | 
					
						
							|  |  |  |         ): | 
					
						
							|  |  |  |             column_copy = SQAColumn(column.name, sqltypes.DateTime) | 
					
						
							|  |  |  |         elif ( | 
					
						
							|  |  |  |             isinstance(column.type, sqltypes.DATE) | 
					
						
							|  |  |  |             and destination_engine.dialect.name == "mssql" | 
					
						
							|  |  |  |         ): | 
					
						
							|  |  |  |             column_copy = SQAColumn(column.name, sqltypes.DateTime) | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             column_copy = SQAColumn(column.name, column.type) | 
					
						
							|  |  |  |         destination_table.append_column(column_copy) | 
					
						
							|  |  |  |     destination_metadata.create_all(destination_engine) | 
					
						
							|  |  |  |     with source_engine.connect() as source_connection, destination_engine.connect() as destination_connection: | 
					
						
							|  |  |  |         data = source_connection.execute(source_table.select()).fetchall() | 
					
						
							|  |  |  |         batch_size = 1000 | 
					
						
							|  |  |  |         for i in range(0, len(data), batch_size): | 
					
						
							|  |  |  |             batch = data[i : i + batch_size] | 
					
						
							|  |  |  |             destination_connection.execute( | 
					
						
							|  |  |  |                 source_table.insert(), [dict(row) for row in batch] | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @pytest.fixture | 
					
						
							|  |  |  | def patched_metadata(metadata, postgres_service, ingest_mysql_service, monkeypatch): | 
					
						
							| 
									
										
										
										
											2024-07-19 12:12:34 +02:00
										 |  |  |     dbs_by_name = { | 
					
						
							|  |  |  |         service.fullyQualifiedName.root: service | 
					
						
							|  |  |  |         for service in [postgres_service, ingest_mysql_service] | 
					
						
							|  |  |  |     } | 
					
						
							| 
									
										
										
										
											2024-06-20 16:54:12 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-19 12:12:34 +02:00
										 |  |  |     def override_result_by_fqn(func): | 
					
						
							|  |  |  |         def inner(*args, **kwargs): | 
					
						
							|  |  |  |             result = func(*args, **kwargs) | 
					
						
							|  |  |  |             if result.fullyQualifiedName.root in dbs_by_name: | 
					
						
							|  |  |  |                 return dbs_by_name[result.fullyQualifiedName.root] | 
					
						
							|  |  |  |             return result | 
					
						
							| 
									
										
										
										
											2024-06-20 16:54:12 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-19 12:12:34 +02:00
										 |  |  |         return inner | 
					
						
							| 
									
										
										
										
											2024-06-20 16:54:12 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     monkeypatch.setattr( | 
					
						
							|  |  |  |         "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_name", | 
					
						
							| 
									
										
										
										
											2024-07-19 12:12:34 +02:00
										 |  |  |         override_result_by_fqn(OpenMetadata.get_by_name), | 
					
						
							| 
									
										
										
										
											2024-06-20 16:54:12 +02:00
										 |  |  |     ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     monkeypatch.setattr( | 
					
						
							|  |  |  |         "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_id", | 
					
						
							| 
									
										
										
										
											2024-07-19 12:12:34 +02:00
										 |  |  |         override_result_by_fqn(OpenMetadata.get_by_id), | 
					
						
							| 
									
										
										
										
											2024-06-20 16:54:12 +02:00
										 |  |  |     ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     return metadata |