| 
									
										
										
										
											2024-08-15 09:17:36 +08:00
										 |  |  | # | 
					
						
							|  |  |  | #  Copyright 2024 The InfiniFlow Authors. All Rights Reserved. | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | #  Licensed under the Apache License, Version 2.0 (the "License"); | 
					
						
							|  |  |  | #  you may not use this file except in compliance with the License. | 
					
						
							|  |  |  | #  You may obtain a copy of the License at | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | #      http://www.apache.org/licenses/LICENSE-2.0 | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | #  Unless required by applicable law or agreed to in writing, software | 
					
						
							|  |  |  | #  distributed under the License is distributed on an "AS IS" BASIS, | 
					
						
							|  |  |  | #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
					
						
							|  |  |  | #  See the License for the specific language governing permissions and | 
					
						
							|  |  |  | #  limitations under the License. | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | import os | 
					
						
							|  |  |  | import typing | 
					
						
							|  |  |  | import traceback | 
					
						
							|  |  |  | import logging | 
					
						
							|  |  |  | import inspect | 
					
						
							|  |  |  | from logging.handlers import TimedRotatingFileHandler | 
					
						
							|  |  |  | from threading import RLock | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from api.utils import file_utils | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class LoggerFactory(object): | 
					
						
							|  |  |  |     TYPE = "FILE" | 
					
						
							|  |  |  |     LOG_FORMAT = "[%(levelname)s] [%(asctime)s] [%(module)s.%(funcName)s] [line:%(lineno)d]: %(message)s" | 
					
						
							|  |  |  |     logging.basicConfig(format=LOG_FORMAT) | 
					
						
							|  |  |  |     LEVEL = logging.DEBUG | 
					
						
							|  |  |  |     logger_dict = {} | 
					
						
							|  |  |  |     global_handler_dict = {} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     LOG_DIR = None | 
					
						
							|  |  |  |     PARENT_LOG_DIR = None | 
					
						
							|  |  |  |     log_share = True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     append_to_parent_log = None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     lock = RLock() | 
					
						
							|  |  |  |     # CRITICAL = 50 | 
					
						
							|  |  |  |     # FATAL = CRITICAL | 
					
						
							|  |  |  |     # ERROR = 40 | 
					
						
							|  |  |  |     # WARNING = 30 | 
					
						
							|  |  |  |     # WARN = WARNING | 
					
						
							|  |  |  |     # INFO = 20 | 
					
						
							|  |  |  |     # DEBUG = 10 | 
					
						
							|  |  |  |     # NOTSET = 0 | 
					
						
							|  |  |  |     levels = (10, 20, 30, 40) | 
					
						
							|  |  |  |     schedule_logger_dict = {} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @staticmethod | 
					
						
							|  |  |  |     def set_directory(directory=None, parent_log_dir=None, | 
					
						
							|  |  |  |                       append_to_parent_log=None, force=False): | 
					
						
							|  |  |  |         if parent_log_dir: | 
					
						
							|  |  |  |             LoggerFactory.PARENT_LOG_DIR = parent_log_dir | 
					
						
							|  |  |  |         if append_to_parent_log: | 
					
						
							|  |  |  |             LoggerFactory.append_to_parent_log = append_to_parent_log | 
					
						
							|  |  |  |         with LoggerFactory.lock: | 
					
						
							|  |  |  |             if not directory: | 
					
						
							|  |  |  |                 directory = file_utils.get_project_base_directory("logs") | 
					
						
							|  |  |  |             if not LoggerFactory.LOG_DIR or force: | 
					
						
							|  |  |  |                 LoggerFactory.LOG_DIR = directory | 
					
						
							|  |  |  |             if LoggerFactory.log_share: | 
					
						
							|  |  |  |                 oldmask = os.umask(000) | 
					
						
							|  |  |  |                 os.makedirs(LoggerFactory.LOG_DIR, exist_ok=True) | 
					
						
							|  |  |  |                 os.umask(oldmask) | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 os.makedirs(LoggerFactory.LOG_DIR, exist_ok=True) | 
					
						
							|  |  |  |             for loggerName, ghandler in LoggerFactory.global_handler_dict.items(): | 
					
						
							|  |  |  |                 for className, (logger, | 
					
						
							|  |  |  |                                 handler) in LoggerFactory.logger_dict.items(): | 
					
						
							|  |  |  |                     logger.removeHandler(ghandler) | 
					
						
							|  |  |  |                 ghandler.close() | 
					
						
							|  |  |  |             LoggerFactory.global_handler_dict = {} | 
					
						
							|  |  |  |             for className, (logger, | 
					
						
							|  |  |  |                             handler) in LoggerFactory.logger_dict.items(): | 
					
						
							|  |  |  |                 logger.removeHandler(handler) | 
					
						
							|  |  |  |                 _handler = None | 
					
						
							|  |  |  |                 if handler: | 
					
						
							|  |  |  |                     handler.close() | 
					
						
							|  |  |  |                 if className != "default": | 
					
						
							|  |  |  |                     _handler = LoggerFactory.get_handler(className) | 
					
						
							|  |  |  |                     logger.addHandler(_handler) | 
					
						
							|  |  |  |                 LoggerFactory.assemble_global_handler(logger) | 
					
						
							|  |  |  |                 LoggerFactory.logger_dict[className] = logger, _handler | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @staticmethod | 
					
						
							|  |  |  |     def new_logger(name): | 
					
						
							|  |  |  |         logger = logging.getLogger(name) | 
					
						
							|  |  |  |         logger.propagate = False | 
					
						
							|  |  |  |         logger.setLevel(LoggerFactory.LEVEL) | 
					
						
							|  |  |  |         return logger | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @staticmethod | 
					
						
							|  |  |  |     def get_logger(class_name=None): | 
					
						
							|  |  |  |         with LoggerFactory.lock: | 
					
						
							|  |  |  |             if class_name in LoggerFactory.logger_dict.keys(): | 
					
						
							|  |  |  |                 logger, handler = LoggerFactory.logger_dict[class_name] | 
					
						
							|  |  |  |                 if not logger: | 
					
						
							|  |  |  |                     logger, handler = LoggerFactory.init_logger(class_name) | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 logger, handler = LoggerFactory.init_logger(class_name) | 
					
						
							|  |  |  |             return logger | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @staticmethod | 
					
						
							|  |  |  |     def get_global_handler(logger_name, level=None, log_dir=None): | 
					
						
							|  |  |  |         if not LoggerFactory.LOG_DIR: | 
					
						
							|  |  |  |             return logging.StreamHandler() | 
					
						
							|  |  |  |         if log_dir: | 
					
						
							|  |  |  |             logger_name_key = logger_name + "_" + log_dir | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             logger_name_key = logger_name + "_" + LoggerFactory.LOG_DIR | 
					
						
							|  |  |  |         # if loggerName not in LoggerFactory.globalHandlerDict: | 
					
						
							|  |  |  |         if logger_name_key not in LoggerFactory.global_handler_dict: | 
					
						
							|  |  |  |             with LoggerFactory.lock: | 
					
						
							|  |  |  |                 if logger_name_key not in LoggerFactory.global_handler_dict: | 
					
						
							|  |  |  |                     handler = LoggerFactory.get_handler( | 
					
						
							|  |  |  |                         logger_name, level, log_dir) | 
					
						
							|  |  |  |                     LoggerFactory.global_handler_dict[logger_name_key] = handler | 
					
						
							|  |  |  |         return LoggerFactory.global_handler_dict[logger_name_key] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @staticmethod | 
					
						
							|  |  |  |     def get_handler(class_name, level=None, log_dir=None, | 
					
						
							|  |  |  |                     log_type=None, job_id=None): | 
					
						
							|  |  |  |         if not log_type: | 
					
						
							|  |  |  |             if not LoggerFactory.LOG_DIR or not class_name: | 
					
						
							|  |  |  |                 return logging.StreamHandler() | 
					
						
							|  |  |  |                 # return Diy_StreamHandler() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             if not log_dir: | 
					
						
							|  |  |  |                 log_file = os.path.join( | 
					
						
							|  |  |  |                     LoggerFactory.LOG_DIR, | 
					
						
							|  |  |  |                     "{}.log".format(class_name)) | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 log_file = os.path.join(log_dir, "{}.log".format(class_name)) | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             log_file = os.path.join(log_dir, "rag_flow_{}.log".format( | 
					
						
							|  |  |  |                 log_type) if level == LoggerFactory.LEVEL else 'rag_flow_{}_error.log'.format(log_type)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         os.makedirs(os.path.dirname(log_file), exist_ok=True) | 
					
						
							|  |  |  |         if LoggerFactory.log_share: | 
					
						
							|  |  |  |             handler = ROpenHandler(log_file, | 
					
						
							|  |  |  |                                    when='D', | 
					
						
							|  |  |  |                                    interval=1, | 
					
						
							|  |  |  |                                    backupCount=14, | 
					
						
							|  |  |  |                                    delay=True) | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             handler = TimedRotatingFileHandler(log_file, | 
					
						
							|  |  |  |                                                when='D', | 
					
						
							|  |  |  |                                                interval=1, | 
					
						
							|  |  |  |                                                backupCount=14, | 
					
						
							|  |  |  |                                                delay=True) | 
					
						
							|  |  |  |         if level: | 
					
						
							|  |  |  |             handler.level = level | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         return handler | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @staticmethod | 
					
						
							|  |  |  |     def init_logger(class_name): | 
					
						
							|  |  |  |         with LoggerFactory.lock: | 
					
						
							|  |  |  |             logger = LoggerFactory.new_logger(class_name) | 
					
						
							|  |  |  |             handler = None | 
					
						
							|  |  |  |             if class_name: | 
					
						
							|  |  |  |                 handler = LoggerFactory.get_handler(class_name) | 
					
						
							|  |  |  |                 logger.addHandler(handler) | 
					
						
							|  |  |  |                 LoggerFactory.logger_dict[class_name] = logger, handler | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 LoggerFactory.logger_dict["default"] = logger, handler | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             LoggerFactory.assemble_global_handler(logger) | 
					
						
							|  |  |  |             return logger, handler | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @staticmethod | 
					
						
							|  |  |  |     def assemble_global_handler(logger): | 
					
						
							|  |  |  |         if LoggerFactory.LOG_DIR: | 
					
						
							|  |  |  |             for level in LoggerFactory.levels: | 
					
						
							|  |  |  |                 if level >= LoggerFactory.LEVEL: | 
					
						
							|  |  |  |                     level_logger_name = logging._levelToName[level] | 
					
						
							|  |  |  |                     logger.addHandler( | 
					
						
							|  |  |  |                         LoggerFactory.get_global_handler( | 
					
						
							|  |  |  |                             level_logger_name, level)) | 
					
						
							|  |  |  |         if LoggerFactory.append_to_parent_log and LoggerFactory.PARENT_LOG_DIR: | 
					
						
							|  |  |  |             for level in LoggerFactory.levels: | 
					
						
							|  |  |  |                 if level >= LoggerFactory.LEVEL: | 
					
						
							|  |  |  |                     level_logger_name = logging._levelToName[level] | 
					
						
							|  |  |  |                     logger.addHandler( | 
					
						
							|  |  |  |                         LoggerFactory.get_global_handler(level_logger_name, level, LoggerFactory.PARENT_LOG_DIR)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def setDirectory(directory=None): | 
					
						
							|  |  |  |     LoggerFactory.set_directory(directory) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def setLevel(level): | 
					
						
							|  |  |  |     LoggerFactory.LEVEL = level | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def getLogger(className=None, useLevelFile=False): | 
					
						
							|  |  |  |     if className is None: | 
					
						
							|  |  |  |         frame = inspect.stack()[1] | 
					
						
							|  |  |  |         module = inspect.getmodule(frame[0]) | 
					
						
							|  |  |  |         className = 'stat' | 
					
						
							|  |  |  |     return LoggerFactory.get_logger(className) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def exception_to_trace_string(ex): | 
					
						
							|  |  |  |     return "".join(traceback.TracebackException.from_exception(ex).format()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class ROpenHandler(TimedRotatingFileHandler): | 
					
						
							|  |  |  |     def _open(self): | 
					
						
							|  |  |  |         prevumask = os.umask(000) | 
					
						
							|  |  |  |         rtv = TimedRotatingFileHandler._open(self) | 
					
						
							|  |  |  |         os.umask(prevumask) | 
					
						
							|  |  |  |         return rtv | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def sql_logger(job_id='', log_type='sql'): | 
					
						
							|  |  |  |     key = job_id + log_type | 
					
						
							|  |  |  |     if key in LoggerFactory.schedule_logger_dict.keys(): | 
					
						
							|  |  |  |         return LoggerFactory.schedule_logger_dict[key] | 
					
						
							|  |  |  |     return get_job_logger(job_id=job_id, log_type=log_type) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def ready_log(msg, job=None, task=None, role=None, party_id=None, detail=None): | 
					
						
							|  |  |  |     prefix, suffix = base_msg(job, task, role, party_id, detail) | 
					
						
							|  |  |  |     return f"{prefix}{msg} ready{suffix}" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def start_log(msg, job=None, task=None, role=None, party_id=None, detail=None): | 
					
						
							|  |  |  |     prefix, suffix = base_msg(job, task, role, party_id, detail) | 
					
						
							|  |  |  |     return f"{prefix}start to {msg}{suffix}" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def successful_log(msg, job=None, task=None, role=None, | 
					
						
							|  |  |  |                    party_id=None, detail=None): | 
					
						
							|  |  |  |     prefix, suffix = base_msg(job, task, role, party_id, detail) | 
					
						
							|  |  |  |     return f"{prefix}{msg} successfully{suffix}" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def warning_log(msg, job=None, task=None, role=None, | 
					
						
							|  |  |  |                 party_id=None, detail=None): | 
					
						
							|  |  |  |     prefix, suffix = base_msg(job, task, role, party_id, detail) | 
					
						
							|  |  |  |     return f"{prefix}{msg} is not effective{suffix}" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def failed_log(msg, job=None, task=None, role=None, | 
					
						
							|  |  |  |                party_id=None, detail=None): | 
					
						
							|  |  |  |     prefix, suffix = base_msg(job, task, role, party_id, detail) | 
					
						
							|  |  |  |     return f"{prefix}failed to {msg}{suffix}" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def base_msg(job=None, task=None, role: str = None, | 
					
						
							|  |  |  |              party_id: typing.Union[str, int] = None, detail=None): | 
					
						
							|  |  |  |     if detail: | 
					
						
							|  |  |  |         detail_msg = f" detail: \n{detail}" | 
					
						
							|  |  |  |     else: | 
					
						
							|  |  |  |         detail_msg = "" | 
					
						
							|  |  |  |     if task is not None: | 
					
						
							|  |  |  |         return f"task {task.f_task_id} {task.f_task_version} ", f" on {task.f_role} {task.f_party_id}{detail_msg}" | 
					
						
							|  |  |  |     elif job is not None: | 
					
						
							|  |  |  |         return "", f" on {job.f_role} {job.f_party_id}{detail_msg}" | 
					
						
							|  |  |  |     elif role and party_id: | 
					
						
							|  |  |  |         return "", f" on {role} {party_id}{detail_msg}" | 
					
						
							|  |  |  |     else: | 
					
						
							|  |  |  |         return "", f"{detail_msg}" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def exception_to_trace_string(ex): | 
					
						
							|  |  |  |     return "".join(traceback.TracebackException.from_exception(ex).format()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def get_logger_base_dir(): | 
					
						
							|  |  |  |     job_log_dir = file_utils.get_rag_flow_directory('logs') | 
					
						
							|  |  |  |     return job_log_dir | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def get_job_logger(job_id, log_type): | 
					
						
							|  |  |  |     rag_flow_log_dir = file_utils.get_rag_flow_directory('logs', 'rag_flow') | 
					
						
							|  |  |  |     job_log_dir = file_utils.get_rag_flow_directory('logs', job_id) | 
					
						
							|  |  |  |     if not job_id: | 
					
						
							|  |  |  |         log_dirs = [rag_flow_log_dir] | 
					
						
							|  |  |  |     else: | 
					
						
							|  |  |  |         if log_type == 'audit': | 
					
						
							|  |  |  |             log_dirs = [job_log_dir, rag_flow_log_dir] | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             log_dirs = [job_log_dir] | 
					
						
							|  |  |  |     if LoggerFactory.log_share: | 
					
						
							|  |  |  |         oldmask = os.umask(000) | 
					
						
							|  |  |  |         os.makedirs(job_log_dir, exist_ok=True) | 
					
						
							|  |  |  |         os.makedirs(rag_flow_log_dir, exist_ok=True) | 
					
						
							|  |  |  |         os.umask(oldmask) | 
					
						
							|  |  |  |     else: | 
					
						
							|  |  |  |         os.makedirs(job_log_dir, exist_ok=True) | 
					
						
							|  |  |  |         os.makedirs(rag_flow_log_dir, exist_ok=True) | 
					
						
							|  |  |  |     logger = LoggerFactory.new_logger(f"{job_id}_{log_type}") | 
					
						
							|  |  |  |     for job_log_dir in log_dirs: | 
					
						
							|  |  |  |         handler = LoggerFactory.get_handler(class_name=None, level=LoggerFactory.LEVEL, | 
					
						
							|  |  |  |                                             log_dir=job_log_dir, log_type=log_type, job_id=job_id) | 
					
						
							|  |  |  |         error_handler = LoggerFactory.get_handler( | 
					
						
							|  |  |  |             class_name=None, | 
					
						
							|  |  |  |             level=logging.ERROR, | 
					
						
							|  |  |  |             log_dir=job_log_dir, | 
					
						
							|  |  |  |             log_type=log_type, | 
					
						
							|  |  |  |             job_id=job_id) | 
					
						
							|  |  |  |         logger.addHandler(handler) | 
					
						
							|  |  |  |         logger.addHandler(error_handler) | 
					
						
							|  |  |  |     with LoggerFactory.lock: | 
					
						
							|  |  |  |         LoggerFactory.schedule_logger_dict[job_id + log_type] = logger | 
					
						
							|  |  |  |     return logger |