| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  | #  Copyright 2021 Collate | 
					
						
							|  |  |  | #  Licensed under the Apache License, Version 2.0 (the "License"); | 
					
						
							|  |  |  | #  you may not use this file except in compliance with the License. | 
					
						
							|  |  |  | #  You may obtain a copy of the License at | 
					
						
							|  |  |  | #  http://www.apache.org/licenses/LICENSE-2.0 | 
					
						
							|  |  |  | #  Unless required by applicable law or agreed to in writing, software | 
					
						
							|  |  |  | #  distributed under the License is distributed on an "AS IS" BASIS, | 
					
						
							|  |  |  | #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
					
						
							|  |  |  | #  See the License for the specific language governing permissions and | 
					
						
							|  |  |  | #  limitations under the License. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | Validate metadata ingestion workflow generation | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | import json | 
					
						
							|  |  |  | import uuid | 
					
						
							|  |  |  | from unittest import TestCase | 
					
						
							| 
									
										
										
										
											2022-11-03 14:37:26 +05:30
										 |  |  | from unittest.mock import patch | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-11-13 08:58:38 +01:00
										 |  |  | from openmetadata_managed_apis.workflows.ingestion.application import ( | 
					
						
							|  |  |  |     build_application_workflow_config, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-08-12 09:54:58 +02:00
										 |  |  | from openmetadata_managed_apis.workflows.ingestion.lineage import ( | 
					
						
							|  |  |  |     build_lineage_workflow_config, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-07-28 14:46:25 +02:00
										 |  |  | from openmetadata_managed_apis.workflows.ingestion.metadata import ( | 
					
						
							|  |  |  |     build_metadata_workflow_config, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | from openmetadata_managed_apis.workflows.ingestion.profiler import ( | 
					
						
							|  |  |  |     build_profiler_workflow_config, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-08-25 10:01:28 +02:00
										 |  |  | from openmetadata_managed_apis.workflows.ingestion.test_suite import ( | 
					
						
							|  |  |  |     build_test_suite_workflow_config, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-07-28 14:46:25 +02:00
										 |  |  | from openmetadata_managed_apis.workflows.ingestion.usage import ( | 
					
						
							|  |  |  |     build_usage_workflow_config, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-11-28 06:25:33 +05:30
										 |  |  | from metadata.generated.schema.entity.applications.configuration.applicationConfig import ( | 
					
						
							|  |  |  |     AppConfig, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2023-11-13 08:58:38 +01:00
										 |  |  | from metadata.generated.schema.entity.applications.configuration.external.autoTaggerAppConfig import ( | 
					
						
							|  |  |  |     AutoTaggerAppConfig, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  | from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( | 
					
						
							|  |  |  |     OpenMetadataConnection, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | from metadata.generated.schema.entity.services.databaseService import DatabaseService | 
					
						
							|  |  |  | from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( | 
					
						
							|  |  |  |     AirflowConfig, | 
					
						
							|  |  |  |     IngestionPipeline, | 
					
						
							|  |  |  |     PipelineType, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2023-11-13 08:58:38 +01:00
										 |  |  | from metadata.generated.schema.metadataIngestion.applicationPipeline import ( | 
					
						
							| 
									
										
										
										
											2024-01-08 14:57:07 +01:00
										 |  |  |     ApplicationConfigType, | 
					
						
							| 
									
										
										
										
											2023-11-13 08:58:38 +01:00
										 |  |  |     ApplicationPipeline, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  | from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( | 
					
						
							|  |  |  |     DatabaseServiceMetadataPipeline, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import ( | 
					
						
							|  |  |  |     DatabaseServiceProfilerPipeline, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-08-12 09:54:58 +02:00
										 |  |  | from metadata.generated.schema.metadataIngestion.databaseServiceQueryLineagePipeline import ( | 
					
						
							|  |  |  |     DatabaseServiceQueryLineagePipeline, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  | from metadata.generated.schema.metadataIngestion.databaseServiceQueryUsagePipeline import ( | 
					
						
							|  |  |  |     DatabaseServiceQueryUsagePipeline, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-08-25 10:01:28 +02:00
										 |  |  | from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( | 
					
						
							|  |  |  |     TestSuitePipeline, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  | from metadata.generated.schema.metadataIngestion.workflow import ( | 
					
						
							|  |  |  |     Source as WorkflowSource, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  | from metadata.generated.schema.metadataIngestion.workflow import SourceConfig | 
					
						
							| 
									
										
										
										
											2022-09-26 16:19:47 +05:30
										 |  |  | from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( | 
					
						
							|  |  |  |     OpenMetadataJWTClientConfig, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  | from metadata.generated.schema.type.entityReference import EntityReference | 
					
						
							| 
									
										
										
										
											2023-03-06 14:44:16 +01:00
										 |  |  | from metadata.ingestion.api.parser import parse_workflow_config_gracefully | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  | from metadata.ingestion.models.encoders import show_secrets_encoder | 
					
						
							|  |  |  | from metadata.ingestion.ometa.ometa_api import OpenMetadata | 
					
						
							| 
									
										
										
										
											2023-11-13 08:58:38 +01:00
										 |  |  | from metadata.workflow.application import ApplicationWorkflow | 
					
						
							| 
									
										
										
										
											2023-10-06 18:29:18 +02:00
										 |  |  | from metadata.workflow.data_quality import TestSuiteWorkflow | 
					
						
							| 
									
										
										
										
											2023-08-30 15:49:42 +02:00
										 |  |  | from metadata.workflow.metadata import MetadataWorkflow | 
					
						
							| 
									
										
										
										
											2023-09-04 11:02:57 +02:00
										 |  |  | from metadata.workflow.profiler import ProfilerWorkflow | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-11-03 14:37:26 +05:30
										 |  |  | def mock_set_ingestion_pipeline_status(self, state): | 
					
						
							|  |  |  |     return True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  | class OMetaServiceTest(TestCase): | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Run this integration test with the local API available | 
					
						
							|  |  |  |     Install the ingestion package before running the tests | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     service_entity_id = None | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-09-26 16:19:47 +05:30
										 |  |  |     server_config = OpenMetadataConnection( | 
					
						
							|  |  |  |         hostPort="http://localhost:8585/api", | 
					
						
							|  |  |  |         authProvider="openmetadata", | 
					
						
							|  |  |  |         securityConfig=OpenMetadataJWTClientConfig( | 
					
						
							|  |  |  |             jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" | 
					
						
							|  |  |  |         ), | 
					
						
							|  |  |  |     ) | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |     metadata = OpenMetadata(server_config) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     assert metadata.health_check() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     data = { | 
					
						
							|  |  |  |         "type": "mysql", | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  |         "serviceName": "test-workflow-mysql", | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |         "serviceConnection": { | 
					
						
							|  |  |  |             "config": { | 
					
						
							|  |  |  |                 "type": "Mysql", | 
					
						
							|  |  |  |                 "username": "openmetadata_user", | 
					
						
							| 
									
										
										
										
											2023-06-16 13:18:12 +05:30
										 |  |  |                 "authType": {"password": "openmetadata_password"}, | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |                 "hostPort": "localhost:3306", | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         }, | 
					
						
							| 
									
										
										
										
											2022-06-08 16:10:40 +02:00
										 |  |  |         "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-25 11:55:12 +02:00
										 |  |  |     usage_data = { | 
					
						
							|  |  |  |         "type": "snowflake", | 
					
						
							|  |  |  |         "serviceName": "local_snowflake", | 
					
						
							|  |  |  |         "serviceConnection": { | 
					
						
							|  |  |  |             "config": { | 
					
						
							|  |  |  |                 "type": "Snowflake", | 
					
						
							|  |  |  |                 "username": "openmetadata_user", | 
					
						
							|  |  |  |                 "password": "random", | 
					
						
							|  |  |  |                 "warehouse": "warehouse", | 
					
						
							|  |  |  |                 "account": "account", | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         }, | 
					
						
							| 
									
										
										
										
											2022-08-12 09:54:58 +02:00
										 |  |  |         "sourceConfig": {"config": {"type": "DatabaseUsage", "queryLogDuration": 10}}, | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     lineage_data = { | 
					
						
							|  |  |  |         "type": "snowflake", | 
					
						
							|  |  |  |         "serviceName": "local_snowflake", | 
					
						
							|  |  |  |         "serviceConnection": { | 
					
						
							|  |  |  |             "config": { | 
					
						
							|  |  |  |                 "type": "Snowflake", | 
					
						
							|  |  |  |                 "username": "openmetadata_user", | 
					
						
							|  |  |  |                 "password": "random", | 
					
						
							|  |  |  |                 "warehouse": "warehouse", | 
					
						
							|  |  |  |                 "account": "account", | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         }, | 
					
						
							|  |  |  |         "sourceConfig": {"config": {"type": "DatabaseLineage", "queryLogDuration": 10}}, | 
					
						
							| 
									
										
										
										
											2022-04-25 11:55:12 +02:00
										 |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |     workflow_source = WorkflowSource(**data) | 
					
						
							| 
									
										
										
										
											2022-04-25 11:55:12 +02:00
										 |  |  |     usage_workflow_source = WorkflowSource(**usage_data) | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     @classmethod | 
					
						
							|  |  |  |     def setUpClass(cls) -> None: | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Prepare ingredients. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Mock a db service to build the IngestionPipeline | 
					
						
							|  |  |  |         """
 | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  |         cls.service: DatabaseService = cls.metadata.get_service_or_create( | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |             entity=DatabaseService, config=cls.workflow_source | 
					
						
							|  |  |  |         ) | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  | 
 | 
					
						
							|  |  |  |         cls.usage_service: DatabaseService = cls.metadata.get_service_or_create( | 
					
						
							|  |  |  |             entity=DatabaseService, | 
					
						
							|  |  |  |             config=cls.usage_workflow_source, | 
					
						
							|  |  |  |         ) | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     @classmethod | 
					
						
							|  |  |  |     def tearDownClass(cls) -> None: | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Clean up | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         cls.metadata.delete( | 
					
						
							|  |  |  |             entity=DatabaseService, | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  |             entity_id=cls.service.id, | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |             recursive=True, | 
					
						
							|  |  |  |             hard_delete=True, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-11-03 14:37:26 +05:30
										 |  |  |     @patch.object( | 
					
						
							| 
									
										
										
										
											2023-08-30 15:49:42 +02:00
										 |  |  |         MetadataWorkflow, | 
					
						
							|  |  |  |         "set_ingestion_pipeline_status", | 
					
						
							|  |  |  |         mock_set_ingestion_pipeline_status, | 
					
						
							| 
									
										
										
										
											2022-11-03 14:37:26 +05:30
										 |  |  |     ) | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |     def test_ingestion_workflow(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Validate that the ingestionPipeline can be parsed | 
					
						
							|  |  |  |         and properly load a Workflow | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         ingestion_pipeline = IngestionPipeline( | 
					
						
							|  |  |  |             id=uuid.uuid4(), | 
					
						
							|  |  |  |             name="test_ingestion_workflow", | 
					
						
							|  |  |  |             pipelineType=PipelineType.metadata, | 
					
						
							|  |  |  |             fullyQualifiedName="local_mysql.test_ingestion_workflow", | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  |             sourceConfig=SourceConfig(config=DatabaseServiceMetadataPipeline()), | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |             openMetadataServerConnection=self.server_config, | 
					
						
							|  |  |  |             airflowConfig=AirflowConfig( | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  |                 startDate="2022-06-10T15:06:47+00:00", | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |             ), | 
					
						
							|  |  |  |             service=EntityReference( | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  |                 id=self.service.id, | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |                 type="databaseService", | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  |                 name=self.service.name.__root__, | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |             ), | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         workflow_config = build_metadata_workflow_config(ingestion_pipeline) | 
					
						
							|  |  |  |         config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-03-06 14:44:16 +01:00
										 |  |  |         parse_workflow_config_gracefully(config) | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-11-03 14:37:26 +05:30
										 |  |  |     @patch.object( | 
					
						
							| 
									
										
										
										
											2023-08-30 15:49:42 +02:00
										 |  |  |         MetadataWorkflow, | 
					
						
							|  |  |  |         "set_ingestion_pipeline_status", | 
					
						
							|  |  |  |         mock_set_ingestion_pipeline_status, | 
					
						
							| 
									
										
										
										
											2022-11-03 14:37:26 +05:30
										 |  |  |     ) | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |     def test_usage_workflow(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Validate that the ingestionPipeline can be parsed | 
					
						
							|  |  |  |         and properly load a usage Workflow | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         ingestion_pipeline = IngestionPipeline( | 
					
						
							|  |  |  |             id=uuid.uuid4(), | 
					
						
							| 
									
										
										
										
											2022-04-25 11:55:12 +02:00
										 |  |  |             name="test_usage_workflow", | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |             pipelineType=PipelineType.usage, | 
					
						
							| 
									
										
										
										
											2022-04-25 11:55:12 +02:00
										 |  |  |             fullyQualifiedName="local_snowflake.test_usage_workflow", | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  |             sourceConfig=SourceConfig(config=DatabaseServiceQueryUsagePipeline()), | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |             openMetadataServerConnection=self.server_config, | 
					
						
							|  |  |  |             airflowConfig=AirflowConfig( | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  |                 startDate="2022-06-10T15:06:47+00:00", | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |             ), | 
					
						
							|  |  |  |             service=EntityReference( | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  |                 id=self.usage_service.id, | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |                 type="databaseService", | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  |                 name=self.usage_service.name.__root__, | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |             ), | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         workflow_config = build_usage_workflow_config(ingestion_pipeline) | 
					
						
							| 
									
										
										
										
											2022-04-25 11:55:12 +02:00
										 |  |  |         self.assertIn("usage", workflow_config.source.type) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |         config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-03-06 14:44:16 +01:00
										 |  |  |         parse_workflow_config_gracefully(config) | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-11-03 14:37:26 +05:30
										 |  |  |     @patch.object( | 
					
						
							| 
									
										
										
										
											2023-08-30 15:49:42 +02:00
										 |  |  |         MetadataWorkflow, | 
					
						
							|  |  |  |         "set_ingestion_pipeline_status", | 
					
						
							|  |  |  |         mock_set_ingestion_pipeline_status, | 
					
						
							| 
									
										
										
										
											2022-11-03 14:37:26 +05:30
										 |  |  |     ) | 
					
						
							| 
									
										
										
										
											2022-08-12 09:54:58 +02:00
										 |  |  |     def test_lineage_workflow(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Validate that the ingestionPipeline can be parsed | 
					
						
							|  |  |  |         and properly load a lineage Workflow | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         ingestion_pipeline = IngestionPipeline( | 
					
						
							|  |  |  |             id=uuid.uuid4(), | 
					
						
							|  |  |  |             name="test_lineage_workflow", | 
					
						
							|  |  |  |             pipelineType=PipelineType.lineage, | 
					
						
							|  |  |  |             fullyQualifiedName="local_snowflake.test_lineage_workflow", | 
					
						
							|  |  |  |             sourceConfig=SourceConfig(config=DatabaseServiceQueryLineagePipeline()), | 
					
						
							|  |  |  |             openMetadataServerConnection=self.server_config, | 
					
						
							|  |  |  |             airflowConfig=AirflowConfig( | 
					
						
							|  |  |  |                 startDate="2022-06-10T15:06:47+00:00", | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             service=EntityReference( | 
					
						
							|  |  |  |                 id=self.usage_service.id, | 
					
						
							|  |  |  |                 type="databaseService", | 
					
						
							|  |  |  |                 name=self.usage_service.name.__root__, | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         workflow_config = build_lineage_workflow_config(ingestion_pipeline) | 
					
						
							|  |  |  |         self.assertIn("lineage", workflow_config.source.type) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-03-06 14:44:16 +01:00
										 |  |  |         parse_workflow_config_gracefully(config) | 
					
						
							| 
									
										
										
										
											2022-08-12 09:54:58 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-11-03 14:37:26 +05:30
										 |  |  |     @patch.object( | 
					
						
							|  |  |  |         ProfilerWorkflow, | 
					
						
							|  |  |  |         "set_ingestion_pipeline_status", | 
					
						
							|  |  |  |         mock_set_ingestion_pipeline_status, | 
					
						
							|  |  |  |     ) | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |     def test_profiler_workflow(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Validate that the ingestionPipeline can be parsed | 
					
						
							|  |  |  |         and properly load a Profiler Workflow | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         ingestion_pipeline = IngestionPipeline( | 
					
						
							|  |  |  |             id=uuid.uuid4(), | 
					
						
							|  |  |  |             name="test_profiler_workflow", | 
					
						
							|  |  |  |             pipelineType=PipelineType.profiler, | 
					
						
							|  |  |  |             fullyQualifiedName="local_mysql.test_profiler_workflow", | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  |             sourceConfig=SourceConfig(config=DatabaseServiceProfilerPipeline()), | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |             openMetadataServerConnection=self.server_config, | 
					
						
							|  |  |  |             airflowConfig=AirflowConfig( | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  |                 startDate="2022-06-10T15:06:47+00:00", | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |             ), | 
					
						
							|  |  |  |             service=EntityReference( | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  |                 id=self.service.id, | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |                 type="databaseService", | 
					
						
							| 
									
										
										
										
											2022-06-15 00:06:46 +05:30
										 |  |  |                 name=self.service.name.__root__, | 
					
						
							| 
									
										
										
										
											2022-04-21 17:53:29 +02:00
										 |  |  |             ), | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         workflow_config = build_profiler_workflow_config(ingestion_pipeline) | 
					
						
							|  |  |  |         config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-03-06 14:44:16 +01:00
										 |  |  |         parse_workflow_config_gracefully(config) | 
					
						
							| 
									
										
										
										
											2022-08-25 10:01:28 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-11-03 14:37:26 +05:30
										 |  |  |     @patch.object( | 
					
						
							|  |  |  |         TestSuiteWorkflow, | 
					
						
							|  |  |  |         "set_ingestion_pipeline_status", | 
					
						
							|  |  |  |         mock_set_ingestion_pipeline_status, | 
					
						
							|  |  |  |     ) | 
					
						
							| 
									
										
										
										
											2022-08-25 10:01:28 +02:00
										 |  |  |     def test_test_suite_workflow(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Validate that the ingestionPipeline can be parsed | 
					
						
							| 
									
										
										
										
											2023-11-13 08:58:38 +01:00
										 |  |  |         and properly load a Test Suite Workflow | 
					
						
							| 
									
										
										
										
											2022-08-25 10:01:28 +02:00
										 |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         ingestion_pipeline = IngestionPipeline( | 
					
						
							|  |  |  |             id=uuid.uuid4(), | 
					
						
							|  |  |  |             name="test_test_suite_workflow", | 
					
						
							|  |  |  |             pipelineType=PipelineType.TestSuite, | 
					
						
							|  |  |  |             fullyQualifiedName="local_mysql.test_test_suite_workflow", | 
					
						
							| 
									
										
										
										
											2023-06-12 07:47:45 +02:00
										 |  |  |             sourceConfig=SourceConfig( | 
					
						
							|  |  |  |                 config=TestSuitePipeline( | 
					
						
							|  |  |  |                     type="TestSuite", | 
					
						
							| 
									
										
										
										
											2023-06-19 18:01:26 +02:00
										 |  |  |                     entityFullyQualifiedName=self.service.name.__root__, | 
					
						
							| 
									
										
										
										
											2023-06-12 07:47:45 +02:00
										 |  |  |                 ) | 
					
						
							|  |  |  |             ), | 
					
						
							| 
									
										
										
										
											2022-08-25 10:01:28 +02:00
										 |  |  |             openMetadataServerConnection=self.server_config, | 
					
						
							|  |  |  |             airflowConfig=AirflowConfig( | 
					
						
							|  |  |  |                 startDate="2022-06-10T15:06:47+00:00", | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             service=EntityReference( | 
					
						
							| 
									
										
										
										
											2023-06-19 18:01:26 +02:00
										 |  |  |                 id=uuid.uuid4(), | 
					
						
							|  |  |  |                 type="testSuite", | 
					
						
							|  |  |  |                 name="test_test_suite_workflow", | 
					
						
							| 
									
										
										
										
											2022-08-25 10:01:28 +02:00
										 |  |  |             ), | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         workflow_config = build_test_suite_workflow_config(ingestion_pipeline) | 
					
						
							|  |  |  |         config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-03-06 14:44:16 +01:00
										 |  |  |         parse_workflow_config_gracefully(config) | 
					
						
							| 
									
										
										
										
											2023-11-13 08:58:38 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     @patch.object( | 
					
						
							|  |  |  |         ApplicationWorkflow, | 
					
						
							|  |  |  |         "set_ingestion_pipeline_status", | 
					
						
							|  |  |  |         mock_set_ingestion_pipeline_status, | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     def test_application_workflow(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Validate that the ingestionPipeline can be parsed | 
					
						
							|  |  |  |         and properly load an Application Workflow | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         ingestion_pipeline = IngestionPipeline( | 
					
						
							|  |  |  |             id=uuid.uuid4(), | 
					
						
							|  |  |  |             name="test_auto_tagger_application", | 
					
						
							|  |  |  |             pipelineType=PipelineType.application, | 
					
						
							|  |  |  |             fullyQualifiedName="OpenMetadata.test_auto_tagger_application", | 
					
						
							|  |  |  |             sourceConfig=SourceConfig( | 
					
						
							|  |  |  |                 config=ApplicationPipeline( | 
					
						
							| 
									
										
										
										
											2024-01-08 14:57:07 +01:00
										 |  |  |                     type=ApplicationConfigType.Application, | 
					
						
							| 
									
										
										
										
											2023-11-13 08:58:38 +01:00
										 |  |  |                     appConfig=AppConfig( | 
					
						
							|  |  |  |                         __root__=AutoTaggerAppConfig(confidenceLevel=80) | 
					
						
							|  |  |  |                     ), | 
					
						
							|  |  |  |                     sourcePythonClass="metadata.applications.auto_tagger.AutoTaggerApp", | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             openMetadataServerConnection=self.server_config, | 
					
						
							|  |  |  |             airflowConfig=AirflowConfig( | 
					
						
							|  |  |  |                 startDate="2022-06-10T15:06:47+00:00", | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             service=EntityReference( | 
					
						
							|  |  |  |                 id=uuid.uuid4(), | 
					
						
							|  |  |  |                 type="metadata", | 
					
						
							|  |  |  |                 name="OpenMetadata", | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         workflow_config = build_application_workflow_config(ingestion_pipeline) | 
					
						
							|  |  |  |         config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         ApplicationWorkflow.create(config) |