mirror of
				https://github.com/infiniflow/ragflow.git
				synced 2025-10-31 01:40:20 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			314 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			314 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #
 | |
| #  Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
 | |
| #
 | |
| #  Licensed under the Apache License, Version 2.0 (the "License");
 | |
| #  you may not use this file except in compliance with the License.
 | |
| #  You may obtain a copy of the License at
 | |
| #
 | |
| #      http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| #  Unless required by applicable law or agreed to in writing, software
 | |
| #  distributed under the License is distributed on an "AS IS" BASIS,
 | |
| #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| #  See the License for the specific language governing permissions and
 | |
| #  limitations under the License.
 | |
| #
 | |
| import os
 | |
| import typing
 | |
| import traceback
 | |
| import logging
 | |
| import inspect
 | |
| from logging.handlers import TimedRotatingFileHandler
 | |
| from threading import RLock
 | |
| 
 | |
| from api.utils import file_utils
 | |
| 
 | |
| 
 | |
| class LoggerFactory(object):
 | |
|     TYPE = "FILE"
 | |
|     LOG_FORMAT = "[%(levelname)s] [%(asctime)s] [%(module)s.%(funcName)s] [line:%(lineno)d]: %(message)s"
 | |
|     logging.basicConfig(format=LOG_FORMAT)
 | |
|     LEVEL = logging.DEBUG
 | |
|     logger_dict = {}
 | |
|     global_handler_dict = {}
 | |
| 
 | |
|     LOG_DIR = None
 | |
|     PARENT_LOG_DIR = None
 | |
|     log_share = True
 | |
| 
 | |
|     append_to_parent_log = None
 | |
| 
 | |
|     lock = RLock()
 | |
|     # CRITICAL = 50
 | |
|     # FATAL = CRITICAL
 | |
|     # ERROR = 40
 | |
|     # WARNING = 30
 | |
|     # WARN = WARNING
 | |
|     # INFO = 20
 | |
|     # DEBUG = 10
 | |
|     # NOTSET = 0
 | |
|     levels = (10, 20, 30, 40)
 | |
|     schedule_logger_dict = {}
 | |
| 
 | |
|     @staticmethod
 | |
|     def set_directory(directory=None, parent_log_dir=None,
 | |
|                       append_to_parent_log=None, force=False):
 | |
|         if parent_log_dir:
 | |
|             LoggerFactory.PARENT_LOG_DIR = parent_log_dir
 | |
|         if append_to_parent_log:
 | |
|             LoggerFactory.append_to_parent_log = append_to_parent_log
 | |
|         with LoggerFactory.lock:
 | |
|             if not directory:
 | |
|                 directory = file_utils.get_project_base_directory("logs")
 | |
|             if not LoggerFactory.LOG_DIR or force:
 | |
|                 LoggerFactory.LOG_DIR = directory
 | |
|             if LoggerFactory.log_share:
 | |
|                 oldmask = os.umask(000)
 | |
|                 os.makedirs(LoggerFactory.LOG_DIR, exist_ok=True)
 | |
|                 os.umask(oldmask)
 | |
|             else:
 | |
|                 os.makedirs(LoggerFactory.LOG_DIR, exist_ok=True)
 | |
|             for loggerName, ghandler in LoggerFactory.global_handler_dict.items():
 | |
|                 for className, (logger,
 | |
|                                 handler) in LoggerFactory.logger_dict.items():
 | |
|                     logger.removeHandler(ghandler)
 | |
|                 ghandler.close()
 | |
|             LoggerFactory.global_handler_dict = {}
 | |
|             for className, (logger,
 | |
|                             handler) in LoggerFactory.logger_dict.items():
 | |
|                 logger.removeHandler(handler)
 | |
|                 _handler = None
 | |
|                 if handler:
 | |
|                     handler.close()
 | |
|                 if className != "default":
 | |
|                     _handler = LoggerFactory.get_handler(className)
 | |
|                     logger.addHandler(_handler)
 | |
|                 LoggerFactory.assemble_global_handler(logger)
 | |
|                 LoggerFactory.logger_dict[className] = logger, _handler
 | |
| 
 | |
|     @staticmethod
 | |
|     def new_logger(name):
 | |
|         logger = logging.getLogger(name)
 | |
|         logger.propagate = False
 | |
|         logger.setLevel(LoggerFactory.LEVEL)
 | |
|         return logger
 | |
| 
 | |
|     @staticmethod
 | |
|     def get_logger(class_name=None):
 | |
|         with LoggerFactory.lock:
 | |
|             if class_name in LoggerFactory.logger_dict.keys():
 | |
|                 logger, handler = LoggerFactory.logger_dict[class_name]
 | |
|                 if not logger:
 | |
|                     logger, handler = LoggerFactory.init_logger(class_name)
 | |
|             else:
 | |
|                 logger, handler = LoggerFactory.init_logger(class_name)
 | |
|             return logger
 | |
| 
 | |
|     @staticmethod
 | |
|     def get_global_handler(logger_name, level=None, log_dir=None):
 | |
|         if not LoggerFactory.LOG_DIR:
 | |
