mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-10-25 07:42:40 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			119 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			119 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #  Copyright 2021 Collate
 | |
| #  Licensed under the Apache License, Version 2.0 (the "License");
 | |
| #  you may not use this file except in compliance with the License.
 | |
| #  You may obtain a copy of the License at
 | |
| #  http://www.apache.org/licenses/LICENSE-2.0
 | |
| #  Unless required by applicable law or agreed to in writing, software
 | |
| #  distributed under the License is distributed on an "AS IS" BASIS,
 | |
| #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| #  See the License for the specific language governing permissions and
 | |
| #  limitations under the License.
 | |
| """
 | |
| Entrypoint to send exit handler information when a pipeline fails
 | |
| """
 | |
| import logging
 | |
| import os
 | |
| from datetime import datetime
 | |
| 
 | |
| import yaml
 | |
| 
 | |
| from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
 | |
|     PipelineState,
 | |
|     PipelineStatus,
 | |
| )
 | |
| from metadata.generated.schema.metadataIngestion.workflow import (
 | |
|     OpenMetadataWorkflowConfig,
 | |
| )
 | |
| from metadata.ingestion.ometa.ometa_api import OpenMetadata
 | |
| 
 | |
# Values of the ``pipelineStatus`` environment variable that are reported to
# OpenMetadata as a successful run; any other non-empty value is reported as a
# failure. A frozenset so the module-level constant cannot be mutated at runtime.
SUCCESS_STATES = frozenset({"Succeeded"})
 | |
| 
 | |
| 
 | |
def main():
    """
    Exit Handler entrypoint.

    The Workflow configuration is read as YAML from the ``config``
    environment variable, e.g.:

    ```
    config = '''
        source:
          type: ...
          serviceName: ...
          serviceConnection:
            ...
          sourceConfig:
            ...
        sink:
          ...
        workflowConfig:
          ...
    '''
    ```

    The goal of this script is to be executed as a failure callback/exit handler
    when a Workflow processing fails. There are situations where the failure
    cannot be directly controlled in the Workflow class.

    We don't want to initialize the full workflow as it might be failing
    on the `__init__` call as well. We'll manually prepare the status sending
    logic.

    In this callback we just care about:
    - reading the ``pipelineRunId`` and ``pipelineStatus`` environment variables
    - getting the IngestionPipeline FQN from the Workflow configuration
    - if all of them are present, instantiating the ometa client and updating
      the run's status to success when ``pipelineStatus`` is in
      ``SUCCESS_STATES``, or to failed otherwise

    Raises:
        RuntimeError: if ``config`` is unset/empty, or does not parse to a
            YAML mapping.
        Validation errors raised by ``OpenMetadataWorkflowConfig.parse_obj``
        are propagated unchanged.
    """
    config = os.getenv("config")
    if not config:
        raise RuntimeError(
            "Missing environment variable `config`. This is needed to configure the Workflow."
        )

    pipeline_run_id = os.getenv("pipelineRunId")
    raw_pipeline_status = os.getenv("pipelineStatus")

    raw_workflow_config = yaml.safe_load(config)
    # safe_load may return a scalar/list/None for malformed input; fail with a
    # clear message instead of a TypeError on the item assignment below.
    if not isinstance(raw_workflow_config, dict):
        raise RuntimeError(
            "Environment variable `config` must contain a YAML mapping with the Workflow configuration."
        )
    raw_workflow_config["pipelineRunId"] = pipeline_run_id

    workflow_config = OpenMetadataWorkflowConfig.parse_obj(raw_workflow_config)
    pipeline_fqn = workflow_config.ingestionPipelineFQN

    if not (pipeline_fqn and pipeline_run_id and raw_pipeline_status):
        logging.info(
            "Missing ingestionPipelineFQN, pipelineRunId or pipelineStatus. We won't update the status."
        )
        return

    # The client is only needed to send the status, so it is created after the
    # early return above rather than unconditionally.
    metadata = OpenMetadata(
        config=workflow_config.workflowConfig.openMetadataServerConfig
    )

    logging.info("Sending status to Ingestion Pipeline %s", pipeline_fqn)

    run_id = str(workflow_config.pipelineRunId.__root__)
    # Single epoch-milliseconds reading so start/timestamp/end agree.
    now_ms = datetime.now().timestamp() * 1000

    pipeline_status = metadata.get_pipeline_status(pipeline_fqn, run_id)

    # Maybe the workflow was not even initialized
    if not pipeline_status:
        pipeline_status = PipelineStatus(
            runId=run_id,
            startDate=now_ms,
            timestamp=now_ms,
        )

    pipeline_status.endDate = now_ms
    pipeline_status.pipelineState = (
        PipelineState.success
        if raw_pipeline_status in SUCCESS_STATES
        else PipelineState.failed
    )

    metadata.create_or_update_pipeline_status(pipeline_fqn, pipeline_status)
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
    # Run the exit handler when this module is executed as a script.
    main()
 | 
