| 
									
										
										
										
											2021-12-01 12:46:28 +05:30
										 |  |  | #  Copyright 2021 Collate | 
					
						
							|  |  |  | #  Licensed under the Apache License, Version 2.0 (the "License"); | 
					
						
							|  |  |  | #  you may not use this file except in compliance with the License. | 
					
						
							|  |  |  | #  You may obtain a copy of the License at | 
					
						
							|  |  |  | #  http://www.apache.org/licenses/LICENSE-2.0 | 
					
						
							|  |  |  | #  Unless required by applicable law or agreed to in writing, software | 
					
						
							|  |  |  | #  distributed under the License is distributed on an "AS IS" BASIS, | 
					
						
							|  |  |  | #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
					
						
							|  |  |  | #  See the License for the specific language governing permissions and | 
					
						
							|  |  |  | #  limitations under the License. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-09-12 01:07:13 +05:30
										 |  |  | import importlib | 
					
						
							|  |  |  | import pathlib | 
					
						
							| 
									
										
										
										
											2023-12-22 15:43:50 +01:00
										 |  |  | import uuid | 
					
						
							| 
									
										
										
										
											2021-09-12 01:07:13 +05:30
										 |  |  | from unittest import TestCase | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-07 06:17:48 +01:00
										 |  |  | from metadata.config.common import ConfigurationError, load_config_file | 
					
						
							| 
									
										
										
										
											2023-12-22 15:43:50 +01:00
										 |  |  | from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( | 
					
						
							|  |  |  |     OpenMetadataConnection, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | from metadata.generated.schema.entity.services.databaseService import DatabaseService | 
					
						
							|  |  |  | from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( | 
					
						
							|  |  |  |     IngestionPipeline, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | from metadata.generated.schema.entity.services.ingestionPipelines.status import ( | 
					
						
							|  |  |  |     StepSummary, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( | 
					
						
							|  |  |  |     OpenMetadataJWTClientConfig, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2021-11-03 21:02:34 +01:00
										 |  |  | from metadata.ingestion.ometa.ometa_api import OpenMetadata | 
					
						
							| 
									
										
										
										
											2023-08-30 15:49:42 +02:00
										 |  |  | from metadata.workflow.metadata import MetadataWorkflow | 
					
						
							| 
									
										
										
										
											2021-09-12 01:07:13 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class WorkflowTest(TestCase): | 
					
						
							| 
									
										
										
										
											2023-12-22 15:43:50 +01:00
										 |  |  |     """
 | 
					
						
							|  |  |  |     Validate workflow methods | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     server_config = OpenMetadataConnection( | 
					
						
							|  |  |  |         hostPort="http://localhost:8585/api", | 
					
						
							|  |  |  |         authProvider="openmetadata", | 
					
						
							|  |  |  |         securityConfig=OpenMetadataJWTClientConfig( | 
					
						
							|  |  |  |             jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" | 
					
						
							|  |  |  |         ), | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     metadata = OpenMetadata(server_config) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     assert metadata.health_check() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def delete_service(self): | 
					
						
							|  |  |  |         service_id = str( | 
					
						
							|  |  |  |             self.metadata.get_by_name( | 
					
						
							|  |  |  |                 entity=DatabaseService, fqn="local_mysql_test" | 
					
						
							| 
									
										
										
										
											2024-06-05 21:18:37 +02:00
										 |  |  |             ).id.root | 
					
						
							| 
									
										
										
										
											2023-12-22 15:43:50 +01:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.metadata.delete( | 
					
						
							|  |  |  |             entity=DatabaseService, | 
					
						
							|  |  |  |             entity_id=service_id, | 
					
						
							|  |  |  |             recursive=True, | 
					
						
							|  |  |  |             hard_delete=True, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-09-12 01:07:13 +05:30
										 |  |  |     def test_get_200(self): | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |         key = "metadata.ingestion.sink.metadata_rest.MetadataRestSink" | 
					
						
							| 
									
										
										
										
											2021-09-12 01:07:13 +05:30
										 |  |  |         if key.find(".") >= 0: | 
					
						
							|  |  |  |             module_name, class_name = key.rsplit(".", 1) | 
					
						
							|  |  |  |             my_class = getattr(importlib.import_module(module_name), class_name) | 
					
						
							|  |  |  |             self.assertEqual((my_class is not None), True) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_get_4xx(self): | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |         key = "metadata.ingestion.sink.MYSQL.mysqlSINK" | 
					
						
							| 
									
										
										
										
											2021-09-12 01:07:13 +05:30
										 |  |  |         try: | 
					
						
							|  |  |  |             if key.find(".") >= 0: | 
					
						
							|  |  |  |                 module_name, class_name = key.rsplit(".", 1) | 
					
						
							| 
									
										
										
										
											2022-05-25 11:10:22 +05:30
										 |  |  |                 getattr(importlib.import_module(module_name), class_name) | 
					
						
							| 
									
										
										
										
											2021-09-12 01:07:13 +05:30
										 |  |  |         except ModuleNotFoundError: | 
					
						
							|  |  |  |             self.assertRaises(ModuleNotFoundError) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_title_typeClassFetch(self): | 
					
						
							|  |  |  |         is_file = True | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |         file_type = "query-parser" | 
					
						
							| 
									
										
										
										
											2021-09-12 01:07:13 +05:30
										 |  |  |         if is_file: | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |             replace = file_type.replace("-", "_") | 
					
						
							| 
									
										
										
										
											2021-09-12 01:07:13 +05:30
										 |  |  |         else: | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |             replace = "".join( | 
					
						
							|  |  |  |                 [i.title() for i in file_type.replace("-", "_").split("_")] | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |         self.assertEqual(replace, "query_parser") | 
					
						
							| 
									
										
										
										
											2021-09-12 01:07:13 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_title_typeClassFetch_4xx(self): | 
					
						
							|  |  |  |         is_file = False | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |         file_type = "query-parser" | 
					
						
							| 
									
										
										
										
											2021-09-12 01:07:13 +05:30
										 |  |  |         if is_file: | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |             replace = file_type.replace("-", "_") | 
					
						
							| 
									
										
										
										
											2021-09-12 01:07:13 +05:30
										 |  |  |         else: | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |             replace = "".join( | 
					
						
							|  |  |  |                 [i.title() for i in file_type.replace("-", "_").split("_")] | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |         self.assertEqual(replace, "QueryParser") | 
					
						
							| 
									
										
										
										
											2021-09-12 01:07:13 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_execute_200(self): | 
					
						
							| 
									
										
										
										
											2021-11-07 11:19:06 -08:00
										 |  |  |         current_dir = pathlib.Path(__file__).resolve().parent | 
					
						
							| 
									
										
										
										
											2022-05-25 11:10:22 +05:30
										 |  |  |         config_file = current_dir.joinpath("mysql_test.yaml") | 
					
						
							| 
									
										
										
										
											2021-09-12 01:07:13 +05:30
										 |  |  |         workflow_config = load_config_file(config_file) | 
					
						
							| 
									
										
										
										
											2023-08-30 15:49:42 +02:00
										 |  |  |         workflow = MetadataWorkflow.create(workflow_config) | 
					
						
							| 
									
										
										
										
											2021-09-12 01:07:13 +05:30
										 |  |  |         workflow.execute() | 
					
						
							|  |  |  |         workflow.stop() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-12-22 15:43:50 +01:00
										 |  |  |         # Service is created | 
					
						
							|  |  |  |         self.assertIsNotNone( | 
					
						
							|  |  |  |             self.metadata.get_by_name(entity=DatabaseService, fqn="local_mysql_test") | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # The service has an ingestion pipeline (since it has the ingestionPipelineFQN inside and the runId) | 
					
						
							| 
									
										
										
										
											2023-03-06 14:56:29 +01:00
										 |  |  |         self.assertIsNotNone( | 
					
						
							| 
									
										
										
										
											2023-12-22 15:43:50 +01:00
										 |  |  |             self.metadata.get_by_name( | 
					
						
							|  |  |  |                 entity=IngestionPipeline, fqn=workflow_config["ingestionPipelineFQN"] | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # The pipeline has the right status | 
					
						
							|  |  |  |         pipeline_status = self.metadata.get_pipeline_status( | 
					
						
							|  |  |  |             workflow_config["ingestionPipelineFQN"], workflow_config["pipelineRunId"] | 
					
						
							| 
									
										
										
										
											2023-03-06 14:56:29 +01:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-12-22 15:43:50 +01:00
										 |  |  |         # We have status for the source and sink | 
					
						
							| 
									
										
										
										
											2024-06-05 21:18:37 +02:00
										 |  |  |         self.assertEqual(len(pipeline_status.status.root), 2) | 
					
						
							|  |  |  |         self.assertTrue(isinstance(pipeline_status.status.root[0], StepSummary)) | 
					
						
							| 
									
										
										
										
											2023-12-22 15:43:50 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |         # Rerunning with a different Run ID still generates the correct status | 
					
						
							|  |  |  |         new_run_id = str(uuid.uuid4()) | 
					
						
							|  |  |  |         workflow_config["pipelineRunId"] = new_run_id | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         workflow = MetadataWorkflow.create(workflow_config) | 
					
						
							|  |  |  |         workflow.execute() | 
					
						
							|  |  |  |         workflow.stop() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         pipeline_status = self.metadata.get_pipeline_status( | 
					
						
							|  |  |  |             workflow_config["ingestionPipelineFQN"], new_run_id | 
					
						
							| 
									
										
										
										
											2021-09-12 01:07:13 +05:30
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-12-22 15:43:50 +01:00
										 |  |  |         # We have status for the source and sink | 
					
						
							| 
									
										
										
										
											2024-06-05 21:18:37 +02:00
										 |  |  |         self.assertEqual(len(pipeline_status.status.root), 2) | 
					
						
							|  |  |  |         self.assertTrue(isinstance(pipeline_status.status.root[0], StepSummary)) | 
					
						
							| 
									
										
										
										
											2023-12-22 15:43:50 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |         self.delete_service() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-09-12 01:07:13 +05:30
										 |  |  |     def test_execute_4xx(self): | 
					
						
							| 
									
										
										
										
											2021-11-07 11:19:06 -08:00
										 |  |  |         config_file = pathlib.Path("/tmp/mysql_test123") | 
					
						
							| 
									
										
										
										
											2021-09-12 01:07:13 +05:30
										 |  |  |         try: | 
					
						
							| 
									
										
										
										
											2022-05-25 11:10:22 +05:30
										 |  |  |             load_config_file(config_file) | 
					
						
							| 
									
										
										
										
											2021-11-07 11:19:06 -08:00
										 |  |  |         except ConfigurationError: | 
					
						
							|  |  |  |             self.assertRaises(ConfigurationError) | 
					
						
							| 
									
										
										
										
											2023-03-06 14:56:29 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-03-06 16:32:10 +01:00
										 |  |  |     def test_fail_no_service_connection_and_overwrite(self): | 
					
						
							|  |  |  |         current_dir = pathlib.Path(__file__).resolve().parent | 
					
						
							|  |  |  |         config_file = current_dir.joinpath("mysql_test.yaml") | 
					
						
							|  |  |  |         workflow_config = load_config_file(config_file) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         del workflow_config["source"]["serviceConnection"] | 
					
						
							|  |  |  |         workflow_config["workflowConfig"]["openMetadataServerConfig"][ | 
					
						
							|  |  |  |             "forceEntityOverwriting" | 
					
						
							|  |  |  |         ] = True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with self.assertRaises(AttributeError): | 
					
						
							| 
									
										
										
										
											2023-08-30 15:49:42 +02:00
										 |  |  |             MetadataWorkflow.create(workflow_config) |