| 
									
										
										
										
											2023-02-26 00:52:14 +01:00
										 |  |  | #  Copyright 2021 Collate | 
					
						
							|  |  |  | #  Licensed under the Apache License, Version 2.0 (the "License"); | 
					
						
							|  |  |  | #  you may not use this file except in compliance with the License. | 
					
						
							|  |  |  | #  You may obtain a copy of the License at | 
					
						
							|  |  |  | #  http://www.apache.org/licenses/LICENSE-2.0 | 
					
						
							|  |  |  | #  Unless required by applicable law or agreed to in writing, software | 
					
						
							|  |  |  | #  distributed under the License is distributed on an "AS IS" BASIS, | 
					
						
							|  |  |  | #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
					
						
							|  |  |  | #  See the License for the specific language governing permissions and | 
					
						
							|  |  |  | #  limitations under the License. | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | Main ingestion entrypoint to run OM workflows | 
					
						
							|  |  |  | """
 | 
					
						
							| 
									
										
										
										
											2022-10-11 07:50:49 +02:00
										 |  |  | import os | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import yaml | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( | 
					
						
							|  |  |  |     PipelineType, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2023-06-25 17:20:46 +02:00
										 |  |  | from metadata.generated.schema.metadataIngestion.workflow import LogLevels | 
					
						
							|  |  |  | from metadata.utils.logger import set_loggers_level | 
					
						
							| 
									
										
										
										
											2023-10-06 18:29:18 +02:00
										 |  |  | from metadata.workflow.data_quality import TestSuiteWorkflow | 
					
						
							| 
									
										
										
										
											2023-08-30 15:49:42 +02:00
										 |  |  | from metadata.workflow.metadata import MetadataWorkflow | 
					
						
							| 
									
										
										
										
											2023-09-04 11:02:57 +02:00
										 |  |  | from metadata.workflow.profiler import ProfilerWorkflow | 
					
						
							| 
									
										
										
										
											2023-08-30 15:49:42 +02:00
										 |  |  | from metadata.workflow.usage import UsageWorkflow | 
					
						
							| 
									
										
										
										
											2022-10-11 07:50:49 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | WORKFLOW_MAP = { | 
					
						
							| 
									
										
										
										
											2023-08-30 15:49:42 +02:00
										 |  |  |     PipelineType.metadata.value: MetadataWorkflow, | 
					
						
							|  |  |  |     PipelineType.usage.value: UsageWorkflow, | 
					
						
							|  |  |  |     PipelineType.lineage.value: MetadataWorkflow, | 
					
						
							| 
									
										
										
										
											2022-10-11 07:50:49 +02:00
										 |  |  |     PipelineType.profiler.value: ProfilerWorkflow, | 
					
						
							|  |  |  |     PipelineType.TestSuite.value: TestSuiteWorkflow, | 
					
						
							| 
									
										
										
										
											2023-08-30 15:49:42 +02:00
										 |  |  |     PipelineType.elasticSearchReindex.value: MetadataWorkflow, | 
					
						
							|  |  |  |     PipelineType.dbt.value: MetadataWorkflow, | 
					
						
							| 
									
										
										
										
											2022-10-11 07:50:49 +02:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def main(): | 
					
						
							| 
									
										
										
										
											2023-02-26 00:52:14 +01:00
										 |  |  |     """
 | 
					
						
							|  |  |  |     Ingestion entrypoint. Get the right Workflow class | 
					
						
							|  |  |  |     and execute the ingestion. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     This image is expected to be used and run in environments | 
					
						
							|  |  |  |     such as Airflow's KubernetesPodOperator: | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     ``` | 
					
						
							|  |  |  |     config = '''
 | 
					
						
							|  |  |  |         source: | 
					
						
							|  |  |  |           type: ... | 
					
						
							|  |  |  |           serviceName: ... | 
					
						
							|  |  |  |           serviceConnection: | 
					
						
							|  |  |  |             ... | 
					
						
							|  |  |  |           sourceConfig: | 
					
						
							|  |  |  |             ... | 
					
						
							|  |  |  |         sink: | 
					
						
							|  |  |  |           ... | 
					
						
							|  |  |  |         workflowConfig: | 
					
						
							|  |  |  |           ... | 
					
						
							|  |  |  |     '''
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     KubernetesPodOperator( | 
					
						
							|  |  |  |         task_id="ingest", | 
					
						
							|  |  |  |         name="ingest", | 
					
						
							|  |  |  |         cmds=["python", "main.py"], | 
					
						
							|  |  |  |         image="openmetadata/ingestion-base:0.13.2", | 
					
						
							|  |  |  |         namespace='default', | 
					
						
							|  |  |  |         env_vars={"config": config, "pipelineType": "metadata"}, | 
					
						
							|  |  |  |         dag=dag, | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     ``` | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Note how we are expecting the env variables to be sent, with the `config` being the str | 
					
						
							|  |  |  |     representation of the ingestion YAML. | 
					
						
							| 
									
										
										
										
											2023-03-13 10:50:10 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     We will also set the `pipelineRunId` value if it comes from the environment. | 
					
						
							| 
									
										
										
										
											2023-02-26 00:52:14 +01:00
										 |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-10-11 07:50:49 +02:00
										 |  |  |     # DockerOperator expects an env var called config | 
					
						
							| 
									
										
										
										
											2023-03-13 10:50:10 +01:00
										 |  |  |     config = os.getenv("config") | 
					
						
							|  |  |  |     if not config: | 
					
						
							|  |  |  |         raise RuntimeError( | 
					
						
							|  |  |  |             "Missing environment variable `config`. This is needed to configure the Workflow." | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pipeline_type = os.getenv("pipelineType") | 
					
						
							|  |  |  |     if not pipeline_type: | 
					
						
							|  |  |  |         raise RuntimeError( | 
					
						
							|  |  |  |             "Missing environment variable `pipelineType`. This is needed to load the Workflow class." | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pipeline_run_id = os.getenv("pipelineRunId") | 
					
						
							| 
									
										
										
										
											2022-10-11 07:50:49 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     workflow_class = WORKFLOW_MAP.get(pipeline_type) | 
					
						
							|  |  |  |     if workflow_class is None: | 
					
						
							|  |  |  |         raise ValueError(f"Missing workflow_class loaded from {pipeline_type}") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # Load the config string representation | 
					
						
							|  |  |  |     workflow_config = yaml.safe_load(config) | 
					
						
							| 
									
										
										
										
											2023-03-13 10:50:10 +01:00
										 |  |  |     if pipeline_run_id: | 
					
						
							|  |  |  |         workflow_config["pipelineRunId"] = pipeline_run_id | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-06-25 17:20:46 +02:00
										 |  |  |     logger_level = workflow_config.get("workflowConfig", {}).get("loggerLevel") | 
					
						
							|  |  |  |     set_loggers_level(logger_level or LogLevels.INFO.value) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-10-11 07:50:49 +02:00
										 |  |  |     workflow = workflow_class.create(workflow_config) | 
					
						
							|  |  |  |     workflow.execute() | 
					
						
							|  |  |  |     workflow.raise_from_status() | 
					
						
							| 
									
										
										
										
											2024-07-29 09:20:34 +02:00
										 |  |  |     workflow.print_status() | 
					
						
							| 
									
										
										
										
											2022-10-11 07:50:49 +02:00
										 |  |  |     workflow.stop() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
# Script entrypoint: run the ingestion only when this file is executed
# directly (e.g. `python main.py`), not when it is imported as a module.
if __name__ == "__main__":
    main()