From 567a62f7dbdb1ef2be1d767b7b92a63e92cbfdd7 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Fri, 6 Jan 2023 16:08:37 +0100 Subject: [PATCH] Fix session handling for refresh job (#9636) Co-authored-by: NiharDoshi99 <51595473+NiharDoshi99@users.noreply.github.com> --- .../openmetadata_managed_apis/operations/deploy.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/operations/deploy.py b/openmetadata-airflow-apis/openmetadata_managed_apis/operations/deploy.py index efde5e630a3..61410f7e220 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/operations/deploy.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/operations/deploy.py @@ -15,6 +15,7 @@ from pathlib import Path from typing import Dict from airflow import DAG, settings +from airflow.jobs.scheduler_job import SchedulerJob from airflow.models import DagModel from jinja2 import Template from openmetadata_managed_apis.api.config import ( @@ -142,18 +143,18 @@ class DagDeployer: .first() ) logger.info("dag_model:" + str(dag_model)) - # Scheduler Job to scan dags - scan_dags_job_background() - - return ApiResponse.success( - {"message": f"Workflow [{self.dag_id}] has been created"} - ) except Exception as exc: msg = f"Workflow [{self.dag_id}] failed to refresh due to [{exc}]" logger.debug(traceback.format_exc()) logger.error(msg) return ApiResponse.server_error({f"message": msg}) + scan_dags_job_background() + + return ApiResponse.success( + {"message": f"Workflow [{self.dag_id}] has been created"} + ) + def deploy(self): """ Run all methods to deploy the DAG