#  Copyright 2021 Collate
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#  http://www.apache.org/licenses/LICENSE-2.0
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | Test airflow lineage backend | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from datetime import datetime, timedelta | 
					
						
							|  |  |  | from unittest import TestCase | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # The DAG object; we'll need this to instantiate a DAG | 
					
						
							|  |  |  | from airflow import DAG | 
					
						
							| 
									
										
										
										
											2022-03-21 18:29:49 +01:00
										 |  |  | from airflow.models import TaskInstance | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  | from airflow.operators.bash import BashOperator | 
					
						
							| 
									
										
										
										
											2022-03-21 18:29:49 +01:00
										 |  |  | from airflow.operators.dummy import DummyOperator | 
					
						
							|  |  |  | from airflow.utils.task_group import TaskGroup | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | from airflow_provider_openmetadata.lineage.openmetadata import ( | 
					
						
							|  |  |  |     OpenMetadataLineageBackend, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-03-01 12:19:36 +01:00
										 |  |  | from airflow_provider_openmetadata.lineage.utils import get_xlets | 
					
						
							| 
									
										
										
										
											2022-02-01 01:29:56 +01:00
										 |  |  | from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest | 
					
						
							|  |  |  | from metadata.generated.schema.api.data.createTable import CreateTableRequest | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  | from metadata.generated.schema.api.services.createDatabaseService import ( | 
					
						
							| 
									
										
										
										
											2022-02-01 01:29:56 +01:00
										 |  |  |     CreateDatabaseServiceRequest, | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  | ) | 
					
						
							|  |  |  | from metadata.generated.schema.entity.data.pipeline import Pipeline | 
					
						
							| 
									
										
										
										
											2022-02-14 16:53:42 +01:00
										 |  |  | from metadata.generated.schema.entity.data.table import Column, DataType | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  | from metadata.generated.schema.entity.services.databaseService import ( | 
					
						
							| 
									
										
										
										
											2022-01-21 22:06:14 -08:00
										 |  |  |     DatabaseConnection, | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |     DatabaseServiceType, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | from metadata.generated.schema.type.entityReference import EntityReference | 
					
						
							|  |  |  | from metadata.ingestion.ometa.ometa_api import OpenMetadata | 
					
						
							|  |  |  | from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class AirflowLineageTest(TestCase): | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Run this test installing the necessary airflow version | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     server_config = MetadataServerConfig(api_endpoint="http://localhost:8585/api") | 
					
						
							|  |  |  |     metadata = OpenMetadata(server_config) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     assert metadata.health_check() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-02-01 01:29:56 +01:00
										 |  |  |     service = CreateDatabaseServiceRequest( | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |         name="test-service-table-lineage", | 
					
						
							|  |  |  |         serviceType=DatabaseServiceType.MySQL, | 
					
						
							| 
									
										
										
										
											2022-01-21 22:06:14 -08:00
										 |  |  |         databaseConnection=DatabaseConnection(hostPort="localhost"), | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |     ) | 
					
						
							|  |  |  |     service_type = "databaseService" | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-03-21 18:29:49 +01:00
										 |  |  |     backend = OpenMetadataLineageBackend() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |     @classmethod | 
					
						
							|  |  |  |     def setUpClass(cls) -> None: | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Prepare ingredients: Table Entity + DAG | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         cls.service_entity = cls.metadata.create_or_update(data=cls.service) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-02-01 01:29:56 +01:00
										 |  |  |         cls.create_db = CreateDatabaseRequest( | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |             name="test-db", | 
					
						
							|  |  |  |             service=EntityReference(id=cls.service_entity.id, type="databaseService"), | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         cls.create_db_entity = cls.metadata.create_or_update(data=cls.create_db) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-02-13 17:51:25 +01:00
										 |  |  |         cls.db_reference = EntityReference( | 
					
						
							|  |  |  |             id=cls.create_db_entity.id, name="test-db", type="database" | 
					
						
							|  |  |  |         ) | 
					
						
							| 
									
										
										
										
											2022-02-04 12:39:08 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-02-01 01:29:56 +01:00
										 |  |  |         cls.create = CreateTableRequest( | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |             name="lineage-test", | 
					
						
							| 
									
										
										
										
											2022-02-04 12:39:08 -08:00
										 |  |  |             database=cls.db_reference, | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |             columns=[Column(name="id", dataType=DataType.BIGINT)], | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         cls.table = cls.metadata.create_or_update(data=cls.create) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with DAG( | 
					
						
							|  |  |  |             "lineage", | 
					
						
							|  |  |  |             description="A lineage test DAG", | 
					
						
							|  |  |  |             schedule_interval=timedelta(days=1), | 
					
						
							|  |  |  |             start_date=datetime(2021, 1, 1), | 
					
						
							|  |  |  |         ) as dag: | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             t1 = BashOperator(  # Using BashOperator as a random example | 
					
						
							|  |  |  |                 task_id="task1", | 
					
						
							|  |  |  |                 bash_command="date", | 
					
						
							|  |  |  |                 outlets={"tables": ["test-service-table-lineage.test-db.lineage-test"]}, | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             t2 = BashOperator(  # Using BashOperator as a random example | 
					
						
							|  |  |  |                 task_id="task2", | 
					
						
							|  |  |  |                 bash_command="sleep 5", | 
					
						
							|  |  |  |                 inlets={"tables": ["test-service-table-lineage.test-db.lineage-test"]}, | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             t3 = BashOperator( | 
					
						
							|  |  |  |                 task_id="task3", | 
					
						
							|  |  |  |                 bash_command="echo", | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             t1 >> t2 >> t3 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             cls.dag = dag | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_xlets(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Verify that we can extract inlets and outlets | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertIsNone(get_xlets(self.dag.get_task("task1"), "_inlets")) | 
					
						
							|  |  |  |         self.assertEqual( | 
					
						
							|  |  |  |             ["test-service-table-lineage.test-db.lineage-test"], | 
					
						
							|  |  |  |             get_xlets(self.dag.get_task("task1"), "_outlets"), | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual( | 
					
						
							|  |  |  |             ["test-service-table-lineage.test-db.lineage-test"], | 
					
						
							|  |  |  |             get_xlets(self.dag.get_task("task2"), "_inlets"), | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         self.assertIsNone(get_xlets(self.dag.get_task("task2"), "_outlets")) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertIsNone(get_xlets(self.dag.get_task("task3"), "_inlets")) | 
					
						
							|  |  |  |         self.assertIsNone(get_xlets(self.dag.get_task("task3"), "_outlets")) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_lineage(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Test end to end | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-03-21 18:29:49 +01:00
										 |  |  |         self.backend.send_lineage( | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |             operator=self.dag.get_task("task1"), | 
					
						
							|  |  |  |             context={ | 
					
						
							|  |  |  |                 "dag": self.dag, | 
					
						
							|  |  |  |                 "task": self.dag.get_task("task1"), | 
					
						
							| 
									
										
										
										
											2022-03-21 18:29:49 +01:00
										 |  |  |                 "task_instance": TaskInstance( | 
					
						
							|  |  |  |                     task=self.dag.get_task("task1"), | 
					
						
							|  |  |  |                     execution_date=datetime.strptime( | 
					
						
							|  |  |  |                         "2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S" | 
					
						
							|  |  |  |                     ), | 
					
						
							|  |  |  |                     run_id="scheduled__2022-03-15T08:13:45.967068+00:00", | 
					
						
							|  |  |  |                     state="running", | 
					
						
							|  |  |  |                 ), | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |             }, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertIsNotNone( | 
					
						
							| 
									
										
										
										
											2022-02-14 16:53:42 +01:00
										 |  |  |             self.metadata.get_by_name(entity=Pipeline, fqdn="local_airflow_3.lineage") | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         lineage = self.metadata.get_lineage_by_name( | 
					
						
							| 
									
										
										
										
											2022-02-14 16:53:42 +01:00
										 |  |  |             entity=Pipeline, fqdn="local_airflow_3.lineage" | 
					
						
							| 
									
										
										
										
											2022-01-14 22:00:33 +01:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         nodes = {node["id"] for node in lineage["nodes"]} | 
					
						
							|  |  |  |         self.assertIn(str(self.table.id.__root__), nodes) | 
					
						
							| 
									
										
										
										
											2022-03-21 18:29:49 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def test_lineage_task_group(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Test end to end for task groups | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with DAG( | 
					
						
							|  |  |  |             "task_group_lineage", | 
					
						
							|  |  |  |             description="A lineage test DAG", | 
					
						
							|  |  |  |             schedule_interval=timedelta(days=1), | 
					
						
							|  |  |  |             start_date=datetime(2021, 1, 1), | 
					
						
							|  |  |  |         ) as dag: | 
					
						
							|  |  |  |             t0 = DummyOperator(task_id="start") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Start Task Group definition | 
					
						
							|  |  |  |             with TaskGroup(group_id="group1") as tg1: | 
					
						
							|  |  |  |                 t1 = DummyOperator(task_id="task1") | 
					
						
							|  |  |  |                 t2 = DummyOperator(task_id="task2") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 t1 >> t2 | 
					
						
							|  |  |  |             # End Task Group definition | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             t3 = DummyOperator(task_id="end") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Set Task Group's (tg1) dependencies | 
					
						
							|  |  |  |             t0 >> tg1 >> t3 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             self.backend.send_lineage( | 
					
						
							|  |  |  |                 operator=dag.get_task("group1.task1"), | 
					
						
							|  |  |  |                 context={ | 
					
						
							|  |  |  |                     "dag": dag, | 
					
						
							|  |  |  |                     "task": dag.get_task("group1.task1"), | 
					
						
							|  |  |  |                     "task_instance": TaskInstance( | 
					
						
							|  |  |  |                         task=dag.get_task("group1.task1"), | 
					
						
							|  |  |  |                         execution_date=datetime.strptime( | 
					
						
							|  |  |  |                             "2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S" | 
					
						
							|  |  |  |                         ), | 
					
						
							|  |  |  |                         run_id="scheduled__2022-03-15T08:13:45.967068+00:00", | 
					
						
							|  |  |  |                         state="running", | 
					
						
							|  |  |  |                     ), | 
					
						
							|  |  |  |                 }, | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         pipeline = self.metadata.get_by_name( | 
					
						
							|  |  |  |             entity=Pipeline, fqdn="local_airflow_3.task_group_lineage", fields=["tasks"] | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         self.assertIsNotNone(pipeline) | 
					
						
							|  |  |  |         self.assertIn("group1_DOT_task1", {task.name for task in pipeline.tasks}) |