| 
									
										
										
										
											2022-10-10 11:36:20 +02:00
										 |  |  | #  Copyright 2022 Collate | 
					
						
							|  |  |  | #  Licensed under the Apache License, Version 2.0 (the "License"); | 
					
						
							|  |  |  | #  you may not use this file except in compliance with the License. | 
					
						
							|  |  |  | #  You may obtain a copy of the License at | 
					
						
							|  |  |  | #  http://www.apache.org/licenses/LICENSE-2.0 | 
					
						
							|  |  |  | #  Unless required by applicable law or agreed to in writing, software | 
					
						
							|  |  |  | #  distributed under the License is distributed on an "AS IS" BASIS, | 
					
						
							|  |  |  | #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
					
						
							|  |  |  | #  See the License for the specific language governing permissions and | 
					
						
							|  |  |  | #  limitations under the License. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | Test database connectors which extend from `CommonDbSourceService` with CLI | 
					
						
							|  |  |  | """
 | 
					
						
							| 
									
										
										
										
											2023-04-21 13:38:27 +05:30
										 |  |  | import json | 
					
						
							| 
									
										
										
										
											2022-10-10 11:36:20 +02:00
										 |  |  | from abc import ABC, abstractmethod | 
					
						
							|  |  |  | from pathlib import Path | 
					
						
							| 
									
										
										
										
											2023-04-21 08:55:54 +02:00
										 |  |  | from typing import Optional | 
					
						
							| 
									
										
										
										
											2022-10-10 11:36:20 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | from sqlalchemy.engine import Engine | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-08-30 15:49:42 +02:00
										 |  |  | from metadata.ingestion.api.status import Status | 
					
						
							|  |  |  | from metadata.workflow.metadata import MetadataWorkflow | 
					
						
							| 
									
										
										
										
											2022-10-10 11:36:20 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-04-14 15:58:31 +02:00
										 |  |  | from ..base.test_cli import PATH_TO_RESOURCES | 
					
						
							|  |  |  | from ..base.test_cli_db import CliDBBase | 
					
						
							| 
									
										
										
										
											2022-10-10 11:36:20 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class CliCommonDB: | 
					
						
							|  |  |  |     class TestSuite(CliDBBase.TestSuite, ABC): | 
					
						
							|  |  |  |         engine: Engine | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         @classmethod | 
					
						
							|  |  |  |         def setUpClass(cls) -> None: | 
					
						
							|  |  |  |             connector = cls.get_connector_name() | 
					
						
							| 
									
										
										
										
											2023-08-30 15:49:42 +02:00
										 |  |  |             workflow: MetadataWorkflow = cls.get_workflow( | 
					
						
							|  |  |  |                 connector, cls.get_test_type() | 
					
						
							|  |  |  |             ) | 
					
						
							| 
									
										
										
										
											2022-10-10 11:36:20 +02:00
										 |  |  |             cls.engine = workflow.source.engine | 
					
						
							|  |  |  |             cls.openmetadata = workflow.source.metadata | 
					
						
							|  |  |  |             cls.config_file_path = str( | 
					
						
							|  |  |  |                 Path(PATH_TO_RESOURCES + f"/database/{connector}/{connector}.yaml") | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |             cls.test_file_path = str( | 
					
						
							|  |  |  |                 Path(PATH_TO_RESOURCES + f"/database/{connector}/test.yaml") | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def tearDown(self) -> None: | 
					
						
							|  |  |  |             self.engine.dispose() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def assert_for_vanilla_ingestion( | 
					
						
							| 
									
										
										
										
											2023-08-30 15:49:42 +02:00
										 |  |  |             self, source_status: Status, sink_status: Status | 
					
						
							| 
									
										
										
										
											2022-10-10 11:36:20 +02:00
										 |  |  |         ) -> None: | 
					
						
							|  |  |  |             self.assertTrue(len(source_status.failures) == 0) | 
					
						
							|  |  |  |             self.assertTrue(len(source_status.warnings) == 0) | 
					
						
							|  |  |  |             self.assertTrue(len(source_status.filtered) == 0) | 
					
						
							| 
									
										
										
										
											2023-03-24 17:59:06 +01:00
										 |  |  |             self.assertTrue(len(source_status.records) >= self.expected_tables()) | 
					
						
							| 
									
										
										
										
											2022-10-10 11:36:20 +02:00
										 |  |  |             self.assertTrue(len(sink_status.failures) == 0) | 
					
						
							|  |  |  |             self.assertTrue(len(sink_status.warnings) == 0) | 
					
						
							|  |  |  |             self.assertTrue(len(sink_status.records) > self.expected_tables()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def assert_for_table_with_profiler( | 
					
						
							| 
									
										
										
										
											2023-08-30 15:49:42 +02:00
										 |  |  |             self, source_status: Status, sink_status: Status | 
					
						
							| 
									
										
										
										
											2022-10-10 11:36:20 +02:00
										 |  |  |         ): | 
					
						
							|  |  |  |             self.assertTrue(len(source_status.failures) == 0) | 
					
						
							| 
									
										
										
										
											2023-04-24 08:00:25 +02:00
										 |  |  |             self.assertTrue( | 
					
						
							|  |  |  |                 len(source_status.records) >= self.expected_profiled_tables() | 
					
						
							|  |  |  |             ) | 
					
						
							| 
									
										
										
										
											2022-10-10 11:36:20 +02:00
										 |  |  |             self.assertTrue(len(sink_status.failures) == 0) | 
					
						
							| 
									
										
										
										
											2023-04-24 08:00:25 +02:00
										 |  |  |             self.assertTrue(len(sink_status.records) >= self.expected_profiled_tables()) | 
					
						
							| 
									
										
										
										
											2022-10-10 11:36:20 +02:00
										 |  |  |             sample_data = self.retrieve_sample_data(self.fqn_created_table()).sampleData | 
					
						
							|  |  |  |             lineage = self.retrieve_lineage(self.fqn_created_table()) | 
					
						
							| 
									
										
										
										
											2023-01-27 10:22:12 +01:00
										 |  |  |             self.assertTrue(len(sample_data.rows) == self.inserted_rows_count()) | 
					
						
							| 
									
										
										
										
											2023-04-21 17:45:12 +02:00
										 |  |  |             if self.view_column_lineage_count() is not None: | 
					
						
							|  |  |  |                 self.assertTrue( | 
					
						
							|  |  |  |                     len( | 
					
						
							|  |  |  |                         lineage["downstreamEdges"][0]["lineageDetails"][ | 
					
						
							|  |  |  |                             "columnsLineage" | 
					
						
							|  |  |  |                         ] | 
					
						
							|  |  |  |                     ) | 
					
						
							|  |  |  |                     == self.view_column_lineage_count() | 
					
						
							|  |  |  |                 ) | 
					
						
							| 
									
										
										
										
											2022-10-10 11:36:20 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-04-18 11:56:16 +02:00
										 |  |  |         def assert_for_table_with_profiler_time_partition( | 
					
						
							| 
									
										
										
										
											2023-08-30 15:49:42 +02:00
										 |  |  |             self, source_status: Status, sink_status: Status | 
					
						
							| 
									
										
										
										
											2023-04-18 11:56:16 +02:00
										 |  |  |         ): | 
					
						
							|  |  |  |             self.assertTrue(len(source_status.failures) == 0) | 
					
						
							|  |  |  |             self.assertTrue(len(sink_status.failures) == 0) | 
					
						
							|  |  |  |             sample_data = self.retrieve_sample_data(self.fqn_created_table()).sampleData | 
					
						
							| 
									
										
										
										
											2023-05-30 17:15:55 +02:00
										 |  |  |             self.assertTrue(len(sample_data.rows) <= self.inserted_rows_count()) | 
					
						
							| 
									
										
										
										
											2023-04-18 11:56:16 +02:00
										 |  |  |             profile = self.retrieve_profile(self.fqn_created_table()) | 
					
						
							|  |  |  |             expected_profiler_time_partition_results = ( | 
					
						
							|  |  |  |                 self.get_profiler_time_partition_results() | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |             if expected_profiler_time_partition_results: | 
					
						
							|  |  |  |                 table_profile = profile.profile.dict() | 
					
						
							|  |  |  |                 for key in expected_profiler_time_partition_results["table_profile"]: | 
					
						
							|  |  |  |                     self.assertTrue( | 
					
						
							|  |  |  |                         table_profile[key] | 
					
						
							|  |  |  |                         == expected_profiler_time_partition_results["table_profile"][ | 
					
						
							|  |  |  |                             key | 
					
						
							|  |  |  |                         ] | 
					
						
							|  |  |  |                     ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 for column in profile.columns: | 
					
						
							|  |  |  |                     expected_column_profile = next( | 
					
						
							|  |  |  |                         ( | 
					
						
							|  |  |  |                             profile.get(column.name.__root__) | 
					
						
							|  |  |  |                             for profile in expected_profiler_time_partition_results[ | 
					
						
							|  |  |  |                                 "column_profile" | 
					
						
							|  |  |  |                             ] | 
					
						
							|  |  |  |                             if profile.get(column.name.__root__) | 
					
						
							|  |  |  |                         ), | 
					
						
							|  |  |  |                         None, | 
					
						
							|  |  |  |                     ) | 
					
						
							|  |  |  |                     if expected_column_profile: | 
					
						
							|  |  |  |                         column_profile = column.profile.dict() | 
					
						
							|  |  |  |                         for key in expected_column_profile:  # type: ignore | 
					
						
							|  |  |  |                             self.assertTrue( | 
					
						
							|  |  |  |                                 column_profile[key] == expected_column_profile[key] | 
					
						
							|  |  |  |                             ) | 
					
						
							| 
									
										
										
										
											2023-04-21 13:38:27 +05:30
										 |  |  |                 if sample_data: | 
					
						
							| 
									
										
										
										
											2023-05-22 09:04:18 +02:00
										 |  |  |                     self.assertTrue(len(json.loads(sample_data.json()).get("rows")) > 0) | 
					
						
							| 
									
										
										
										
											2023-04-18 11:56:16 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-10-10 11:36:20 +02:00
										 |  |  |         def assert_for_delete_table_is_marked_as_deleted( | 
					
						
							| 
									
										
										
										
											2023-08-30 15:49:42 +02:00
										 |  |  |             self, source_status: Status, sink_status: Status | 
					
						
							| 
									
										
										
										
											2022-10-10 11:36:20 +02:00
										 |  |  |         ): | 
					
						
							| 
									
										
										
										
											2023-04-21 08:55:54 +02:00
										 |  |  |             self.assertEqual(self.retrieve_table(self.fqn_deleted_table()), None) | 
					
						
							| 
									
										
										
										
											2022-10-10 11:36:20 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |         def assert_filtered_schemas_includes( | 
					
						
							| 
									
										
										
										
											2023-08-30 15:49:42 +02:00
										 |  |  |             self, source_status: Status, sink_status: Status | 
					
						
							| 
									
										
										
										
											2022-10-10 11:36:20 +02:00
										 |  |  |         ): | 
					
						
							|  |  |  |             self.assertTrue((len(source_status.failures) == 0)) | 
					
						
							|  |  |  |             self.assertTrue( | 
					
						
							|  |  |  |                 ( | 
					
						
							|  |  |  |                     len(source_status.filtered) | 
					
						
							|  |  |  |                     == self.expected_filtered_schema_includes() | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def assert_filtered_schemas_excludes( | 
					
						
							| 
									
										
										
										
											2023-08-30 15:49:42 +02:00
										 |  |  |             self, source_status: Status, sink_status: Status | 
					
						
							| 
									
										
										
										
											2022-10-10 11:36:20 +02:00
										 |  |  |         ): | 
					
						
							|  |  |  |             self.assertTrue((len(source_status.failures) == 0)) | 
					
						
							|  |  |  |             self.assertTrue( | 
					
						
							|  |  |  |                 ( | 
					
						
							|  |  |  |                     len(source_status.filtered) | 
					
						
							|  |  |  |                     == self.expected_filtered_schema_excludes() | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def assert_filtered_tables_includes( | 
					
						
							| 
									
										
										
										
											2023-08-30 15:49:42 +02:00
										 |  |  |             self, source_status: Status, sink_status: Status | 
					
						
							| 
									
										
										
										
											2022-10-10 11:36:20 +02:00
										 |  |  |         ): | 
					
						
							|  |  |  |             self.assertTrue((len(source_status.failures) == 0)) | 
					
						
							|  |  |  |             self.assertTrue( | 
					
						
							|  |  |  |                 (len(source_status.filtered) == self.expected_filtered_table_includes()) | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def assert_filtered_tables_excludes( | 
					
						
							| 
									
										
										
										
											2023-08-30 15:49:42 +02:00
										 |  |  |             self, source_status: Status, sink_status: Status | 
					
						
							| 
									
										
										
										
											2022-10-10 11:36:20 +02:00
										 |  |  |         ): | 
					
						
							|  |  |  |             self.assertTrue((len(source_status.failures) == 0)) | 
					
						
							|  |  |  |             self.assertTrue( | 
					
						
							|  |  |  |                 (len(source_status.filtered) == self.expected_filtered_table_excludes()) | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-08-30 15:49:42 +02:00
										 |  |  |         def assert_filtered_mix(self, source_status: Status, sink_status: Status): | 
					
						
							| 
									
										
										
										
											2022-10-10 11:36:20 +02:00
										 |  |  |             self.assertTrue((len(source_status.failures) == 0)) | 
					
						
							|  |  |  |             self.assertTrue( | 
					
						
							|  |  |  |                 (len(source_status.filtered) == self.expected_filtered_mix()) | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         @staticmethod | 
					
						
							|  |  |  |         @abstractmethod | 
					
						
							|  |  |  |         def expected_tables() -> int: | 
					
						
							|  |  |  |             raise NotImplementedError() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         @abstractmethod | 
					
						
							|  |  |  |         def inserted_rows_count(self) -> int: | 
					
						
							|  |  |  |             raise NotImplementedError() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-01-27 15:33:03 +01:00
										 |  |  |         @abstractmethod | 
					
						
							|  |  |  |         def view_column_lineage_count(self) -> int: | 
					
						
							|  |  |  |             raise NotImplementedError() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-10-10 11:36:20 +02:00
										 |  |  |         @staticmethod | 
					
						
							|  |  |  |         @abstractmethod | 
					
						
							|  |  |  |         def fqn_created_table() -> str: | 
					
						
							|  |  |  |             raise NotImplementedError() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-04-21 08:55:54 +02:00
										 |  |  |         @staticmethod | 
					
						
							|  |  |  |         def _fqn_deleted_table() -> Optional[str]: | 
					
						
							|  |  |  |             return None | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-04-24 08:00:25 +02:00
										 |  |  |         @staticmethod | 
					
						
							|  |  |  |         def _expected_profiled_tables() -> int: | 
					
						
							|  |  |  |             return None | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-04-21 08:55:54 +02:00
										 |  |  |         def fqn_deleted_table(self) -> str: | 
					
						
							|  |  |  |             if self._fqn_deleted_table() is None: | 
					
						
							|  |  |  |                 return self.fqn_created_table() | 
					
						
							|  |  |  |             return self._fqn_deleted_table()  # type: ignore | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-04-24 08:00:25 +02:00
										 |  |  |         def expected_profiled_tables(self) -> int: | 
					
						
							|  |  |  |             if self._expected_profiled_tables() is None: | 
					
						
							|  |  |  |                 return self.expected_tables() | 
					
						
							|  |  |  |             return self._expected_profiled_tables() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-10-10 11:36:20 +02:00
										 |  |  |         @staticmethod | 
					
						
							|  |  |  |         @abstractmethod | 
					
						
							|  |  |  |         def expected_filtered_schema_includes() -> int: | 
					
						
							|  |  |  |             raise NotImplementedError() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         @staticmethod | 
					
						
							|  |  |  |         @abstractmethod | 
					
						
							|  |  |  |         def expected_filtered_schema_excludes() -> int: | 
					
						
							|  |  |  |             raise NotImplementedError() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         @staticmethod | 
					
						
							|  |  |  |         @abstractmethod | 
					
						
							|  |  |  |         def expected_filtered_table_includes() -> int: | 
					
						
							|  |  |  |             raise NotImplementedError() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         @staticmethod | 
					
						
							|  |  |  |         @abstractmethod | 
					
						
							|  |  |  |         def expected_filtered_table_excludes() -> int: | 
					
						
							|  |  |  |             raise NotImplementedError() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         @staticmethod | 
					
						
							|  |  |  |         @abstractmethod | 
					
						
							|  |  |  |         def expected_filtered_mix() -> int: | 
					
						
							|  |  |  |             raise NotImplementedError() |