| 
									
										
										
										
											2025-04-03 10:39:47 +05:30
										 |  |  | #  Copyright 2025 Collate | 
					
						
							|  |  |  | #  Licensed under the Collate Community License, Version 1.0 (the "License"); | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  | #  you may not use this file except in compliance with the License. | 
					
						
							|  |  |  | #  You may obtain a copy of the License at | 
					
						
							| 
									
										
										
										
											2025-04-03 10:39:47 +05:30
										 |  |  | #  https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  | #  Unless required by applicable law or agreed to in writing, software | 
					
						
							|  |  |  | #  distributed under the License is distributed on an "AS IS" BASIS, | 
					
						
							|  |  |  | #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
					
						
							|  |  |  | #  See the License for the specific language governing permissions and | 
					
						
							|  |  |  | #  limitations under the License. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | """
 | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  | Test airflow lineage operator and hook. | 
					
						
							| 
									
										
										
										
											2022-05-08 06:10:35 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-02-07 06:49:46 +01:00
										 |  |  | This test is coupled with the example DAG `lineage_tutorial_operator`. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | With the `docker compose up` setup, you can debug the progress | 
					
						
							|  |  |  | by setting breakpoints in this file. | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  | """
 | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  | import time | 
					
						
							|  |  |  | from typing import Optional | 
					
						
							|  |  |  | from unittest import TestCase | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  | import requests | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-02-01 01:29:56 +01:00
										 |  |  | from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest | 
					
						
							| 
									
										
										
										
											2022-04-07 14:52:50 +02:00
										 |  |  | from metadata.generated.schema.api.data.createDatabaseSchema import ( | 
					
						
							|  |  |  |     CreateDatabaseSchemaRequest, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-02-01 01:29:56 +01:00
										 |  |  | from metadata.generated.schema.api.data.createTable import CreateTableRequest | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  | from metadata.generated.schema.api.services.createDatabaseService import ( | 
					
						
							| 
									
										
										
										
											2022-02-01 01:29:56 +01:00
										 |  |  |     CreateDatabaseServiceRequest, | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  | from metadata.generated.schema.entity.data.pipeline import Pipeline, StatusType | 
					
						
							| 
									
										
										
										
											2023-02-07 06:49:46 +01:00
										 |  |  | from metadata.generated.schema.entity.data.table import Column, DataType, Table | 
					
						
							| 
									
										
										
										
											2023-06-16 13:18:12 +05:30
										 |  |  | from metadata.generated.schema.entity.services.connections.database.common.basicAuth import ( | 
					
						
							|  |  |  |     BasicAuth, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-04-07 14:52:50 +02:00
										 |  |  | from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( | 
					
						
							|  |  |  |     MysqlConnection, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-04-12 23:40:21 -07:00
										 |  |  | from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( | 
					
						
							|  |  |  |     OpenMetadataConnection, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  | from metadata.generated.schema.entity.services.databaseService import ( | 
					
						
							| 
									
										
										
										
											2022-01-21 22:06:14 -08:00
										 |  |  |     DatabaseConnection, | 
					
						
							| 
									
										
										
										
											2022-04-07 14:52:50 +02:00
										 |  |  |     DatabaseService, | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |     DatabaseServiceType, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  | from metadata.generated.schema.entity.services.pipelineService import PipelineService | 
					
						
							| 
									
										
										
										
											2022-09-26 16:19:47 +05:30
										 |  |  | from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( | 
					
						
							|  |  |  |     OpenMetadataJWTClientConfig, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  | from metadata.ingestion.ometa.ometa_api import OpenMetadata | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-12-22 15:43:50 +01:00
										 |  |  | # These variables are just here to validate elements in the local deployment | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  | OM_HOST_PORT = "http://localhost:8585/api" | 
					
						
							|  |  |  | OM_JWT = "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" | 
					
						
							|  |  |  | AIRFLOW_HOST_API_ROOT = "http://localhost:8080/api/v1/" | 
					
						
							|  |  |  | DEFAULT_OM_AIRFLOW_CONNECTION = "openmetadata_conn_id" | 
					
						
							|  |  |  | DEFAULT_AIRFLOW_HEADERS = { | 
					
						
							|  |  |  |     "Content-Type": "application/json", | 
					
						
							|  |  |  |     "Authorization": "Basic YWRtaW46YWRtaW4=", | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | OM_LINEAGE_DAG_NAME = "lineage_tutorial_operator" | 
					
						
							|  |  |  | PIPELINE_SERVICE_NAME = "airflow_lineage_op_service" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def get_task_status_type_by_name(pipeline: Pipeline, name: str) -> Optional[StatusType]: | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Given a pipeline, get its status by name | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     return next( | 
					
						
							|  |  |  |         ( | 
					
						
							|  |  |  |             status.executionStatus | 
					
						
							|  |  |  |             for status in pipeline.pipelineStatus.taskStatus | 
					
						
							|  |  |  |             if status.name == name | 
					
						
							|  |  |  |         ), | 
					
						
							|  |  |  |         None, | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | class AirflowLineageTest(TestCase): | 
					
						
							|  |  |  |     """
 | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |     This test will trigger an Airflow DAG and validate that the | 
					
						
							|  |  |  |     OpenMetadata Lineage Operator can properly handle the | 
					
						
							|  |  |  |     metadata ingestion and processes inlets and outlets. | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-09-26 16:19:47 +05:30
										 |  |  |     server_config = OpenMetadataConnection( | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |         hostPort=OM_HOST_PORT, | 
					
						
							| 
									
										
										
										
											2022-09-26 16:19:47 +05:30
										 |  |  |         authProvider="openmetadata", | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |         securityConfig=OpenMetadataJWTClientConfig(jwtToken=OM_JWT), | 
					
						
							| 
									
										
										
										
											2022-09-26 16:19:47 +05:30
										 |  |  |     ) | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |     metadata = OpenMetadata(server_config) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     assert metadata.health_check() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-02-01 01:29:56 +01:00
										 |  |  |     service = CreateDatabaseServiceRequest( | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |         name="test-service-table-lineage", | 
					
						
							| 
									
										
										
										
											2022-04-14 11:22:39 +02:00
										 |  |  |         serviceType=DatabaseServiceType.Mysql, | 
					
						
							| 
									
										
										
										
											2022-04-07 14:52:50 +02:00
										 |  |  |         connection=DatabaseConnection( | 
					
						
							|  |  |  |             config=MysqlConnection( | 
					
						
							|  |  |  |                 username="username", | 
					
						
							| 
									
										
										
										
											2023-06-16 13:18:12 +05:30
										 |  |  |                 authType=BasicAuth(password="password"), | 
					
						
							| 
									
										
										
										
											2022-04-07 14:52:50 +02:00
										 |  |  |                 hostPort="http://localhost:1234", | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |         ), | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |     ) | 
					
						
							|  |  |  |     service_type = "databaseService" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @classmethod | 
					
						
							|  |  |  |     def setUpClass(cls) -> None: | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Prepare ingredients: Table Entity + DAG | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-07 14:52:50 +02:00
										 |  |  |         service_entity = cls.metadata.create_or_update(data=cls.service) | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-07 14:52:50 +02:00
										 |  |  |         create_db = CreateDatabaseRequest( | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |             name="test-db", | 
					
						
							| 
									
										
										
										
											2023-02-13 00:08:55 -08:00
										 |  |  |             service=service_entity.fullyQualifiedName, | 
					
						
							| 
									
										
										
										
											2022-04-07 14:52:50 +02:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         create_db_entity = cls.metadata.create_or_update(data=create_db) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         create_schema = CreateDatabaseSchemaRequest( | 
					
						
							| 
									
										
										
										
											2023-02-13 00:08:55 -08:00
										 |  |  |             name="test-schema", | 
					
						
							|  |  |  |             database=create_db_entity.fullyQualifiedName, | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-07 14:52:50 +02:00
										 |  |  |         create_schema_entity = cls.metadata.create_or_update(data=create_schema) | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |         create_inlet = CreateTableRequest( | 
					
						
							|  |  |  |             name="lineage-test-inlet", | 
					
						
							| 
									
										
										
										
											2023-02-13 00:08:55 -08:00
										 |  |  |             databaseSchema=create_schema_entity.fullyQualifiedName, | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |             columns=[Column(name="id", dataType=DataType.BIGINT)], | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-06-12 07:01:19 +02:00
										 |  |  |         create_inlet_2 = CreateTableRequest( | 
					
						
							|  |  |  |             name="lineage-test-inlet2", | 
					
						
							|  |  |  |             databaseSchema=create_schema_entity.fullyQualifiedName, | 
					
						
							|  |  |  |             columns=[Column(name="id", dataType=DataType.BIGINT)], | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |         create_outlet = CreateTableRequest( | 
					
						
							|  |  |  |             name="lineage-test-outlet", | 
					
						
							| 
									
										
										
										
											2023-02-13 00:08:55 -08:00
										 |  |  |             databaseSchema=create_schema_entity.fullyQualifiedName, | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |             columns=[Column(name="id", dataType=DataType.BIGINT)], | 
					
						
							|  |  |  |         ) | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-06-12 07:01:19 +02:00
										 |  |  |         cls.metadata.create_or_update(data=create_inlet) | 
					
						
							|  |  |  |         cls.metadata.create_or_update(data=create_inlet_2) | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |         cls.table_outlet = cls.metadata.create_or_update(data=create_outlet) | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-07 14:52:50 +02:00
										 |  |  |     @classmethod | 
					
						
							|  |  |  |     def tearDownClass(cls) -> None: | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Clean up | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         service_id = str( | 
					
						
							|  |  |  |             cls.metadata.get_by_name( | 
					
						
							| 
									
										
										
										
											2022-05-26 21:00:18 +02:00
										 |  |  |                 entity=DatabaseService, fqn="test-service-table-lineage" | 
					
						
							| 
									
										
										
										
											2024-06-05 21:18:37 +02:00
										 |  |  |             ).id.root | 
					
						
							| 
									
										
										
										
											2022-04-07 14:52:50 +02:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         cls.metadata.delete( | 
					
						
							|  |  |  |             entity=DatabaseService, | 
					
						
							|  |  |  |             entity_id=service_id, | 
					
						
							|  |  |  |             recursive=True, | 
					
						
							|  |  |  |             hard_delete=True, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |         # Service ID created from the Airflow Lineage Operator in the | 
					
						
							|  |  |  |         # example DAG | 
					
						
							|  |  |  |         pipeline_service_id = str( | 
					
						
							|  |  |  |             cls.metadata.get_by_name( | 
					
						
							|  |  |  |                 entity=PipelineService, fqn=PIPELINE_SERVICE_NAME | 
					
						
							| 
									
										
										
										
											2024-06-05 21:18:37 +02:00
										 |  |  |             ).id.root | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |         cls.metadata.delete( | 
					
						
							|  |  |  |             entity=PipelineService, | 
					
						
							|  |  |  |             entity_id=pipeline_service_id, | 
					
						
							|  |  |  |             recursive=True, | 
					
						
							|  |  |  |             hard_delete=True, | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |     def test_dag_runs(self) -> None: | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |         """
 | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |         Trigger the Airflow DAG and wait until it runs. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Note that the DAG definition is in examples/airflow_lineage_operator.py | 
					
						
							|  |  |  |         and it is expected to fail. This will allow us to validate | 
					
						
							|  |  |  |         the task status afterward. | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |         # 1. Validate that the OpenMetadata connection exists | 
					
						
							|  |  |  |         res = requests.get( | 
					
						
							|  |  |  |             AIRFLOW_HOST_API_ROOT + f"connections/{DEFAULT_OM_AIRFLOW_CONNECTION}", | 
					
						
							|  |  |  |             headers=DEFAULT_AIRFLOW_HEADERS, | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |         ) | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |         if res.status_code != 200: | 
					
						
							|  |  |  |             raise RuntimeError( | 
					
						
							|  |  |  |                 f"Could not fetch {DEFAULT_OM_AIRFLOW_CONNECTION} connection" | 
					
						
							|  |  |  |             ) | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |         # 2. Enable the DAG | 
					
						
							|  |  |  |         res = requests.patch( | 
					
						
							|  |  |  |             AIRFLOW_HOST_API_ROOT + f"dags/{OM_LINEAGE_DAG_NAME}", | 
					
						
							|  |  |  |             json={"is_paused": False}, | 
					
						
							|  |  |  |             headers=DEFAULT_AIRFLOW_HEADERS, | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |         ) | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |         if res.status_code != 200: | 
					
						
							|  |  |  |             raise RuntimeError(f"Could not enable {OM_LINEAGE_DAG_NAME} DAG") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # 3. Trigger the DAG | 
					
						
							|  |  |  |         res = requests.post( | 
					
						
							|  |  |  |             AIRFLOW_HOST_API_ROOT + f"dags/{OM_LINEAGE_DAG_NAME}/dagRuns", | 
					
						
							| 
									
										
										
										
											2024-06-05 21:18:37 +02:00
										 |  |  |             json={}, | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |             headers=DEFAULT_AIRFLOW_HEADERS, | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |         ) | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |         if res.status_code != 200: | 
					
						
							|  |  |  |             raise RuntimeError(f"Could not trigger {OM_LINEAGE_DAG_NAME} DAG") | 
					
						
							|  |  |  |         dag_run_id = res.json()["dag_run_id"] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # 4. Wait until the DAG is flagged as `successful` | 
					
						
							|  |  |  |         state = "queued" | 
					
						
							|  |  |  |         tries = 0 | 
					
						
							|  |  |  |         while state != "success" and tries <= 5: | 
					
						
							|  |  |  |             tries += 1 | 
					
						
							|  |  |  |             time.sleep(5) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             res = requests.get( | 
					
						
							|  |  |  |                 AIRFLOW_HOST_API_ROOT | 
					
						
							|  |  |  |                 + f"dags/{OM_LINEAGE_DAG_NAME}/dagRuns/{dag_run_id}", | 
					
						
							|  |  |  |                 headers=DEFAULT_AIRFLOW_HEADERS, | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |             state = res.json().get("state") | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |         if state != "success": | 
					
						
							|  |  |  |             raise RuntimeError(f"DAG {OM_LINEAGE_DAG_NAME} has not finished on time.") | 
					
						
							| 
									
										
										
										
											2022-03-21 18:29:49 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |     def test_pipeline_created(self) -> None: | 
					
						
							| 
									
										
										
										
											2022-03-21 18:29:49 +01:00
										 |  |  |         """
 | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |         Validate that the pipeline has been created | 
					
						
							| 
									
										
										
										
											2022-03-21 18:29:49 +01:00
										 |  |  |         """
 | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |         pipeline_service: PipelineService = self.metadata.get_by_name( | 
					
						
							|  |  |  |             entity=PipelineService, fqn=PIPELINE_SERVICE_NAME | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         self.assertIsNotNone(pipeline_service) | 
					
						
							| 
									
										
										
										
											2022-04-07 14:52:50 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-06-03 11:43:40 +02:00
										 |  |  |         pipeline: Pipeline = self.metadata.get_by_name( | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |             entity=Pipeline, | 
					
						
							|  |  |  |             fqn=f"{PIPELINE_SERVICE_NAME}.{OM_LINEAGE_DAG_NAME}", | 
					
						
							|  |  |  |             fields=["tasks", "pipelineStatus"], | 
					
						
							| 
									
										
										
										
											2022-03-21 18:29:49 +01:00
										 |  |  |         ) | 
					
						
							|  |  |  |         self.assertIsNotNone(pipeline) | 
					
						
							| 
									
										
										
										
											2022-05-08 06:10:35 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |         expected_task_names = set((task.name for task in pipeline.tasks)) | 
					
						
							|  |  |  |         self.assertEqual( | 
					
						
							|  |  |  |             expected_task_names, {"print_date", "sleep", "templated", "lineage_op"} | 
					
						
							| 
									
										
										
										
											2022-05-08 06:10:35 +02:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-05 21:18:37 +02:00
										 |  |  |         self.assertEqual(pipeline.description.root, "A simple tutorial DAG") | 
					
						
							| 
									
										
										
										
											2022-05-08 06:10:35 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |         # Validate status | 
					
						
							|  |  |  |         self.assertEqual( | 
					
						
							|  |  |  |             get_task_status_type_by_name(pipeline, "print_date"), StatusType.Successful | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         self.assertEqual( | 
					
						
							|  |  |  |             get_task_status_type_by_name(pipeline, "sleep"), StatusType.Successful | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         self.assertEqual( | 
					
						
							|  |  |  |             get_task_status_type_by_name(pipeline, "templated"), StatusType.Successful | 
					
						
							| 
									
										
										
										
											2022-05-08 06:10:35 +02:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-17 04:52:12 +01:00
										 |  |  |     def test_pipeline_lineage(self) -> None: | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Validate that the pipeline has proper lineage | 
					
						
							|  |  |  |         """
 | 
					
						
							| 
									
										
										
										
											2023-06-12 07:01:19 +02:00
										 |  |  |         root_name = "test-service-table-lineage.test-db.test-schema" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Check that both inlets have the same outlet | 
					
						
							|  |  |  |         for inlet_table in [ | 
					
						
							|  |  |  |             f"{root_name}.lineage-test-inlet", | 
					
						
							|  |  |  |             f"{root_name}.lineage-test-inlet2", | 
					
						
							|  |  |  |         ]: | 
					
						
							|  |  |  |             lineage = self.metadata.get_lineage_by_name( | 
					
						
							|  |  |  |                 entity=Table, | 
					
						
							|  |  |  |                 fqn=inlet_table, | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |             node_names = set((node["name"] for node in lineage.get("nodes") or [])) | 
					
						
							|  |  |  |             self.assertEqual(node_names, {"lineage-test-outlet"}) | 
					
						
							|  |  |  |             self.assertEqual(len(lineage.get("downstreamEdges")), 1) | 
					
						
							|  |  |  |             self.assertEqual( | 
					
						
							|  |  |  |                 lineage["downstreamEdges"][0]["toEntity"], | 
					
						
							| 
									
										
										
										
											2024-06-05 21:18:37 +02:00
										 |  |  |                 str(self.table_outlet.id.root), | 
					
						
							| 
									
										
										
										
											2023-06-12 07:01:19 +02:00
										 |  |  |             ) | 
					
						
							|  |  |  |             self.assertEqual( | 
					
						
							|  |  |  |                 lineage["downstreamEdges"][0]["lineageDetails"]["pipeline"][ | 
					
						
							|  |  |  |                     "fullyQualifiedName" | 
					
						
							|  |  |  |                 ], | 
					
						
							|  |  |  |                 f"{PIPELINE_SERVICE_NAME}.{OM_LINEAGE_DAG_NAME}", | 
					
						
							|  |  |  |             ) |