mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-11-04 12:36:23 +00:00 
			
		
		
		
	
		
			
	
	
		
			73 lines
		
	
	
		
			2.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			73 lines
		
	
	
		
			2.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| 
								 | 
							
								#  Copyright 2022 Collate
							 | 
						||
| 
								 | 
							
								#  Licensed under the Apache License, Version 2.0 (the "License");
							 | 
						||
| 
								 | 
							
								#  you may not use this file except in compliance with the License.
							 | 
						||
| 
								 | 
							
								#  You may obtain a copy of the License at
							 | 
						||
| 
								 | 
							
								#  http://www.apache.org/licenses/LICENSE-2.0
							 | 
						||
| 
								 | 
							
								#  Unless required by applicable law or agreed to in writing, software
							 | 
						||
| 
								 | 
							
								#  distributed under the License is distributed on an "AS IS" BASIS,
							 | 
						||
| 
								 | 
							
								#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
							 | 
						||
| 
								 | 
							
								#  See the License for the specific language governing permissions and
							 | 
						||
| 
								 | 
							
								#  limitations under the License.
							 | 
						||
| 
								 | 
							
								"""
							 | 
						||
| 
								 | 
							
								Metadata DAG function builder
							 | 
						||
| 
								 | 
							
								"""
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								from airflow import DAG
							 | 
						||
| 
								 | 
							
								from openmetadata_managed_apis.workflows.ingestion.common import (
							 | 
						||
| 
								 | 
							
								    build_dag,
							 | 
						||
| 
								 | 
							
								    build_source,
							 | 
						||
| 
								 | 
							
								    build_workflow_config_property,
							 | 
						||
| 
								 | 
							
								    metadata_ingestion_workflow,
							 | 
						||
| 
								 | 
							
								)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								try:
							 | 
						||
| 
								 | 
							
								    from airflow.operators.python import PythonOperator
							 | 
						||
| 
								 | 
							
								except ModuleNotFoundError:
							 | 
						||
| 
								 | 
							
								    from airflow.operators.python_operator import PythonOperator
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
							 | 
						||
| 
								 | 
							
								    IngestionPipeline,
							 | 
						||
| 
								 | 
							
								)
							 | 
						||
| 
								 | 
							
								from metadata.generated.schema.metadataIngestion.workflow import (
							 | 
						||
| 
								 | 
							
								    OpenMetadataWorkflowConfig,
							 | 
						||
| 
								 | 
							
								    Sink,
							 | 
						||
| 
								 | 
							
								)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def build_dbt_workflow_config(
							 | 
						||
| 
								 | 
							
								    ingestion_pipeline: IngestionPipeline,
							 | 
						||
| 
								 | 
							
								) -> OpenMetadataWorkflowConfig:
							 | 
						||
| 
								 | 
							
								    """
							 | 
						||
| 
								 | 
							
								    Given an airflow_pipeline, prepare the workflow config JSON
							 | 
						||
| 
								 | 
							
								    """
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    source = build_source(ingestion_pipeline)
							 | 
						||
| 
								 | 
							
								    source.type = f"dbt"  # Mark the source as dbt
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    workflow_config = OpenMetadataWorkflowConfig(
							 | 
						||
| 
								 | 
							
								        source=source,
							 | 
						||
| 
								 | 
							
								        sink=Sink(
							 | 
						||
| 
								 | 
							
								            type="metadata-rest",
							 | 
						||
| 
								 | 
							
								            config={},
							 | 
						||
| 
								 | 
							
								        ),
							 | 
						||
| 
								 | 
							
								        workflowConfig=build_workflow_config_property(ingestion_pipeline),
							 | 
						||
| 
								 | 
							
								        ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.__root__,
							 | 
						||
| 
								 | 
							
								    )
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    return workflow_config
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def build_dbt_dag(ingestion_pipeline: IngestionPipeline) -> DAG:
							 | 
						||
| 
								 | 
							
								    """
							 | 
						||
| 
								 | 
							
								    Build a simple metadata workflow DAG
							 | 
						||
| 
								 | 
							
								    """
							 | 
						||
| 
								 | 
							
								    workflow_config = build_dbt_workflow_config(ingestion_pipeline)
							 | 
						||
| 
								 | 
							
								    dag = build_dag(
							 | 
						||
| 
								 | 
							
								        task_name="dbt_task",
							 | 
						||
| 
								 | 
							
								        ingestion_pipeline=ingestion_pipeline,
							 | 
						||
| 
								 | 
							
								        workflow_config=workflow_config,
							 | 
						||
| 
								 | 
							
								        workflow_fn=metadata_ingestion_workflow,
							 | 
						||
| 
								 | 
							
								    )
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    return dag
							 |