| 
									
										
										
										
											2025-04-03 10:39:47 +05:30
										 |  |  | #  Copyright 2025 Collate | 
					
						
							|  |  |  | #  Licensed under the Collate Community License, Version 1.0 (the "License"); | 
					
						
							| 
									
										
										
										
											2022-07-21 19:04:02 +02:00
										 |  |  | #  you may not use this file except in compliance with the License. | 
					
						
							|  |  |  | #  You may obtain a copy of the License at | 
					
						
							| 
									
										
										
										
											2025-04-03 10:39:47 +05:30
										 |  |  | #  https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE | 
					
						
							| 
									
										
										
										
											2022-07-21 19:04:02 +02:00
										 |  |  | #  Unless required by applicable law or agreed to in writing, software | 
					
						
							|  |  |  | #  distributed under the License is distributed on an "AS IS" BASIS, | 
					
						
							|  |  |  | #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
					
						
							|  |  |  | #  See the License for the specific language governing permissions and | 
					
						
							|  |  |  | #  limitations under the License. | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | Test Airflow related operations | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | import datetime | 
					
						
							|  |  |  | import os | 
					
						
							|  |  |  | import shutil | 
					
						
							| 
									
										
										
										
											2022-07-28 14:46:25 +02:00
										 |  |  | import time | 
					
						
							| 
									
										
										
										
											2022-07-21 19:04:02 +02:00
										 |  |  | import uuid | 
					
						
							|  |  |  | from pathlib import Path | 
					
						
							|  |  |  | from unittest import TestCase | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # We need to patch the environment before importing Airflow | 
					
						
							|  |  |  | # At module load it already inits the configurations. | 
					
						
							|  |  |  | from metadata.generated.schema.api.services.createDatabaseService import ( | 
					
						
							|  |  |  |     CreateDatabaseServiceRequest, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2023-06-16 13:18:12 +05:30
										 |  |  | from metadata.generated.schema.entity.services.connections.database.common.basicAuth import ( | 
					
						
							|  |  |  |     BasicAuth, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2022-07-21 19:04:02 +02:00
										 |  |  | from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( | 
					
						
							|  |  |  |     MysqlConnection, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( | 
					
						
							|  |  |  |     OpenMetadataConnection, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | from metadata.generated.schema.entity.services.databaseService import ( | 
					
						
							|  |  |  |     DatabaseConnection, | 
					
						
							|  |  |  |     DatabaseService, | 
					
						
							|  |  |  |     DatabaseServiceType, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( | 
					
						
							|  |  |  |     AirflowConfig, | 
					
						
							|  |  |  |     IngestionPipeline, | 
					
						
							|  |  |  |     PipelineType, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( | 
					
						
							|  |  |  |     DatabaseServiceMetadataPipeline, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | from metadata.generated.schema.metadataIngestion.workflow import SourceConfig | 
					
						
							| 
									
										
										
										
											2023-05-03 11:37:35 +02:00
										 |  |  | from metadata.generated.schema.type.basic import Markdown | 
					
						
							| 
									
										
										
										
											2022-07-21 19:04:02 +02:00
										 |  |  | from metadata.generated.schema.type.entityReference import EntityReference | 
					
						
							|  |  |  | from metadata.ingestion.ometa.ometa_api import OpenMetadata | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | os.environ["AIRFLOW_HOME"] = "/tmp/airflow" | 
					
						
							| 
									
										
										
										
											2022-07-28 14:46:25 +02:00
										 |  |  | os.environ[ | 
					
						
							|  |  |  |     "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN" | 
					
						
							|  |  |  | ] = "mysql+pymysql://airflow_user:airflow_pass@localhost/airflow_db" | 
					
						
							| 
									
										
										
										
											2022-07-21 19:04:02 +02:00
										 |  |  | os.environ["AIRFLOW__OPENMETADATA_AIRFLOW_APIS__DAG_GENERATED_CONFIGS"] = "/tmp/airflow" | 
					
						
							|  |  |  | os.environ["AIRFLOW__OPENMETADATA_AIRFLOW_APIS__DAG_RUNNER_TEMPLATE"] = str( | 
					
						
							|  |  |  |     ( | 
					
						
							|  |  |  |         Path(__file__).parent.parent.parent.parent | 
					
						
							|  |  |  |         / "src/plugins/dag_templates/dag_runner.j2" | 
					
						
							|  |  |  |     ).absolute() | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from airflow import DAG | 
					
						
							| 
									
										
										
										
											2022-07-28 14:46:25 +02:00
										 |  |  | from airflow.models import DagBag, DagModel | 
					
						
							| 
									
										
										
										
											2022-07-21 19:04:02 +02:00
										 |  |  | from airflow.operators.bash import BashOperator | 
					
						
							| 
									
										
										
										
											2022-12-14 21:14:51 +05:30
										 |  |  | from airflow.utils import timezone | 
					
						
							| 
									
										
										
										
											2022-07-21 19:04:02 +02:00
										 |  |  | from airflow.utils.state import DagRunState | 
					
						
							|  |  |  | from airflow.utils.types import DagRunType | 
					
						
							| 
									
										
										
										
											2022-07-28 14:46:25 +02:00
										 |  |  | from openmetadata_managed_apis.operations.delete import delete_dag_id | 
					
						
							|  |  |  | from openmetadata_managed_apis.operations.deploy import DagDeployer | 
					
						
							|  |  |  | from openmetadata_managed_apis.operations.kill_all import kill_all | 
					
						
							|  |  |  | from openmetadata_managed_apis.operations.state import disable_dag, enable_dag | 
					
						
							|  |  |  | from openmetadata_managed_apis.operations.status import status | 
					
						
							|  |  |  | from openmetadata_managed_apis.operations.trigger import trigger | 
					
						
							| 
									
										
										
										
											2022-07-21 19:04:02 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-09-26 16:19:47 +05:30
										 |  |  | from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( | 
					
						
							|  |  |  |     OpenMetadataJWTClientConfig, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-07-21 19:04:02 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | class TestAirflowOps(TestCase): | 
					
						
							|  |  |  |     dagbag: DagBag | 
					
						
							|  |  |  |     dag: DAG | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-09-26 16:19:47 +05:30
										 |  |  |     conn = OpenMetadataConnection( | 
					
						
							|  |  |  |         hostPort="http://localhost:8585/api", | 
					
						
							|  |  |  |         authProvider="openmetadata", | 
					
						
							|  |  |  |         securityConfig=OpenMetadataJWTClientConfig( | 
					
						
							|  |  |  |             jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" | 
					
						
							|  |  |  |         ), | 
					
						
							|  |  |  |     ) | 
					
						
							| 
									
										
										
										
											2022-07-21 19:04:02 +02:00
										 |  |  |     metadata = OpenMetadata(conn) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @classmethod | 
					
						
							|  |  |  |     def setUpClass(cls) -> None: | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Prepare ingredients | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with DAG( | 
					
						
							|  |  |  |             "dag_status", | 
					
						
							|  |  |  |             description="A lineage test DAG", | 
					
						
							|  |  |  |             schedule_interval=datetime.timedelta(days=1), | 
					
						
							|  |  |  |             start_date=datetime.datetime(2021, 1, 1), | 
					
						
							|  |  |  |         ) as cls.dag: | 
					
						
							|  |  |  |             BashOperator(  # Using BashOperator as a random example | 
					
						
							|  |  |  |                 task_id="task1", | 
					
						
							|  |  |  |                 bash_command="date", | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         cls.dag.sync_to_db() | 
					
						
							|  |  |  |         cls.dagbag = DagBag(include_examples=False) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @classmethod | 
					
						
							|  |  |  |     def tearDownClass(cls) -> None: | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Clean up | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         service_id = str( | 
					
						
							|  |  |  |             cls.metadata.get_by_name( | 
					
						
							|  |  |  |                 entity=DatabaseService, fqn="test-service-ops" | 
					
						
							| 
									
										
										
										
											2024-06-05 21:18:37 +02:00
										 |  |  |             ).id.root | 
					
						
							| 
									
										
										
										
											2022-07-21 19:04:02 +02:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         cls.metadata.delete( | 
					
						
							|  |  |  |             entity=DatabaseService, | 
					
						
							|  |  |  |             entity_id=service_id, | 
					
						
							|  |  |  |             recursive=True, | 
					
						
							|  |  |  |             hard_delete=True, | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         shutil.rmtree("/tmp/airflow") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_dag_status(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Validate: | 
					
						
							|  |  |  |             - DAG with no status | 
					
						
							|  |  |  |             - DAG with a single status | 
					
						
							|  |  |  |             - DAG with multiple status | 
					
						
							|  |  |  |             - Missing DAG | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         res = status(dag_id="random") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(res.status_code, 404) | 
					
						
							|  |  |  |         self.assertEqual(res.json, {"error": "DAG random not found."}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         res = status(dag_id="dag_status") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(res.status_code, 200) | 
					
						
							|  |  |  |         self.assertEqual(res.json, []) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.dag.create_dagrun( | 
					
						
							|  |  |  |             run_type=DagRunType.MANUAL, | 
					
						
							|  |  |  |             state=DagRunState.RUNNING, | 
					
						
							|  |  |  |             execution_date=timezone.utcnow(), | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         res = status(dag_id="dag_status") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(res.status_code, 200) | 
					
						
							| 
									
										
										
										
											2022-11-03 14:37:26 +05:30
										 |  |  |         self.assertEqual(res.json[0].get("pipelineState"), "running") | 
					
						
							| 
									
										
										
										
											2022-07-21 19:04:02 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |         self.dag.create_dagrun( | 
					
						
							|  |  |  |             run_type=DagRunType.MANUAL, | 
					
						
							|  |  |  |             state=DagRunState.SUCCESS, | 
					
						
							|  |  |  |             execution_date=timezone.utcnow(), | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         res = status(dag_id="dag_status") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(res.status_code, 200) | 
					
						
							|  |  |  |         self.assertEqual(len(res.json), 2) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         res = kill_all(dag_id="dag_status") | 
					
						
							|  |  |  |         self.assertEqual(res.status_code, 200) | 
					
						
							|  |  |  |         self.assertEqual( | 
					
						
							|  |  |  |             res.json, {"message": f"Workflow [dag_status] has been killed"} | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         res = status(dag_id="dag_status") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(res.status_code, 200) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-11-03 14:37:26 +05:30
										 |  |  |         res_status = {elem.get("pipelineState") for elem in res.json} | 
					
						
							| 
									
										
										
										
											2022-07-21 19:04:02 +02:00
										 |  |  |         self.assertEqual(res_status, {"failed", "success"}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_dag_state(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         DAG can be enabled and disabled | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         res = enable_dag(dag_id="random") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(res.status_code, 404) | 
					
						
							|  |  |  |         self.assertEqual(res.json, {"error": "DAG random not found."}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         res = enable_dag(dag_id="dag_status") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(res.status_code, 200) | 
					
						
							|  |  |  |         self.assertEqual(res.json, {"message": "DAG dag_status has been enabled"}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         res = disable_dag(dag_id="dag_status") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(res.status_code, 200) | 
					
						
							|  |  |  |         self.assertEqual(res.json, {"message": "DAG dag_status has been disabled"}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_dag_deploy_and_delete(self): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         DAGs can be deployed | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         service = self.metadata.create_or_update( | 
					
						
							|  |  |  |             CreateDatabaseServiceRequest( | 
					
						
							|  |  |  |                 name="test-service-ops", | 
					
						
							|  |  |  |                 serviceType=DatabaseServiceType.Mysql, | 
					
						
							|  |  |  |                 connection=DatabaseConnection( | 
					
						
							|  |  |  |                     config=MysqlConnection( | 
					
						
							|  |  |  |                         username="username", | 
					
						
							| 
									
										
										
										
											2023-06-16 13:18:12 +05:30
										 |  |  |                         authType=BasicAuth( | 
					
						
							|  |  |  |                             password="password", | 
					
						
							|  |  |  |                         ), | 
					
						
							| 
									
										
										
										
											2022-07-21 19:04:02 +02:00
										 |  |  |                         hostPort="http://localhost:1234", | 
					
						
							|  |  |  |                     ) | 
					
						
							|  |  |  |                 ), | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         ingestion_pipeline = IngestionPipeline( | 
					
						
							|  |  |  |             id=uuid.uuid4(), | 
					
						
							|  |  |  |             pipelineType=PipelineType.metadata, | 
					
						
							|  |  |  |             name="my_new_dag", | 
					
						
							| 
									
										
										
										
											2024-06-05 21:18:37 +02:00
										 |  |  |             description=Markdown("A test DAG"), | 
					
						
							| 
									
										
										
										
											2022-11-03 14:37:26 +05:30
										 |  |  |             fullyQualifiedName="test-service-ops.my_new_dag", | 
					
						
							| 
									
										
										
										
											2022-07-21 19:04:02 +02:00
										 |  |  |             sourceConfig=SourceConfig(config=DatabaseServiceMetadataPipeline()), | 
					
						
							|  |  |  |             openMetadataServerConnection=self.conn, | 
					
						
							|  |  |  |             airflowConfig=AirflowConfig(), | 
					
						
							|  |  |  |             service=EntityReference( | 
					
						
							|  |  |  |                 id=service.id, type="databaseService", name="test-service-ops" | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Create the DAG | 
					
						
							| 
									
										
										
										
											2022-07-28 14:46:25 +02:00
										 |  |  |         deployer = DagDeployer(ingestion_pipeline) | 
					
						
							| 
									
										
										
										
											2022-07-21 19:04:02 +02:00
										 |  |  |         res = deployer.deploy() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(res.status_code, 200) | 
					
						
							|  |  |  |         self.assertEqual( | 
					
						
							|  |  |  |             res.json, {"message": "Workflow [my_new_dag] has been created"} | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         dag_file = Path("/tmp/airflow/dags/my_new_dag.py") | 
					
						
							|  |  |  |         self.assertTrue(dag_file.is_file()) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-07-28 14:46:25 +02:00
										 |  |  |         # Trigger it, waiting for it to be parsed by the scheduler | 
					
						
							|  |  |  |         dag_id = "my_new_dag" | 
					
						
							|  |  |  |         tries = 5 | 
					
						
							|  |  |  |         dag_model = None | 
					
						
							|  |  |  |         while not dag_model and tries >= 0: | 
					
						
							|  |  |  |             dag_model = DagModel.get_current(dag_id) | 
					
						
							|  |  |  |             time.sleep(5) | 
					
						
							|  |  |  |             tries -= 1 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-07-21 19:04:02 +02:00
										 |  |  |         res = trigger(dag_id="my_new_dag", run_id=None) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(res.status_code, 200) | 
					
						
							|  |  |  |         self.assertIn("Workflow [my_new_dag] has been triggered", res.json["message"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Delete it | 
					
						
							|  |  |  |         res = delete_dag_id("my_new_dag") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertEqual(res.status_code, 200) | 
					
						
							|  |  |  |         self.assertEqual(res.json, {"message": "DAG [my_new_dag] has been deleted"}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.assertFalse(dag_file.is_file()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Cannot find it anymore | 
					
						
							|  |  |  |         res = status(dag_id="my_new_dag") | 
					
						
							|  |  |  |         self.assertEqual(res.status_code, 404) |