mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-10-31 18:48:35 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			41 lines
		
	
	
		
			1.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			41 lines
		
	
	
		
			1.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import os
 | |
| 
 | |
| import yaml
 | |
| 
 | |
| from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
 | |
|     PipelineType,
 | |
| )
 | |
| from metadata.ingestion.api.workflow import Workflow
 | |
| from metadata.orm_profiler.api.workflow import ProfilerWorkflow
 | |
| from metadata.test_suite.api.workflow import TestSuiteWorkflow
 | |
| 
 | |
| WORKFLOW_MAP = {
 | |
|     PipelineType.metadata.value: Workflow,
 | |
|     PipelineType.usage.value: Workflow,
 | |
|     PipelineType.lineage.value: Workflow,
 | |
|     PipelineType.profiler.value: ProfilerWorkflow,
 | |
|     PipelineType.TestSuite.value: TestSuiteWorkflow,
 | |
| }
 | |
| 
 | |
| 
 | |
| def main():
 | |
|     # DockerOperator expects an env var called config
 | |
|     config = os.environ["config"]
 | |
|     pipeline_type = os.environ["pipelineType"]
 | |
| 
 | |
|     workflow_class = WORKFLOW_MAP.get(pipeline_type)
 | |
|     if workflow_class is None:
 | |
|         raise ValueError(f"Missing workflow_class loaded from {pipeline_type}")
 | |
| 
 | |
|     # Load the config string representation
 | |
|     workflow_config = yaml.safe_load(config)
 | |
|     workflow = workflow_class.create(workflow_config)
 | |
|     workflow.execute()
 | |
|     workflow.raise_from_status()
 | |
|     workflow.print_status()
 | |
|     workflow.stop()
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     main()
 | 
