From bdbbca0efe65d7f492fdf7dcbedca5774e0cbd3f Mon Sep 17 00:00:00 2001 From: Nahuel Date: Fri, 26 Aug 2022 07:29:38 +0200 Subject: [PATCH] Fix#6027: Improve logging in OpenMetadata Airflow APIs (#6920) Fix#6027: Improve logging in OpenMetadata Airflow APIs (#6920) --- .../api/error_handlers.py | 21 +++++---- .../api/routes/delete.py | 11 +++-- .../api/routes/deploy.py | 15 ++++-- .../api/routes/disable.py | 9 ++-- .../api/routes/enable.py | 9 ++-- .../api/routes/health.py | 11 ++++- .../api/routes/ip.py | 10 +++- .../api/routes/kill.py | 9 ++-- .../api/routes/last_dag_logs.py | 9 ++-- .../api/routes/status.py | 9 ++-- .../api/routes/test_connection.py | 15 ++++-- .../api/routes/trigger.py | 9 ++-- .../openmetadata_managed_apis/api/utils.py | 10 ++-- .../operations/deploy.py | 32 ++++++------- .../operations/kill_all.py | 2 +- .../operations/test_connection.py | 12 ++++- .../utils/__init__.py | 0 .../openmetadata_managed_apis/utils/logger.py | 46 +++++++++++++++++++ .../workflows/ingestion/common.py | 19 +++++++- .../workflows/workflow_builder.py | 17 ++++--- .../workflows/workflow_factory.py | 16 ++++--- 21 files changed, 210 insertions(+), 81 deletions(-) create mode 100644 openmetadata-airflow-apis/openmetadata_managed_apis/utils/__init__.py create mode 100644 openmetadata-airflow-apis/openmetadata_managed_apis/utils/logger.py diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/error_handlers.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/error_handlers.py index 7765da1a985..d5ba236270c 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/api/error_handlers.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/error_handlers.py @@ -12,23 +12,24 @@ Register error handlers """ -import logging - from openmetadata_managed_apis.api.app import blueprint from openmetadata_managed_apis.api.response import ApiResponse from openmetadata_managed_apis.api.utils import MissingArgException +from 
openmetadata_managed_apis.utils.logger import api_logger from werkzeug.exceptions import HTTPException +logger = api_logger() + @blueprint.app_errorhandler(Exception) -def handle_any_error(e): - logging.exception("Wild exception") - if isinstance(e, HTTPException): - return ApiResponse.error(e.code, repr(e)) - return ApiResponse.server_error(repr(e)) +def handle_any_error(exc): + logger.exception(f"Wild exception: {exc}") + if isinstance(exc, HTTPException): + return ApiResponse.error(exc.code, repr(exc)) + return ApiResponse.server_error(repr(exc)) @blueprint.app_errorhandler(MissingArgException) -def handle_missing_arg(e): - logging.exception("Missing Argument Exception") - return ApiResponse.bad_request(repr(e)) +def handle_missing_arg(exc): + logger.exception(f"Missing Argument Exception: {exc}") + return ApiResponse.bad_request(repr(exc)) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/delete.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/delete.py index fdadc7619b0..44529fd4b83 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/delete.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/delete.py @@ -11,7 +11,6 @@ """ Delete the DAG in Airflow's db, as well as the python file """ -import logging import traceback from airflow.api_connexion import security @@ -22,8 +21,11 @@ from openmetadata_managed_apis.api.app import blueprint from openmetadata_managed_apis.api.response import ApiResponse from openmetadata_managed_apis.api.utils import get_arg_dag_id from openmetadata_managed_apis.operations.delete import delete_dag_id +from openmetadata_managed_apis.utils.logger import routes_logger from werkzeug.utils import secure_filename +logger = routes_logger() + @blueprint.route("/delete", methods=["DELETE"]) @csrf.exempt @@ -45,8 +47,11 @@ def delete_dag() -> Response: return delete_dag_id(secure_dag_id) except Exception as exc: - logging.info(f"Failed to delete dag {dag_id} 
[secured: {secure_dag_id}]") + logger.debug(traceback.format_exc()) + logger.error( + f"Failed to delete dag [{dag_id}] [secured: {secure_dag_id}]: {exc}" + ) return ApiResponse.error( status=ApiResponse.STATUS_SERVER_ERROR, - error=f"Failed to delete {dag_id} [secured: {secure_dag_id}] due to {exc} - {traceback.format_exc()}", + error=f"Failed to delete [{dag_id}] [secured: {secure_dag_id}] due to [{exc}] ", ) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/deploy.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/deploy.py index fe3c1d5742f..f1aee8e1754 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/deploy.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/deploy.py @@ -20,12 +20,15 @@ from flask import Response, request from openmetadata_managed_apis.api.app import blueprint from openmetadata_managed_apis.api.response import ApiResponse from openmetadata_managed_apis.operations.deploy import DagDeployer +from openmetadata_managed_apis.utils.logger import routes_logger from pydantic import ValidationError from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( IngestionPipeline, ) +logger = routes_logger() + @blueprint.route("/deploy", methods=["POST"]) @csrf.exempt @@ -48,13 +51,19 @@ def deploy_dag() -> Response: return response except ValidationError as err: + logger.debug(traceback.format_exc()) + logger.error( + f"Request Validation Error parsing payload [{json_request}]. IngestionPipeline expected: {err}" + ) return ApiResponse.error( status=ApiResponse.STATUS_BAD_REQUEST, - error=f"Request Validation Error parsing payload {json_request}. IngestionPipeline expected - {err}", + error=f"Request Validation Error parsing payload. 
IngestionPipeline expected: {err}", ) - except Exception as err: + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.error(f"Internal error deploying [{json_request}] due to [{exc}] ") return ApiResponse.error( status=ApiResponse.STATUS_SERVER_ERROR, - error=f"Internal error deploying {json_request} - {err} - {traceback.format_exc()}", + error=f"Internal error while deploying due to [{exc}] ", ) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/disable.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/disable.py index a42f53c9b48..61ec3c1eba3 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/disable.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/disable.py @@ -11,7 +11,6 @@ """ Disable/Pause a dag """ -import logging import traceback from airflow.api_connexion import security @@ -22,6 +21,9 @@ from openmetadata_managed_apis.api.app import blueprint from openmetadata_managed_apis.api.response import ApiResponse from openmetadata_managed_apis.api.utils import get_request_dag_id from openmetadata_managed_apis.operations.state import disable_dag +from openmetadata_managed_apis.utils.logger import routes_logger + +logger = routes_logger() @blueprint.route("/disable", methods=["POST"]) @@ -37,8 +39,9 @@ def disable() -> Response: return disable_dag(dag_id) except Exception as exc: - logging.info(f"Failed to get last run logs for '{dag_id}'") + logger.debug(traceback.format_exc()) + logger.error(f"Failed to get last run logs for [{dag_id}]: {exc}") return ApiResponse.error( status=ApiResponse.STATUS_SERVER_ERROR, - error=f"Failed to get last run logs for '{dag_id}' due to {exc} - {traceback.format_exc()}", + error=f"Failed to get last run logs for [{dag_id}] due to {exc} ", ) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/enable.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/enable.py index 
d723a5e5d21..c3ece0b418b 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/enable.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/enable.py @@ -11,7 +11,6 @@ """ Enable/unpause a DAG """ -import logging import traceback from airflow.api_connexion import security @@ -22,6 +21,9 @@ from openmetadata_managed_apis.api.app import blueprint from openmetadata_managed_apis.api.response import ApiResponse from openmetadata_managed_apis.api.utils import get_request_dag_id from openmetadata_managed_apis.operations.state import enable_dag +from openmetadata_managed_apis.utils.logger import routes_logger + +logger = routes_logger() @blueprint.route("/enable", methods=["POST"]) @@ -37,8 +39,9 @@ def enable() -> Response: return enable_dag(dag_id) except Exception as exc: - logging.info(f"Failed to get last run logs for '{dag_id}'") + logger.debug(traceback.format_exc()) + logger.error(f"Failed to get last run logs for [{dag_id}]: {exc}") return ApiResponse.error( status=ApiResponse.STATUS_SERVER_ERROR, - error=f"Failed to get last run logs for '{dag_id}' due to {exc} - {traceback.format_exc()}", + error=f"Failed to get last run logs for [{dag_id}] due to {exc} ", ) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/health.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/health.py index 0ef32f3a4d7..010a956ff75 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/health.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/health.py @@ -13,6 +13,8 @@ Health endpoint. 
Globally accessible """ import traceback +from openmetadata_managed_apis.utils.logger import routes_logger + try: from importlib.metadata import version except ImportError: @@ -22,6 +24,8 @@ from airflow.www.app import csrf from openmetadata_managed_apis.api.app import blueprint from openmetadata_managed_apis.api.response import ApiResponse +logger = routes_logger() + @blueprint.route("/health", methods=["GET"]) @csrf.exempt @@ -34,8 +38,11 @@ def health(): return ApiResponse.success( {"status": "healthy", "version": version("openmetadata-ingestion")} ) - except Exception as err: + except Exception as exc: + msg = f"Internal error obtaining REST status due to [{exc}] " + logger.debug(traceback.format_exc()) + logger.error(msg) return ApiResponse.error( status=ApiResponse.STATUS_SERVER_ERROR, - error=f"Internal error obtaining REST status - {err} - {traceback.format_exc()}", + error=msg, ) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/ip.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/ip.py index 1fce7ba5012..8a09b924004 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/ip.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/ip.py @@ -14,6 +14,7 @@ IP endpoint import traceback import requests +from openmetadata_managed_apis.utils.logger import routes_logger try: from importlib.metadata import version @@ -26,6 +27,8 @@ from airflow.www.app import csrf from openmetadata_managed_apis.api.app import blueprint from openmetadata_managed_apis.api.response import ApiResponse +logger = routes_logger() + @blueprint.route("/ip", methods=["GET"]) @csrf.exempt @@ -38,8 +41,11 @@ def get_host_ip(): try: return ApiResponse.success({"ip": requests.get("https://api.ipify.org").text}) - except Exception as err: + except Exception as exc: + msg = f"Internal error obtaining host IP due to [{exc}] " + logger.debug(traceback.format_exc()) + logger.error(msg) return ApiResponse.error( 
status=ApiResponse.STATUS_SERVER_ERROR, - error=f"Internal error obtaining host IP - {err} - {traceback.format_exc()}", + error=msg, ) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/kill.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/kill.py index 031f50c14a6..f674a0d1c13 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/kill.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/kill.py @@ -11,7 +11,6 @@ """ Kill all not finished runs """ -import logging import traceback from airflow.api_connexion import security @@ -22,6 +21,9 @@ from openmetadata_managed_apis.api.app import blueprint from openmetadata_managed_apis.api.response import ApiResponse from openmetadata_managed_apis.api.utils import get_request_dag_id from openmetadata_managed_apis.operations.kill_all import kill_all +from openmetadata_managed_apis.utils.logger import routes_logger + +logger = routes_logger() @blueprint.route("/kill", methods=["POST"]) @@ -38,8 +40,9 @@ def kill() -> Response: return kill_all(dag_id) except Exception as exc: - logging.info(f"Failed to get kill runs for '{dag_id}'") + logger.debug(traceback.format_exc()) + logger.error(f"Failed to get kill runs for [{dag_id}]: {exc}") return ApiResponse.error( status=ApiResponse.STATUS_SERVER_ERROR, - error=f"Failed to kill runs for '{dag_id}' due to {exc} - {traceback.format_exc()}", + error=f"Failed to kill runs for [{dag_id}] due to [{exc}] ", ) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/last_dag_logs.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/last_dag_logs.py index 287031b1559..7525c27213d 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/last_dag_logs.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/last_dag_logs.py @@ -11,7 +11,6 @@ """ Return the last DagRun logs for each task """ -import logging import traceback from 
airflow.api_connexion import security @@ -22,6 +21,9 @@ from openmetadata_managed_apis.api.app import blueprint from openmetadata_managed_apis.api.response import ApiResponse from openmetadata_managed_apis.api.utils import get_arg_dag_id from openmetadata_managed_apis.operations.last_dag_logs import last_dag_logs +from openmetadata_managed_apis.utils.logger import routes_logger + +logger = routes_logger() @blueprint.route("/last_dag_logs", methods=["GET"]) @@ -38,8 +40,9 @@ def last_logs() -> Response: return last_dag_logs(dag_id) except Exception as exc: - logging.info(f"Failed to get last run logs for '{dag_id}'") + logger.debug(traceback.format_exc()) + logger.error(f"Failed to get last run logs for [{dag_id}]: {exc}") return ApiResponse.error( status=ApiResponse.STATUS_SERVER_ERROR, - error=f"Failed to get last run logs for '{dag_id}' due to {exc} - {traceback.format_exc()}", + error=f"Failed to get last run logs for [{dag_id}] due to [{exc}] ", ) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/status.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/status.py index dbabcc323e2..4a5c93657ef 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/status.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/status.py @@ -11,7 +11,6 @@ """ Return a list of the 10 last status for the ingestion Pipeline """ -import logging import traceback from airflow.api_connexion import security @@ -22,6 +21,9 @@ from openmetadata_managed_apis.api.app import blueprint from openmetadata_managed_apis.api.response import ApiResponse from openmetadata_managed_apis.api.utils import get_arg_dag_id from openmetadata_managed_apis.operations.status import status +from openmetadata_managed_apis.utils.logger import routes_logger + +logger = routes_logger() @blueprint.route("/status", methods=["GET"]) @@ -37,8 +39,9 @@ def dag_status() -> Response: return status(dag_id) except Exception as exc: - 
logging.info(f"Failed to get dag {dag_id} status") + logger.debug(traceback.format_exc()) + logger.error(f"Failed to get dag [{dag_id}] status: {exc}") return ApiResponse.error( status=ApiResponse.STATUS_SERVER_ERROR, - error=f"Failed to get status for {dag_id} due to {exc} - {traceback.format_exc()}", + error=f"Failed to get status for [{dag_id}] due to [{exc}] ", ) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/test_connection.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/test_connection.py index 39ebe815aca..095de7c959b 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/test_connection.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/test_connection.py @@ -20,10 +20,13 @@ from flask import Response, request from openmetadata_managed_apis.api.app import blueprint from openmetadata_managed_apis.api.response import ApiResponse from openmetadata_managed_apis.operations.test_connection import test_source_connection +from openmetadata_managed_apis.utils.logger import routes_logger from pydantic import ValidationError from metadata.ingestion.api.parser import parse_test_connection_request_gracefully +logger = routes_logger() + @blueprint.route("/test_connection", methods=["POST"]) @csrf.exempt @@ -44,13 +47,19 @@ def test_connection() -> Response: return response except ValidationError as err: + msg = f"Request Validation Error parsing payload. (Workflow)Source expected: {err}" + logger.debug(traceback.format_exc()) + logger.error(msg) return ApiResponse.error( status=ApiResponse.STATUS_BAD_REQUEST, - error=f"Request Validation Error parsing payload. 
(Workflow)Source expected - {err}", + error=msg, ) - except Exception as err: + except Exception as exc: + msg = f"Internal error testing connection due to [{exc}] " + logger.debug(traceback.format_exc()) + logger.error(msg) return ApiResponse.error( status=ApiResponse.STATUS_SERVER_ERROR, - error=f"Internal error testing connection {err} - {traceback.format_exc()}", + error=msg, ) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/trigger.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/trigger.py index 31836a4961e..9c6c273a3e8 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/trigger.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/routes/trigger.py @@ -11,7 +11,6 @@ """ Trigger endpoint """ -import logging import traceback from airflow.api_connexion import security @@ -22,6 +21,9 @@ from openmetadata_managed_apis.api.app import blueprint from openmetadata_managed_apis.api.response import ApiResponse from openmetadata_managed_apis.api.utils import get_request_arg, get_request_dag_id from openmetadata_managed_apis.operations.trigger import trigger +from openmetadata_managed_apis.utils.logger import routes_logger + +logger = routes_logger() @blueprint.route("/trigger", methods=["POST"]) @@ -40,8 +42,9 @@ def trigger_dag() -> Response: return response except Exception as exc: - logging.info(f"Failed to trigger dag {dag_id}") + logger.debug(traceback.format_exc()) + logger.error(f"Failed to trigger dag [{dag_id}]: {exc}") return ApiResponse.error( status=ApiResponse.STATUS_SERVER_ERROR, - error=f"Workflow {dag_id} has filed to trigger due to {exc} - {traceback.format_exc()}", + error=f"Workflow [{dag_id}] has failed to trigger due to [{exc}] ", ) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/utils.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/utils.py index a0c25eabae4..4086c6921da 100644 --- 
a/openmetadata-airflow-apis/openmetadata_managed_apis/api/utils.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/utils.py @@ -10,10 +10,10 @@ # limitations under the License. import importlib -import logging import os import re import sys +import traceback from multiprocessing import Process from typing import Optional @@ -21,6 +21,9 @@ from airflow import settings from airflow.jobs.scheduler_job import SchedulerJob from airflow.models import DagBag from flask import request +from openmetadata_managed_apis.utils.logger import api_logger + +logger = api_logger() class MissingArgException(Exception): @@ -105,8 +108,9 @@ class ScanDagsTask(Process): scheduler_job.run() try: scheduler_job.kill() - except Exception: - logging.info("Rescan Complete: Killed Job") + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.info(f"Rescan Complete: Killed Job: {exc}") def scan_dags_job_background(): diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/operations/deploy.py b/openmetadata-airflow-apis/openmetadata_managed_apis/operations/deploy.py index b199b54e048..a41613c62cf 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/operations/deploy.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/operations/deploy.py @@ -9,7 +9,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import logging import pkgutil import traceback from pathlib import Path @@ -30,12 +29,15 @@ from openmetadata_managed_apis.api.utils import ( import_path, scan_dags_job_background, ) +from openmetadata_managed_apis.utils.logger import operations_logger from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( IngestionPipeline, ) from metadata.ingestion.models.encoders import show_secrets_encoder +logger = operations_logger() + class DeployDagException(Exception): """ @@ -51,7 +53,7 @@ class DagDeployer: def __init__(self, ingestion_pipeline: IngestionPipeline): - logging.info( + logger.info( f"Received the following Airflow Configuration: {ingestion_pipeline.airflowConfig}" ) @@ -66,7 +68,7 @@ class DagDeployer: return the path for the Jinja rendering. """ - logging.info(f"Saving file to {dag_config_file_path}") + logger.info(f"Saving file to {dag_config_file_path}") with open(dag_config_file_path, "w") as outfile: outfile.write(self.ingestion_pipeline.json(encoder=show_secrets_encoder)) @@ -96,11 +98,12 @@ class DagDeployer: try: dag_file = import_path(str(dag_py_file)) except Exception as exc: - logging.error(f"Failed to import dag_file {dag_py_file} due to {exc}") + logger.debug(traceback.format_exc()) + logger.error(f"Failed to import dag_file [{dag_py_file}]: {exc}") raise exc if dag_file is None: - raise DeployDagException(f"Failed to import dag_file {dag_py_file}") + raise DeployDagException(f"Failed to import dag_file [{dag_py_file}]") return str(dag_py_file) @@ -117,9 +120,9 @@ class DagDeployer: with settings.Session() as session: try: dag_bag = get_dagbag() - logging.info("dagbag size {}".format(dag_bag.size())) + logger.info("dagbag size {}".format(dag_bag.size())) found_dags = dag_bag.process_file(dag_py_file) - logging.info("processed dags {}".format(found_dags)) + logger.info("processed dags {}".format(found_dags)) dag: DAG = dag_bag.get_dag(self.dag_id, session=session) # Sync to DB 
dag.sync_to_db(session=session) @@ -128,7 +131,7 @@ .filter(DagModel.dag_id == self.dag_id) .first() ) - logging.info("dag_model:" + str(dag_model)) + logger.info("dag_model:" + str(dag_model)) # Scheduler Job to scan dags scan_dags_job_background() @@ -136,20 +139,17 @@ {"message": f"Workflow [{self.dag_id}] has been created"} ) except Exception as exc: - logging.info(f"Failed to serialize the dag {exc}") - return ApiResponse.server_error( - { - "message": f"Workflow [{self.dag_id}] failed to refresh due to [{exc}] " - + f"- {traceback.format_exc()}" - } - ) + msg = f"Workflow [{self.dag_id}] failed to refresh due to [{exc}]" + logger.debug(traceback.format_exc()) + logger.error(msg) + return ApiResponse.server_error({"message": msg}) def deploy(self): """ Run all methods to deploy the DAG """ dag_config_file_path = Path(DAG_GENERATED_CONFIGS) / f"{self.dag_id}.json" + logger.info(f"Config file under {dag_config_file_path}") - logging.info(f"Config file under {dag_config_file_path}") dag_runner_config = self.store_airflow_pipeline_config(dag_config_file_path) dag_py_file = self.store_and_validate_dag_file(dag_runner_config) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/operations/kill_all.py b/openmetadata-airflow-apis/openmetadata_managed_apis/operations/kill_all.py index e1edbe98309..e1608423836 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/operations/kill_all.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/operations/kill_all.py @@ -17,7 +17,7 @@ from airflow import settings from airflow.models import DagModel, DagRun, TaskInstance from airflow.utils.state import DagRunState, TaskInstanceState from flask import Response -from openmetadata_managed_apis.api.response import ApiResponse, ResponseFormat +from openmetadata_managed_apis.api.response import ApiResponse def kill_all(dag_id: str) -> Response: diff --git 
a/openmetadata-airflow-apis/openmetadata_managed_apis/operations/test_connection.py b/openmetadata-airflow-apis/openmetadata_managed_apis/operations/test_connection.py index 739ae3ecec8..96a65b50770 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/operations/test_connection.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/operations/test_connection.py @@ -12,8 +12,11 @@ Module containing the logic to test a connection from a WorkflowSource """ +import traceback + from flask import Response from openmetadata_managed_apis.api.response import ApiResponse +from openmetadata_managed_apis.utils.logger import operations_logger from openmetadata_managed_apis.workflows.ingestion.credentials_builder import ( build_secrets_manager_credentials, ) @@ -28,6 +31,8 @@ from metadata.utils.connections import ( ) from metadata.utils.secrets.secrets_manager_factory import get_secrets_manager +logger = operations_logger() + def test_source_connection( test_service_connection: TestServiceConnectionRequest, @@ -55,10 +60,13 @@ def test_source_connection( try: test_connection(connection) - except SourceConnectionException as err: + except SourceConnectionException as exc: + msg = f"Connection error from [{connection}]: {exc}" + logger.debug(traceback.format_exc()) + logger.error(msg) return ApiResponse.error( status=ApiResponse.STATUS_SERVER_ERROR, - error=f"Connection error from {connection} - {err}", + error=msg, ) return ApiResponse.success({"message": f"Connection with {connection} successful!"}) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/utils/__init__.py b/openmetadata-airflow-apis/openmetadata_managed_apis/utils/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/utils/logger.py b/openmetadata-airflow-apis/openmetadata_managed_apis/utils/logger.py new file mode 100644 index 00000000000..429a28d9dc1 --- /dev/null +++ 
b/openmetadata-airflow-apis/openmetadata_managed_apis/utils/logger.py @@ -0,0 +1,46 @@ +import logging +from enum import Enum +from logging.handlers import RotatingFileHandler + +from airflow.configuration import conf + +BASE_LOGGING_FORMAT = ( + "[%(asctime)s] %(levelname)-8s {%(name)s:%(module)s:%(lineno)d} - %(message)s" +) + + +class Loggers(Enum): + API_ROUTES = "AirflowAPIRoutes" + API = "AirflowAPI" + OPERATIONS = "AirflowOperations" + WORKFLOW = "AirflowWorkflow" + + +def build_logger(logger_name: str) -> logging.Logger: + logger = logging.getLogger(logger_name) + log_format = logging.Formatter(BASE_LOGGING_FORMAT) + rotating_log_handler = RotatingFileHandler( + f"{conf.get('logging', 'base_log_folder', fallback='')}/openmetadata_airflow_api.log", + maxBytes=1000000, + backupCount=10, + ) + rotating_log_handler.setFormatter(log_format) + logger.addHandler(rotating_log_handler) + logger.setLevel(logging.DEBUG) + return logger + + +def routes_logger() -> logging.Logger: + return build_logger(Loggers.API_ROUTES.value) + + +def api_logger(): + return build_logger(Loggers.API.value) + + +def operations_logger(): + return build_logger(Loggers.OPERATIONS.value) + + +def workflow_logger(): + return build_logger(Loggers.WORKFLOW.value) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py index 22873e92674..35bfcfdb037 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py @@ -54,6 +54,18 @@ from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig from metadata.ingestion.api.workflow import Workflow +class InvalidServiceException(Exception): + """ + Exception to be thrown when couldn't fetch the service from server + """ + + +class ClientInitializationError(Exception): + """ + Exception to be 
thrown when couldn't initialize the Openmetadata Client + """ + + def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource: """ Use the service EntityReference to build the Source. @@ -69,7 +81,10 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource: build_secrets_manager_credentials(secrets_manager) ) - metadata = OpenMetadata(config=ingestion_pipeline.openMetadataServerConnection) + try: + metadata = OpenMetadata(config=ingestion_pipeline.openMetadataServerConnection) + except Exception as exc: + raise ClientInitializationError(f"Failed to initialize the client: {exc}") service_type = ingestion_pipeline.service.type service: Optional[ @@ -105,7 +120,7 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource: ) if not service: - raise ValueError(f"Could not get service from type {service_type}") + raise InvalidServiceException(f"Could not get service from type {service_type}") return WorkflowSource( type=service.serviceType.value.lower(), diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/workflow_builder.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/workflow_builder.py index 6efb7671bdf..180a049298f 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/workflow_builder.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/workflow_builder.py @@ -9,18 +9,17 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import logging - from airflow import DAG # these are params only used in the DAG factory, not in the tasks +from openmetadata_managed_apis.utils.logger import workflow_logger from openmetadata_managed_apis.workflows.ingestion.registry import build_registry from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( IngestionPipeline, ) -logger = logging.getLogger(__name__) +logger = workflow_logger() class WorkflowBuilder: @@ -44,14 +43,14 @@ class WorkflowBuilder: build_fn = build_registry.registry.get(dag_type) if not build_fn: - raise ValueError( - f"Cannot find build function for {dag_type} in {build_registry.registry}" - ) + msg = f"Cannot find build function for {dag_type} in {build_registry.registry}" + logger.error(msg) + raise ValueError(msg) dag = build_fn(self.airflow_pipeline) if not isinstance(dag, DAG): - raise ValueError( - f"Invalid return type from {build_fn.__name__} when building {dag_type}." - ) + msg = f"Invalid return type from {build_fn.__name__} when building {dag_type}." + logger.error(msg) + raise ValueError(msg) return dag diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/workflow_factory.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/workflow_factory.py index 807e9d812df..fbfead48724 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/workflow_factory.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/workflow_factory.py @@ -14,13 +14,14 @@ based on incoming configs. 
Called in dag_runner.j2 """ -import logging import pathlib +import traceback from typing import Any, Dict from airflow.models import DAG # these are params that cannot be a dag name +from openmetadata_managed_apis.utils.logger import workflow_logger from openmetadata_managed_apis.workflows.config import load_config_file from openmetadata_managed_apis.workflows.workflow_builder import WorkflowBuilder @@ -28,7 +29,7 @@ from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipel IngestionPipeline, ) -logger = logging.getLogger(__name__) +logger = workflow_logger() class WorkflowCreationError(Exception): @@ -60,10 +61,11 @@ class WorkflowFactory: workflow_builder: WorkflowBuilder = WorkflowBuilder(self.airflow_pipeline) try: workflow = workflow_builder.build() - except Exception as err: - raise WorkflowCreationError( - f"Failed to generate workflow {self.airflow_pipeline.name.__root__}. verify config is correct" - ) from err + except Exception as exc: + msg = f"Failed to generate workflow [{self.airflow_pipeline.name.__root__}] verify config is correct: {exc}" + logger.debug(traceback.format_exc()) + logger.error(msg) + raise WorkflowCreationError(msg) from exc return workflow @staticmethod @@ -74,7 +76,7 @@ class WorkflowFactory: dag = self.build_dag() self.dag = dag self.register_dag(dag, globals_namespace) - logger.info("registered the dag") + logger.info(f"Registered the dag: {dag.dag_id}") def get_dag(self) -> DAG: return self.dag