|             return logging.StreamHandler()
 | |
|         if log_dir:
 | |
|             logger_name_key = logger_name + "_" + log_dir
 | |
|         else:
 | |
|             logger_name_key = logger_name + "_" + LoggerFactory.LOG_DIR
 | |
|         # if loggerName not in LoggerFactory.globalHandlerDict:
 | |
|         if logger_name_key not in LoggerFactory.global_handler_dict:
 | |
|             with LoggerFactory.lock:
 | |
|                 if logger_name_key not in LoggerFactory.global_handler_dict:
 | |
|                     handler = LoggerFactory.get_handler(
 | |
|                         logger_name, level, log_dir)
 | |
|                     LoggerFactory.global_handler_dict[logger_name_key] = handler
 | |
|         return LoggerFactory.global_handler_dict[logger_name_key]
 | |
| 
 | |
|     @staticmethod
 | |
|     def get_handler(class_name, level=None, log_dir=None,
 | |
|                     log_type=None, job_id=None):
 | |
|         if not log_type:
 | |
|             if not LoggerFactory.LOG_DIR or not class_name:
 | |
|                 return logging.StreamHandler()
 | |
|                 # return Diy_StreamHandler()
 | |
| 
 | |
|             if not log_dir:
 | |
|                 log_file = os.path.join(
 | |
|                     LoggerFactory.LOG_DIR,
 | |
|                     "{}.log".format(class_name))
 | |
|             else:
 | |
|                 log_file = os.path.join(log_dir, "{}.log".format(class_name))
 | |
|         else:
 | |
|             log_file = os.path.join(log_dir, "rag_flow_{}.log".format(
 | |
|                 log_type) if level == LoggerFactory.LEVEL else 'rag_flow_{}_error.log'.format(log_type))
 | |
| 
 | |
|         os.makedirs(os.path.dirname(log_file), exist_ok=True)
 | |
|         if LoggerFactory.log_share:
 | |
|             handler = ROpenHandler(log_file,
 | |
|                                    when='D',
 | |
|                                    interval=1,
 | |
|                                    backupCount=14,
 | |
|                                    delay=True)
 | |
|         else:
 | |
|             handler = TimedRotatingFileHandler(log_file,
 | |
|                                                when='D',
 | |
|                                                interval=1,
 | |
|                                                backupCount=14,
 | |
|                                                delay=True)
 | |
|         if level:
 | |
|             handler.level = level
 | |
| 
 | |
|         return handler
 | |
| 
 | |
|     @staticmethod
 | |
|     def init_logger(class_name):
 | |
|         with LoggerFactory.lock:
 | |
|             logger = LoggerFactory.new_logger(class_name)
 | |
|             handler = None
 | |
|             if class_name:
 | |
|                 handler = LoggerFactory.get_handler(class_name)
 | |
|                 logger.addHandler(handler)
 | |
|                 LoggerFactory.logger_dict[class_name] = logger, handler
 | |
| 
 | |
|             else:
 | |
|                 LoggerFactory.logger_dict["default"] = logger, handler
 | |
| 
 | |
|             LoggerFactory.assemble_global_handler(logger)
 | |
|             return logger, handler
 | |
| 
 | |
|     @staticmethod
 | |
|     def assemble_global_handler(logger):
 | |
|         if LoggerFactory.LOG_DIR:
 | |
|             for level in LoggerFactory.levels:
 | |
|                 if level >= LoggerFactory.LEVEL:
 | |
|                     level_logger_name = logging._levelToName[level]
 | |
|                     logger.addHandler(
 | |
|                         LoggerFactory.get_global_handler(
 | |
|                             level_logger_name, level))
 | |
|         if LoggerFactory.append_to_parent_log and LoggerFactory.PARENT_LOG_DIR:
 | |
|             for level in LoggerFactory.levels:
 | |
|                 if level >= LoggerFactory.LEVEL:
 | |
|                     level_logger_name = logging._levelToName[level]
 | |
|                     logger.addHandler(
 | |
|                         LoggerFactory.get_global_handler(level_logger_name, level, LoggerFactory.PARENT_LOG_DIR))
 | |
| 
 | |
| 
 | |
| def setDirectory(directory=None):
 | |
|     LoggerFactory.set_directory(directory)
 | |
| 
 | |
| 
 | |
| def setLevel(level):
 | |
|     LoggerFactory.LEVEL = level
 | |
| 
 | |
| 
 | |
| def getLogger(className=None, useLevelFile=False):
 | |
|     if className is None:
 | |
|         frame = inspect.stack()[1]
 | |
|         module = inspect.getmodule(frame[0])
 | |
|         className = 'stat'
 | |
|     return LoggerFactory.get_logger(className)
 | |
| 
 | |
| 
 | |
| def exception_to_trace_string(ex):
 | |
|     return "".join(traceback.TracebackException.from_exception(ex).format())
 | |
| 
 | |
| 
 | |
| class ROpenHandler(TimedRotatingFileHandler):
 | |
|     def _open(self):
 | |
|         prevumask = os.umask(000)
 | |
