Prepare exit handler & Add missing arg to test connection (#10519)

* Prepare exit handler

* clean envs
This commit is contained in:
Pere Miquel Brull 2023-03-13 10:50:10 +01:00 committed by GitHub
parent 113fcc2956
commit db292eaa0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 144 additions and 5 deletions

View File

@ -0,0 +1,118 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Entrypoint to send exit handler information when a pipeline fails
"""
import logging
import os
from datetime import datetime
import yaml
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineState,
PipelineStatus,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
# Orchestrator states that we translate to a successful pipeline run.
SUCCESS_STATES = {"Succeeded"}


def main():
    """
    Exit Handler entrypoint.

    The goal of this script is to be executed as a failure callback/exit handler
    when a Workflow processing fails. There are situations where the failure
    cannot be directly controlled in the Workflow class.

    We don't want to initialize the full workflow as it might be failing
    on the `__init__` call as well. We'll manually prepare the status sending
    logic.

    In this callback we just care about:
    - instantiating the ometa client
    - getting the IngestionPipeline FQN
    - if exists, update with `Failed` status

    Expected environment variables:
    - `config` (required): str representation of the ingestion YAML, e.g.
      ```
      config = '''
      source:
        type: ...
        serviceName: ...
        serviceConnection:
          ...
        sourceConfig:
          ...
      sink:
        ...
      workflowConfig:
        ...
      '''
      ```
    - `pipelineRunId` (optional): run UUID of the IngestionPipeline
    - `pipelineStatus` (optional): final state reported by the orchestrator

    Raises:
        RuntimeError: if the `config` environment variable is missing.
    """
    config = os.getenv("config")
    if not config:
        raise RuntimeError(
            "Missing environment variable `config`. This is needed to configure the Workflow."
        )

    pipeline_run_id = os.getenv("pipelineRunId")
    raw_pipeline_status = os.getenv("pipelineStatus")

    raw_workflow_config = yaml.safe_load(config)
    # Only inject the run id when it is actually present: unconditionally
    # assigning an explicit `None` can make the model validation fail.
    # This mirrors the `if pipeline_run_id:` guard used by the ingestion
    # entrypoint in this same commit.
    if pipeline_run_id:
        raw_workflow_config["pipelineRunId"] = pipeline_run_id
    workflow_config = OpenMetadataWorkflowConfig.parse_obj(raw_workflow_config)

    metadata = OpenMetadata(
        config=workflow_config.workflowConfig.openMetadataServerConfig
    )

    # Without the FQN, run id and orchestrator status there is nothing to report.
    if workflow_config.ingestionPipelineFQN and pipeline_run_id and raw_pipeline_status:
        logging.info(
            f"Sending status to Ingestion Pipeline {workflow_config.ingestionPipelineFQN}"
        )
        pipeline_status = metadata.get_pipeline_status(
            workflow_config.ingestionPipelineFQN,
            str(workflow_config.pipelineRunId.__root__),
        )
        # Maybe the workflow was not even initialized
        if not pipeline_status:
            pipeline_status = PipelineStatus(
                runId=str(workflow_config.pipelineRunId.__root__),
                startDate=datetime.now().timestamp() * 1000,
                timestamp=datetime.now().timestamp() * 1000,
            )
        pipeline_status.endDate = datetime.now().timestamp() * 1000
        # Map the orchestrator state onto the OpenMetadata PipelineState enum
        pipeline_status.pipelineState = (
            PipelineState.failed
            if raw_pipeline_status not in SUCCESS_STATES
            else PipelineState.success
        )
        metadata.create_or_update_pipeline_status(
            workflow_config.ingestionPipelineFQN, pipeline_status
        )
    else:
        logging.info(
            "Missing ingestionPipelineFQN, pipelineRunId or pipelineStatus. We won't update the status."
        )


if __name__ == "__main__":
    main()

View File

@ -71,11 +71,24 @@ def main():
Note how we are expecting the env variables to be sent, with the `config` being the str
representation of the ingestion YAML.
We will also set the `pipelineRunId` value if it comes from the environment.
"""
# DockerOperator expects an env var called config
config = os.environ["config"]
pipeline_type = os.environ["pipelineType"]
config = os.getenv("config")
if not config:
raise RuntimeError(
"Missing environment variable `config`. This is needed to configure the Workflow."
)
pipeline_type = os.getenv("pipelineType")
if not pipeline_type:
raise RuntimeError(
"Missing environment variable `pipelineType`. This is needed to load the Workflow class."
)
pipeline_run_id = os.getenv("pipelineRunId")
workflow_class = WORKFLOW_MAP.get(pipeline_type)
if workflow_class is None:
@ -83,6 +96,9 @@ def main():
# Load the config string representation
workflow_config = yaml.safe_load(config)
if pipeline_run_id:
workflow_config["pipelineRunId"] = pipeline_run_id
workflow = workflow_class.create(workflow_config)
workflow.execute()
workflow.raise_from_status()

View File

@ -39,8 +39,13 @@ def main():
```
"""
# DockerOperator expects an env var called config
test_connection_dict = yaml.safe_load(os.environ["config"])
config = os.getenv("config")
if not config:
raise RuntimeError(
"Missing environment variable `config` with the TestServiceConnectionRequest dict."
)
test_connection_dict = yaml.safe_load(config)
test_service_connection = TestServiceConnectionRequest.parse_obj(
test_connection_dict
)
@ -54,7 +59,7 @@ def main():
test_connection_fn = get_test_connection_fn(
test_service_connection.connection.config
)
test_connection_fn(connection)
test_connection_fn(connection, test_service_connection.connection.config)
if __name__ == "__main__":