mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-10-31 02:29:03 +00:00 
			
		
		
		
	
		
			
	
	
		
			73 lines
		
	
	
		
			2.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			73 lines
		
	
	
		
			2.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|   | #  Copyright 2022 Collate | ||
|  | #  Licensed under the Apache License, Version 2.0 (the "License"); | ||
|  | #  you may not use this file except in compliance with the License. | ||
|  | #  You may obtain a copy of the License at | ||
|  | #  http://www.apache.org/licenses/LICENSE-2.0 | ||
|  | #  Unless required by applicable law or agreed to in writing, software | ||
|  | #  distributed under the License is distributed on an "AS IS" BASIS, | ||
|  | #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
|  | #  See the License for the specific language governing permissions and | ||
|  | #  limitations under the License. | ||
|  | """
 | ||
|  | dbt DAG function builder | ||
|  | """
 | ||
|  | 
 | ||
|  | from airflow import DAG | ||
|  | from openmetadata_managed_apis.workflows.ingestion.common import ( | ||
|  |     build_dag, | ||
|  |     build_source, | ||
|  |     build_workflow_config_property, | ||
|  |     metadata_ingestion_workflow, | ||
|  | ) | ||
|  | 
 | ||
|  | try: | ||
|  |     from airflow.operators.python import PythonOperator | ||
|  | except ModuleNotFoundError: | ||
|  |     from airflow.operators.python_operator import PythonOperator | ||
|  | 
 | ||
|  | from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( | ||
|  |     IngestionPipeline, | ||
|  | ) | ||
|  | from metadata.generated.schema.metadataIngestion.workflow import ( | ||
|  |     OpenMetadataWorkflowConfig, | ||
|  |     Sink, | ||
|  | ) | ||
|  | 
 | ||
|  | 
 | ||
def build_dbt_workflow_config(
    ingestion_pipeline: IngestionPipeline,
) -> OpenMetadataWorkflowConfig:
    """
    Given an ingestion_pipeline, prepare the dbt workflow config.

    The source is obtained from ``build_source`` and its type is then
    overridden to ``"dbt"``. The sink is always ``metadata-rest`` with an
    empty config.

    :param ingestion_pipeline: pipeline definition the workflow is built from
    :return: workflow configuration with a dbt source and a metadata-rest sink
    """

    source = build_source(ingestion_pipeline)
    # Mark the source as dbt, replacing whatever type build_source assigned.
    source.type = "dbt"

    return OpenMetadataWorkflowConfig(
        source=source,
        sink=Sink(
            type="metadata-rest",
            config={},
        ),
        workflowConfig=build_workflow_config_property(ingestion_pipeline),
        ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.__root__,
    )
|  | 
 | ||
|  | 
 | ||
def build_dbt_dag(ingestion_pipeline: IngestionPipeline) -> DAG:
    """
    Build the DAG that runs the dbt workflow for the given ingestion pipeline.

    The workflow config comes from ``build_dbt_workflow_config`` and is
    passed to ``build_dag`` together with ``metadata_ingestion_workflow``
    as the workflow function, under the task name ``dbt_task``.
    """
    return build_dag(
        task_name="dbt_task",
        ingestion_pipeline=ingestion_pipeline,
        workflow_config=build_dbt_workflow_config(ingestion_pipeline),
        workflow_fn=metadata_ingestion_workflow,
    )