|         rtv = TimedRotatingFileHandler._open(self)
 | |
|         os.umask(prevumask)
 | |
|         return rtv
 | |
| 
 | |
| 
 | |
| def sql_logger(job_id='', log_type='sql'):
 | |
|     key = job_id + log_type
 | |
|     if key in LoggerFactory.schedule_logger_dict.keys():
 | |
|         return LoggerFactory.schedule_logger_dict[key]
 | |
|     return get_job_logger(job_id=job_id, log_type=log_type)
 | |
| 
 | |
| 
 | |
| def ready_log(msg, job=None, task=None, role=None, party_id=None, detail=None):
 | |
|     prefix, suffix = base_msg(job, task, role, party_id, detail)
 | |
|     return f"{prefix}{msg} ready{suffix}"
 | |
| 
 | |
| 
 | |
| def start_log(msg, job=None, task=None, role=None, party_id=None, detail=None):
 | |
|     prefix, suffix = base_msg(job, task, role, party_id, detail)
 | |
|     return f"{prefix}start to {msg}{suffix}"
 | |
| 
 | |
| 
 | |
| def successful_log(msg, job=None, task=None, role=None,
 | |
|                    party_id=None, detail=None):
 | |
|     prefix, suffix = base_msg(job, task, role, party_id, detail)
 | |
|     return f"{prefix}{msg} successfully{suffix}"
 | |
| 
 | |
| 
 | |
| def warning_log(msg, job=None, task=None, role=None,
 | |
|                 party_id=None, detail=None):
 | |
|     prefix, suffix = base_msg(job, task, role, party_id, detail)
 | |
|     return f"{prefix}{msg} is not effective{suffix}"
 | |
| 
 | |
| 
 | |
| def failed_log(msg, job=None, task=None, role=None,
 | |
|                party_id=None, detail=None):
 | |
|     prefix, suffix = base_msg(job, task, role, party_id, detail)
 | |
|     return f"{prefix}failed to {msg}{suffix}"
 | |
| 
 | |
| 
 | |
| def base_msg(job=None, task=None, role: str = None,
 | |
|              party_id: typing.Union[str, int] = None, detail=None):
 | |
|     if detail:
 | |
|         detail_msg = f" detail: \n{detail}"
 | |
|     else:
 | |
|         detail_msg = ""
 | |
|     if task is not None:
 | |
|         return f"task {task.f_task_id} {task.f_task_version} ", f" on {task.f_role} {task.f_party_id}{detail_msg}"
 | |
|     elif job is not None:
 | |
|         return "", f" on {job.f_role} {job.f_party_id}{detail_msg}"
 | |
|     elif role and party_id:
 | |
|         return "", f" on {role} {party_id}{detail_msg}"
 | |
|     else:
 | |
|         return "", f"{detail_msg}"
 | |
| 
 | |
| 
 | |
| def exception_to_trace_string(ex):
 | |
|     return "".join(traceback.TracebackException.from_exception(ex).format())
 | |
| 
 | |
| 
 | |
| def get_logger_base_dir():
 | |
|     job_log_dir = file_utils.get_rag_flow_directory('logs')
 | |
|     return job_log_dir
 | |
| 
 | |
| 
 | |
| def get_job_logger(job_id, log_type):
 | |
|     rag_flow_log_dir = file_utils.get_rag_flow_directory('logs', 'rag_flow')
 | |
|     job_log_dir = file_utils.get_rag_flow_directory('logs', job_id)
 | |
|     if not job_id:
 | |
|         log_dirs = [rag_flow_log_dir]
 | |
|     else:
 | |
|         if log_type == 'audit':
 | |
|             log_dirs = [job_log_dir, rag_flow_log_dir]
 | |
|         else:
 | |
|             log_dirs = [job_log_dir]
 | |
|     if LoggerFactory.log_share:
 | |
|         oldmask = os.umask(000)
 | |
|         os.makedirs(job_log_dir, exist_ok=True)
 | |
|         os.makedirs(rag_flow_log_dir, exist_ok=True)
 | |
|         os.umask(oldmask)
 | |
|     else:
 | |
|         os.makedirs(job_log_dir, exist_ok=True)
 | |
|         os.makedirs(rag_flow_log_dir, exist_ok=True)
 | |
|     logger = LoggerFactory.new_logger(f"{job_id}_{log_type}")
 | |
|     for job_log_dir in log_dirs:
 | |
|         handler = LoggerFactory.get_handler(class_name=None, level=LoggerFactory.LEVEL,
 | |
|                                             log_dir=job_log_dir, log_type=log_type, job_id=job_id)
 | |
|         error_handler = LoggerFactory.get_handler(
 | |
|             class_name=None,
 | |
|             level=logging.ERROR,
 | |
|             log_dir=job_log_dir,
 | |
|             log_type=log_type,
 | |
|             job_id=job_id)
 | |
|         logger.addHandler(handler)
 | |
|         logger.addHandler(error_handler)
 | |
|     with LoggerFactory.lock:
 | |
|         LoggerFactory.schedule_logger_dict[job_id + log_type] = logger
 | |
|     return logger
 | 
