| 
									
										
										
										
											2025-04-03 10:39:47 +05:30
										 |  |  | #  Copyright 2025 Collate | 
					
						
							|  |  |  | #  Licensed under the Collate Community License, Version 1.0 (the "License"); | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | #  you may not use this file except in compliance with the License. | 
					
						
							|  |  |  | #  You may obtain a copy of the License at | 
					
						
							| 
									
										
										
										
											2025-04-03 10:39:47 +05:30
										 |  |  | #  https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | #  Unless required by applicable law or agreed to in writing, software | 
					
						
							|  |  |  | #  distributed under the License is distributed on an "AS IS" BASIS, | 
					
						
							|  |  |  | #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
					
						
							|  |  |  | #  See the License for the specific language governing permissions and | 
					
						
							|  |  |  | #  limitations under the License. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from airflow import DAG | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # these are params only used in the DAG factory, not in the tasks | 
					
						
							| 
									
										
										
										
											2022-08-26 07:29:38 +02:00
										 |  |  | from openmetadata_managed_apis.utils.logger import workflow_logger | 
					
						
							| 
									
										
										
										
											2022-07-28 14:46:25 +02:00
										 |  |  | from openmetadata_managed_apis.workflows.ingestion.registry import build_registry | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( | 
					
						
							|  |  |  |     IngestionPipeline, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-08-26 07:29:38 +02:00
										 |  |  | logger = workflow_logger() | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class WorkflowBuilder: | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Generates tasks and a DAG from a config. | 
					
						
							|  |  |  |     :param workflow_config:  configuration for the DAG | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self, ingestion_pipeline: IngestionPipeline) -> None: | 
					
						
							|  |  |  |         self.airflow_pipeline = ingestion_pipeline | 
					
						
							| 
									
										
										
										
											2024-06-05 21:18:37 +02:00
										 |  |  |         self.dag_name: str = self.airflow_pipeline.name.root | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def build(self) -> DAG: | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         Generates a DAG from the DAG parameters. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         :returns: dict with dag_id and DAG object | 
					
						
							|  |  |  |         :type: Dict[str, Union[str, DAG]] | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         dag_type = self.airflow_pipeline.pipelineType.value | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         build_fn = build_registry.registry.get(dag_type) | 
					
						
							|  |  |  |         if not build_fn: | 
					
						
							| 
									
										
										
										
											2022-08-26 07:29:38 +02:00
										 |  |  |             msg = f"Cannot find build function for {dag_type} in {build_registry.registry}" | 
					
						
							|  |  |  |             logger.error(msg) | 
					
						
							|  |  |  |             raise ValueError(msg) | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |         dag = build_fn(self.airflow_pipeline) | 
					
						
							|  |  |  |         if not isinstance(dag, DAG): | 
					
						
							| 
									
										
										
										
											2022-08-26 07:29:38 +02:00
										 |  |  |             msg = f"Invalid return type from {build_fn.__name__} when building {dag_type}." | 
					
						
							|  |  |  |             logger.error(msg) | 
					
						
							|  |  |  |             raise ValueError(msg) | 
					
						
							| 
									
										
										
										
											2022-04-11 18:38:26 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |         return dag |