def _now_millis():
    """Return the current epoch time as a whole number of milliseconds."""
    # `timestamp()` is a float in seconds; truncate after scaling so the
    # value carries no sub-millisecond digits.
    return int(datetime.now().timestamp() * 1000)


def main():
    """
    Exit Handler entrypoint

    Environment variables read:
      - `config` (required): str representation of the ingestion YAML, e.g.

        ```
        config = '''
        source:
          type: ...
          serviceName: ...
          serviceConnection:
            ...
          sourceConfig:
            ...
        sink:
          ...
        workflowConfig:
          ...
        '''
        ```

      - `pipelineRunId` (optional): run ID to report the status against.
      - `pipelineStatus` (optional): raw final state of the run. Anything not
        listed in `SUCCESS_STATES` is reported as failed.

    The goal of this script is to be executed as a failure callback/exit handler
    when a Workflow processing fails. There are situations where the failure
    cannot be directly controlled in the Workflow class.

    We don't want to initialize the full workflow as it might be failing
    on the `__init__` call as well. We'll manually prepare the status sending
    logic.

    In this callback we just care about:
      - instantiating the ometa client
      - getting the IngestionPipeline FQN
      - if exists, update it with the final (`Failed` or `Success`) status

    Raises:
        RuntimeError: if the `config` environment variable is missing or empty.
    """
    config = os.getenv("config")
    if not config:
        raise RuntimeError(
            "Missing environment variable `config`. This is needed to configure the Workflow."
        )

    pipeline_run_id = os.getenv("pipelineRunId")
    raw_pipeline_status = os.getenv("pipelineStatus")

    raw_workflow_config = yaml.safe_load(config)
    # Only inject the run ID when the environment provides one, so a missing
    # env var never overwrites the parsed config with an explicit None.
    if pipeline_run_id:
        raw_workflow_config["pipelineRunId"] = pipeline_run_id

    workflow_config = OpenMetadataWorkflowConfig.parse_obj(raw_workflow_config)
    metadata = OpenMetadata(
        config=workflow_config.workflowConfig.openMetadataServerConfig
    )

    if workflow_config.ingestionPipelineFQN and pipeline_run_id and raw_pipeline_status:
        logging.info(
            "Sending status to Ingestion Pipeline %s",
            workflow_config.ingestionPipelineFQN,
        )

        run_id = str(workflow_config.pipelineRunId.__root__)
        # Sample the clock once so every date written below is consistent.
        now_millis = _now_millis()

        pipeline_status = metadata.get_pipeline_status(
            workflow_config.ingestionPipelineFQN,
            run_id,
        )

        # Maybe the workflow was not even initialized
        if not pipeline_status:
            pipeline_status = PipelineStatus(
                runId=run_id,
                startDate=now_millis,
                timestamp=now_millis,
            )

        # NOTE(review): attribute assignment is presumably not re-validated by
        # the model, so an int is assigned explicitly here - confirm.
        pipeline_status.endDate = now_millis
        pipeline_status.pipelineState = (
            PipelineState.success
            if raw_pipeline_status in SUCCESS_STATES
            else PipelineState.failed
        )

        metadata.create_or_update_pipeline_status(
            workflow_config.ingestionPipelineFQN, pipeline_status
        )

    else:
        logging.info(
            "Missing ingestionPipelineFQN, pipelineRunId or pipelineStatus. We won't update the status."
        )
+ ) + + pipeline_run_id = os.getenv("pipelineRunId") workflow_class = WORKFLOW_MAP.get(pipeline_type) if workflow_class is None: @@ -83,6 +96,9 @@ def main(): # Load the config string representation workflow_config = yaml.safe_load(config) + if pipeline_run_id: + workflow_config["pipelineRunId"] = pipeline_run_id + workflow = workflow_class.create(workflow_config) workflow.execute() workflow.raise_from_status() diff --git a/ingestion/operators/docker/test_connection.py b/ingestion/operators/docker/test_connection.py index 1d06aac7768..b8ba9955415 100644 --- a/ingestion/operators/docker/test_connection.py +++ b/ingestion/operators/docker/test_connection.py @@ -39,8 +39,13 @@ def main(): ``` """ - # DockerOperator expects an env var called config - test_connection_dict = yaml.safe_load(os.environ["config"]) + config = os.getenv("config") + if not config: + raise RuntimeError( + "Missing environment variable `config` with the TestServiceConnectionRequest dict." + ) + + test_connection_dict = yaml.safe_load(config) test_service_connection = TestServiceConnectionRequest.parse_obj( test_connection_dict ) @@ -54,7 +59,7 @@ def main(): test_connection_fn = get_test_connection_fn( test_service_connection.connection.config ) - test_connection_fn(connection) + test_connection_fn(connection, test_service_connection.connection.config) if __name__ == "__main__":