| 
									
										
										
										
											2021-12-01 12:46:28 +05:30
										 |  |  | #  Copyright 2021 Collate | 
					
						
							|  |  |  | #  Licensed under the Apache License, Version 2.0 (the "License"); | 
					
						
							|  |  |  | #  you may not use this file except in compliance with the License. | 
					
						
							|  |  |  | #  You may obtain a copy of the License at | 
					
						
							|  |  |  | #  http://www.apache.org/licenses/LICENSE-2.0 | 
					
						
							|  |  |  | #  Unless required by applicable law or agreed to in writing, software | 
					
						
							|  |  |  | #  distributed under the License is distributed on an "AS IS" BASIS, | 
					
						
							|  |  |  | #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
					
						
							|  |  |  | #  See the License for the specific language governing permissions and | 
					
						
							|  |  |  | #  limitations under the License. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  | """
 | 
					
						
							| 
									
										
										
										
											2021-10-28 20:31:38 +02:00
										 |  |  | OpenMetadata high-level API Pipeline test | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  | """
 | 
					
						
							|  |  |  | import uuid | 
					
						
							| 
									
										
										
										
											2022-02-13 17:51:25 +01:00
										 |  |  | from datetime import datetime | 
					
						
							| 
									
										
										
										
											2022-03-01 12:19:36 +01:00
										 |  |  | from unittest import TestCase | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-02-01 01:29:56 +01:00
										 |  |  | from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  | from metadata.generated.schema.api.services.createPipelineService import ( | 
					
						
							| 
									
										
										
										
											2022-02-01 01:29:56 +01:00
										 |  |  |     CreatePipelineServiceRequest, | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-02-01 01:29:56 +01:00
										 |  |  | from metadata.generated.schema.api.teams.createUser import CreateUserRequest | 
					
						
							| 
									
										
										
										
											2022-03-01 12:19:36 +01:00
										 |  |  | from metadata.generated.schema.entity.data.pipeline import ( | 
					
						
							|  |  |  |     Pipeline, | 
					
						
							|  |  |  |     PipelineStatus, | 
					
						
							|  |  |  |     StatusType, | 
					
						
							|  |  |  |     Task, | 
					
						
							|  |  |  |     TaskStatus, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-04-12 23:40:21 -07:00
										 |  |  | from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( | 
					
						
							|  |  |  |     OpenMetadataConnection, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-05-25 08:35:16 +02:00
										 |  |  | from metadata.generated.schema.entity.services.connections.pipeline.airflowConnection import ( | 
					
						
							|  |  |  |     AirflowConnection, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | from metadata.generated.schema.entity.services.connections.pipeline.backendConnection import ( | 
					
						
							|  |  |  |     BackendConnection, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  | from metadata.generated.schema.entity.services.pipelineService import ( | 
					
						
							| 
									
										
										
										
											2022-05-25 08:35:16 +02:00
										 |  |  |     PipelineConnection, | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |     PipelineService, | 
					
						
							|  |  |  |     PipelineServiceType, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-09-26 16:19:47 +05:30
										 |  |  | from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( | 
					
						
							|  |  |  |     OpenMetadataJWTClientConfig, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  | from metadata.generated.schema.type.entityReference import EntityReference | 
					
						
							|  |  |  | from metadata.ingestion.ometa.ometa_api import OpenMetadata | 
					
						
							| 
									
										
										
										
											2022-02-13 17:51:25 +01:00
										 |  |  | from metadata.utils.helpers import datetime_to_ts | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class OMetaPipelineTest(TestCase): | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Run this integration test with the local API available | 
					
						
							|  |  |  |     Install the ingestion package before running the tests | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
    # Not assigned or read anywhere in the part of the file visible here.
    service_entity_id = None

    # Connection settings for the OpenMetadata API on localhost:8585, using
    # the "openmetadata" auth provider with a JWT.
    # NOTE(review): the token is hard-coded; presumably the default local
    # development admin token -- confirm it is not a real credential.
    server_config = OpenMetadataConnection(
        hostPort="http://localhost:8585/api",
        authProvider="openmetadata",
        securityConfig=OpenMetadataJWTClientConfig(
            jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
        ),
    )
    # Client shared by every test in this class.
    metadata = OpenMetadata(server_config)

    # This statement sits in the class body, so it runs when the class is
    # defined (i.e. when the module is imported), not inside a test.
    assert metadata.health_check()

    # Also executed at class-definition time: creates (or updates) the user
    # that test_update assigns as pipeline owner.
    user = metadata.create_or_update(
        data=CreateUserRequest(name="random-user", email="random@user.com"),
    )
    owner = EntityReference(id=user.id, type="user")

    # Request describing the Airflow pipeline service. It is only built here;
    # setUpClass is what sends it to the API.
    service = CreatePipelineServiceRequest(
        name="test-service-pipeline",
        serviceType=PipelineServiceType.Airflow,
        connection=PipelineConnection(
            config=AirflowConnection(
                hostPort="http://localhost:8080",
                connection=BackendConnection(),
            ),
        ),
    )
    # Entity type string for the service; not referenced in the visible code.
    service_type = "pipelineService"
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @classmethod | 
					
						
							|  |  |  |     def setUpClass(cls) -> None: | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Prepare ingredients | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         cls.service_entity = cls.metadata.create_or_update(data=cls.service) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         cls.entity = Pipeline( | 
					
						
							|  |  |  |             id=uuid.uuid4(), | 
					
						
							|  |  |  |             name="test", | 
					
						
							| 
									
										
										
										
											2023-02-13 00:08:55 -08:00
										 |  |  |             service=EntityReference(id=cls.service_entity.id, type="pipelineService"), | 
					
						
							| 
									
										
										
										
											2022-04-05 21:20:39 +02:00
										 |  |  |             fullyQualifiedName="test-service-pipeline.test", | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-02-01 01:29:56 +01:00
										 |  |  |         cls.create = CreatePipelineRequest( | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |             name="test", | 
					
						
							| 
									
										
										
										
											2023-02-13 00:08:55 -08:00
										 |  |  |             service=cls.service_entity.fullyQualifiedName, | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @classmethod | 
					
						
							|  |  |  |     def tearDownClass(cls) -> None: | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Clean up | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         service_id = str( | 
					
						
							|  |  |  |             cls.metadata.get_by_name( | 
					
						
							| 
									
										
										
										
											2022-05-26 21:00:18 +02:00
										 |  |  |                 entity=PipelineService, fqn="test-service-pipeline" | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |             ).id.__root__ | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-03-21 22:30:39 +05:30
										 |  |  |         cls.metadata.delete( | 
					
						
							| 
									
										
										
										
											2022-04-05 21:20:39 +02:00
										 |  |  |             entity=PipelineService, | 
					
						
							|  |  |  |             entity_id=service_id, | 
					
						
							|  |  |  |             recursive=True, | 
					
						
							|  |  |  |             hard_delete=True, | 
					
						
							| 
									
										
										
										
											2022-03-21 22:30:39 +05:30
										 |  |  |         ) | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_create(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         We can create a Pipeline and we receive it back as Entity | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         res = self.metadata.create_or_update(data=self.create) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(res.name, self.entity.name) | 
					
						
							|  |  |  |         self.assertEqual(res.service.id, self.entity.service.id) | 
					
						
							|  |  |  |         self.assertEqual(res.owner, None) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_update(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Updating it properly changes its properties | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         res_create = self.metadata.create_or_update(data=self.create) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         updated = self.create.dict(exclude_unset=True) | 
					
						
							|  |  |  |         updated["owner"] = self.owner | 
					
						
							| 
									
										
										
										
											2022-02-01 01:29:56 +01:00
										 |  |  |         updated_entity = CreatePipelineRequest(**updated) | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |         res = self.metadata.create_or_update(data=updated_entity) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Same ID, updated algorithm | 
					
						
							| 
									
										
										
										
											2023-02-13 00:08:55 -08:00
										 |  |  |         self.assertEqual( | 
					
						
							|  |  |  |             res.service.fullyQualifiedName, updated_entity.service.__root__ | 
					
						
							|  |  |  |         ) | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |         self.assertEqual(res_create.id, res.id) | 
					
						
							|  |  |  |         self.assertEqual(res.owner.id, self.user.id) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_get_name(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         We can fetch a Pipeline by name and get it back as Entity | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.metadata.create_or_update(data=self.create) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         res = self.metadata.get_by_name( | 
					
						
							| 
									
										
										
										
											2022-05-26 21:00:18 +02:00
										 |  |  |             entity=Pipeline, fqn=self.entity.fullyQualifiedName | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |         ) | 
					
						
							|  |  |  |         self.assertEqual(res.name, self.entity.name) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_get_id(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         We can fetch a Pipeline by ID and get it back as Entity | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.metadata.create_or_update(data=self.create) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # First pick up by name | 
					
						
							|  |  |  |         res_name = self.metadata.get_by_name( | 
					
						
							| 
									
										
										
										
											2022-05-26 21:00:18 +02:00
										 |  |  |             entity=Pipeline, fqn=self.entity.fullyQualifiedName | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |         ) | 
					
						
							|  |  |  |         # Then fetch by ID | 
					
						
							| 
									
										
										
										
											2021-12-06 08:40:53 +01:00
										 |  |  |         res = self.metadata.get_by_id(entity=Pipeline, entity_id=res_name.id) | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(res_name.id, res.id) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_list(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         We can list all our Pipelines | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.metadata.create_or_update(data=self.create) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         res = self.metadata.list_entities(entity=Pipeline, limit=100) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Fetch our test Database. We have already inserted it, so we should find it | 
					
						
							|  |  |  |         data = next( | 
					
						
							|  |  |  |             iter(ent for ent in res.entities if ent.name == self.entity.name), None | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         assert data | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_delete(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         We can delete a Pipeline by ID | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.metadata.create_or_update(data=self.create) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Find by name | 
					
						
							|  |  |  |         res_name = self.metadata.get_by_name( | 
					
						
							| 
									
										
										
										
											2022-05-26 21:00:18 +02:00
										 |  |  |             entity=Pipeline, fqn=self.entity.fullyQualifiedName | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |         ) | 
					
						
							|  |  |  |         # Then fetch by ID | 
					
						
							|  |  |  |         res_id = self.metadata.get_by_id( | 
					
						
							|  |  |  |             entity=Pipeline, entity_id=str(res_name.id.__root__) | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Delete | 
					
						
							|  |  |  |         self.metadata.delete(entity=Pipeline, entity_id=str(res_id.id.__root__)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Then we should not find it | 
					
						
							|  |  |  |         res = self.metadata.list_entities(entity=Pipeline) | 
					
						
							|  |  |  |         assert not next( | 
					
						
							| 
									
										
										
										
											2021-10-28 20:31:38 +02:00
										 |  |  |             iter( | 
					
						
							|  |  |  |                 ent | 
					
						
							|  |  |  |                 for ent in res.entities | 
					
						
							|  |  |  |                 if ent.fullyQualifiedName == self.entity.fullyQualifiedName | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             None, | 
					
						
							| 
									
										
										
										
											2021-10-26 23:18:43 +02:00
										 |  |  |         ) | 
					
						
							| 
									
										
										
										
											2022-01-07 10:37:56 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-02-13 17:51:25 +01:00
										 |  |  |     def test_add_status(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         We can add status data | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         create_pipeline = CreatePipelineRequest( | 
					
						
							|  |  |  |             name="pipeline-test", | 
					
						
							| 
									
										
										
										
											2023-02-13 00:08:55 -08:00
										 |  |  |             service=self.service_entity.fullyQualifiedName, | 
					
						
							| 
									
										
										
										
											2022-02-13 17:51:25 +01:00
										 |  |  |             tasks=[ | 
					
						
							|  |  |  |                 Task(name="task1"), | 
					
						
							|  |  |  |                 Task(name="task2"), | 
					
						
							| 
									
										
										
										
											2022-03-01 12:19:36 +01:00
										 |  |  |             ], | 
					
						
							| 
									
										
										
										
											2022-02-13 17:51:25 +01:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-08-22 09:01:24 -07:00
										 |  |  |         pipeline: Pipeline = self.metadata.create_or_update(data=create_pipeline) | 
					
						
							| 
									
										
										
										
											2022-02-13 17:51:25 +01:00
										 |  |  |         execution_ts = datetime_to_ts(datetime.strptime("2021-03-07", "%Y-%m-%d")) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         updated = self.metadata.add_pipeline_status( | 
					
						
							| 
									
										
										
										
											2022-08-22 09:01:24 -07:00
										 |  |  |             fqn=pipeline.fullyQualifiedName.__root__, | 
					
						
							| 
									
										
										
										
											2022-02-13 17:51:25 +01:00
										 |  |  |             status=PipelineStatus( | 
					
						
							| 
									
										
										
										
											2022-08-04 07:22:47 -07:00
										 |  |  |                 timestamp=execution_ts, | 
					
						
							| 
									
										
										
										
											2022-02-13 17:51:25 +01:00
										 |  |  |                 executionStatus=StatusType.Successful, | 
					
						
							|  |  |  |                 taskStatus=[ | 
					
						
							|  |  |  |                     TaskStatus(name="task1", executionStatus=StatusType.Successful), | 
					
						
							| 
									
										
										
										
											2022-03-01 12:19:36 +01:00
										 |  |  |                 ], | 
					
						
							|  |  |  |             ), | 
					
						
							| 
									
										
										
										
											2022-02-13 17:51:25 +01:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # We get a list of status | 
					
						
							| 
									
										
										
										
											2022-08-04 07:22:47 -07:00
										 |  |  |         assert updated.pipelineStatus.timestamp.__root__ == execution_ts | 
					
						
							|  |  |  |         assert len(updated.pipelineStatus.taskStatus) == 1 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Disabled as throwing an error regarding service key not present | 
					
						
							|  |  |  |         # should be fixed in https://github.com/open-metadata/OpenMetadata/issues/5661 | 
					
						
							|  |  |  |         # # Check that we can update a given status properly | 
					
						
							|  |  |  |         # updated = self.metadata.add_pipeline_status( | 
					
						
							|  |  |  |         #     pipeline=pipeline, | 
					
						
							|  |  |  |         #     status=PipelineStatus( | 
					
						
							|  |  |  |         #         timestamp=execution_ts, | 
					
						
							|  |  |  |         #         executionStatus=StatusType.Successful, | 
					
						
							|  |  |  |         #         taskStatus=[ | 
					
						
							|  |  |  |         #             TaskStatus(name="task1", executionStatus=StatusType.Successful), | 
					
						
							|  |  |  |         #             TaskStatus(name="task2", executionStatus=StatusType.Successful), | 
					
						
							|  |  |  |         #         ], | 
					
						
							|  |  |  |         #     ), | 
					
						
							|  |  |  |         # ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # assert updated.pipelineStatus[0].executionDate.__root__ == execution_ts | 
					
						
							|  |  |  |         # assert len(updated.pipelineStatus[0].taskStatus) == 2 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # # Cleanup | 
					
						
							|  |  |  |         # self.metadata.delete(entity=Pipeline, entity_id=pipeline.id) | 
					
						
							| 
									
										
										
										
											2022-02-13 17:51:25 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_add_tasks(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Check the add task logic | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         create_pipeline = CreatePipelineRequest( | 
					
						
							|  |  |  |             name="pipeline-test", | 
					
						
							| 
									
										
										
										
											2023-02-13 00:08:55 -08:00
										 |  |  |             service=self.service_entity.fullyQualifiedName, | 
					
						
							| 
									
										
										
										
											2022-02-13 17:51:25 +01:00
										 |  |  |             tasks=[ | 
					
						
							|  |  |  |                 Task(name="task1"), | 
					
						
							|  |  |  |                 Task(name="task2"), | 
					
						
							| 
									
										
										
										
											2022-03-01 12:19:36 +01:00
										 |  |  |             ], | 
					
						
							| 
									
										
										
										
											2022-02-13 17:51:25 +01:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         pipeline = self.metadata.create_or_update(data=create_pipeline) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Add new tasks | 
					
						
							|  |  |  |         updated_pipeline = self.metadata.add_task_to_pipeline( | 
					
						
							| 
									
										
										
										
											2022-03-01 12:19:36 +01:00
										 |  |  |             pipeline, | 
					
						
							|  |  |  |             Task(name="task3"), | 
					
						
							| 
									
										
										
										
											2022-02-13 17:51:25 +01:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         assert len(updated_pipeline.tasks) == 3 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Update a task already added | 
					
						
							|  |  |  |         updated_pipeline = self.metadata.add_task_to_pipeline( | 
					
						
							| 
									
										
										
										
											2022-03-01 12:19:36 +01:00
										 |  |  |             pipeline, | 
					
						
							|  |  |  |             Task(name="task3", displayName="TaskDisplay"), | 
					
						
							| 
									
										
										
										
											2022-02-13 17:51:25 +01:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         assert len(updated_pipeline.tasks) == 3 | 
					
						
							|  |  |  |         assert next( | 
					
						
							|  |  |  |             iter( | 
					
						
							| 
									
										
										
										
											2022-03-01 12:19:36 +01:00
										 |  |  |                 task | 
					
						
							|  |  |  |                 for task in updated_pipeline.tasks | 
					
						
							| 
									
										
										
										
											2022-02-13 17:51:25 +01:00
										 |  |  |                 if task.displayName == "TaskDisplay" | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Add more than one task at a time | 
					
						
							|  |  |  |         new_tasks = [ | 
					
						
							|  |  |  |             Task(name="task3"), | 
					
						
							|  |  |  |             Task(name="task4"), | 
					
						
							|  |  |  |         ] | 
					
						
							| 
									
										
										
										
											2022-03-01 12:19:36 +01:00
										 |  |  |         updated_pipeline = self.metadata.add_task_to_pipeline(pipeline, *new_tasks) | 
					
						
							| 
									
										
										
										
											2022-02-13 17:51:25 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |         assert len(updated_pipeline.tasks) == 4 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Cleanup | 
					
						
							|  |  |  |         self.metadata.delete(entity=Pipeline, entity_id=pipeline.id) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_add_tasks_to_empty_pipeline(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         We can add tasks to a pipeline without tasks | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         pipeline = self.metadata.create_or_update(data=self.create) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         updated_pipeline = self.metadata.add_task_to_pipeline( | 
					
						
							| 
									
										
										
										
											2022-03-01 12:19:36 +01:00
										 |  |  |             pipeline, | 
					
						
							|  |  |  |             Task(name="task", displayName="TaskDisplay"), | 
					
						
							| 
									
										
										
										
											2022-02-13 17:51:25 +01:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         assert len(updated_pipeline.tasks) == 1 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_clean_tasks(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Check that we can remove Pipeline tasks | 
					
						
							|  |  |  |         if they are not part of the list arg | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         create_pipeline = CreatePipelineRequest( | 
					
						
							|  |  |  |             name="pipeline-test", | 
					
						
							| 
									
										
										
										
											2023-02-13 00:08:55 -08:00
										 |  |  |             service=self.service_entity.fullyQualifiedName, | 
					
						
							| 
									
										
										
										
											2022-02-13 17:51:25 +01:00
										 |  |  |             tasks=[ | 
					
						
							|  |  |  |                 Task(name="task1"), | 
					
						
							|  |  |  |                 Task(name="task2"), | 
					
						
							|  |  |  |                 Task(name="task3"), | 
					
						
							|  |  |  |                 Task(name="task4"), | 
					
						
							| 
									
										
										
										
											2022-03-01 12:19:36 +01:00
										 |  |  |             ], | 
					
						
							| 
									
										
										
										
											2022-02-13 17:51:25 +01:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         pipeline = self.metadata.create_or_update(data=create_pipeline) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-05-08 06:10:35 +02:00
										 |  |  |         self.metadata.clean_pipeline_tasks( | 
					
						
							| 
									
										
										
										
											2022-03-21 18:29:49 +01:00
										 |  |  |             pipeline=pipeline, task_ids=["task3", "task4"] | 
					
						
							| 
									
										
										
										
											2022-02-13 17:51:25 +01:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-05-08 06:10:35 +02:00
										 |  |  |         updated_pipeline = self.metadata.get_by_name( | 
					
						
							|  |  |  |             entity=Pipeline, | 
					
						
							| 
									
										
										
										
											2022-05-26 21:00:18 +02:00
										 |  |  |             fqn="test-service-pipeline.pipeline-test", | 
					
						
							| 
									
										
										
										
											2022-05-08 06:10:35 +02:00
										 |  |  |             fields=["tasks"], | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-02-13 17:51:25 +01:00
										 |  |  |         assert len(updated_pipeline.tasks) == 2 | 
					
						
							|  |  |  |         assert {task.name for task in updated_pipeline.tasks} == {"task3", "task4"} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Cleanup | 
					
						
							|  |  |  |         self.metadata.delete(entity=Pipeline, entity_id=pipeline.id) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-07 10:37:56 +01:00
										 |  |  |     def test_list_versions(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         test list pipeline entity versions | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         self.metadata.create_or_update(data=self.create) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Find by name | 
					
						
							|  |  |  |         res_name = self.metadata.get_by_name( | 
					
						
							| 
									
										
										
										
											2022-05-26 21:00:18 +02:00
										 |  |  |             entity=Pipeline, fqn=self.entity.fullyQualifiedName | 
					
						
							| 
									
										
										
										
											2022-01-07 10:37:56 +01:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-10 09:07:39 +01:00
										 |  |  |         res = self.metadata.get_list_entity_versions( | 
					
						
							|  |  |  |             entity=Pipeline, entity_id=res_name.id.__root__ | 
					
						
							|  |  |  |         ) | 
					
						
							| 
									
										
										
										
											2022-01-07 10:37:56 +01:00
										 |  |  |         assert res | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_get_entity_version(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         test get pipeline entity version | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         self.metadata.create_or_update(data=self.create) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Find by name | 
					
						
							|  |  |  |         res_name = self.metadata.get_by_name( | 
					
						
							| 
									
										
										
										
											2022-05-26 21:00:18 +02:00
										 |  |  |             entity=Pipeline, fqn=self.entity.fullyQualifiedName | 
					
						
							| 
									
										
										
										
											2022-01-07 10:37:56 +01:00
										 |  |  |         ) | 
					
						
							|  |  |  |         res = self.metadata.get_entity_version( | 
					
						
							| 
									
										
										
										
											2022-01-10 09:07:39 +01:00
										 |  |  |             entity=Pipeline, entity_id=res_name.id.__root__, version=0.1 | 
					
						
							| 
									
										
										
										
											2022-01-07 10:37:56 +01:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # check we get the correct version requested and the correct entity ID | 
					
						
							|  |  |  |         assert res.version.__root__ == 0.1 | 
					
						
							|  |  |  |         assert res.id == res_name.id | 
					
						
							| 
									
										
										
										
											2022-01-10 09:07:39 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_get_entity_ref(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         test get EntityReference | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         res = self.metadata.create_or_update(data=self.create) | 
					
						
							|  |  |  |         entity_ref = self.metadata.get_entity_reference( | 
					
						
							| 
									
										
										
										
											2022-05-26 21:00:18 +02:00
										 |  |  |             entity=Pipeline, fqn=res.fullyQualifiedName | 
					
						
							| 
									
										
										
										
											2022-01-10 09:07:39 +01:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         assert res.id == entity_ref.id |