mirror of
				https://github.com/langgenius/dify.git
				synced 2025-10-31 10:53:02 +00:00 
			
		
		
		
	 0a20210a59
			
		
	
	
		0a20210a59
		
			
		
	
	
	
	
		
			
			Signed-off-by: Yuichiro Utsumi <utsumi.yuichiro@fujitsu.com> Signed-off-by: -LAN- <laipz8200@outlook.com> Signed-off-by: yihong0618 <zouzou0208@gmail.com> Signed-off-by: kenwoodjw <blackxin55+@gmail.com> Signed-off-by: ChengZi <chen.zhang@zilliz.com> Signed-off-by: cl <cailue@apache.org> Co-authored-by: Yu Chun Chang <changyuchun159630@gmail.com> Co-authored-by: Kyle Chang <kylechang@91app.com> Co-authored-by: Lick-liu <51771897+Lick-liu@users.noreply.github.com> Co-authored-by: crazywoola <427733928@qq.com> Co-authored-by: Yuichiro Utsumi <81412151+utsumi-fj@users.noreply.github.com> Co-authored-by: NFish <douxc512@gmail.com> Co-authored-by: Yeuoly <45712896+Yeuoly@users.noreply.github.com> Co-authored-by: Wu Tianwei <30284043+WTW0313@users.noreply.github.com> Co-authored-by: DDDDD12138 <43703884+DDDDD12138@users.noreply.github.com> Co-authored-by: Jyong <76649700+JohnJyong@users.noreply.github.com> Co-authored-by: -LAN- <laipz8200@outlook.com> Co-authored-by: Novice <857526207@qq.com> Co-authored-by: yihong <zouzou0208@gmail.com> Co-authored-by: Kalo Chin <91766386+fdb02983rhy@users.noreply.github.com> Co-authored-by: zxhlyh <jasonapring2015@outlook.com> Co-authored-by: jiangbo721 <365065261@qq.com> Co-authored-by: 刘江波 <jiangbo721@163.com> Co-authored-by: Lam <scau_ljw@126.com> Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com> Co-authored-by: Mars <524574386@qq.com> Co-authored-by: mars <linjx2@by-health.com> Co-authored-by: Joe <79627742+ZhouhaoJiang@users.noreply.github.com> Co-authored-by: Rafael Carvalho <r.carvalho@me.com> Co-authored-by: Joel <iamjoel007@gmail.com> Co-authored-by: 非法操作 <hjlarry@163.com> Co-authored-by: kenwoodjw <blackxin55+@gmail.com> Co-authored-by: codingjaguar <codingjaguar@gmail.com> Co-authored-by: ChengZi <chen.zhang@zilliz.com> Co-authored-by: Fei He <droxer.he@gmail.com> Co-authored-by: Arcaner <52057416+lrhan321@users.noreply.github.com> Co-authored-by: Xiyuan Chen 
<52963600+GareArc@users.noreply.github.com> Co-authored-by: KVOJJJin <jzongcode@gmail.com> Co-authored-by: XiaoBa <94062266+XiaoBa-Yu@users.noreply.github.com> Co-authored-by: Xiaoba Yu <xb1823725853@gmail.com> Co-authored-by: zhangyuhang <2827528315@qq.com> Co-authored-by: yuhang2.zhang <yuhang2.zhang@ly.com> Co-authored-by: 诗浓 <nyaashino@gmail.com> Co-authored-by: RookieAgent <42060616+Sakura4036@users.noreply.github.com> Co-authored-by: sho-takano-dev <shota.takano.dev@gmail.com> Co-authored-by: 過世秋風 <1040926235@qq.com> Co-authored-by: Yi Feng <66539215+bigyifeng@users.noreply.github.com> Co-authored-by: QuantumGhost <obelisk.reg+git@gmail.com> Co-authored-by: Yongtao Huang <99629139+hyongtao-db@users.noreply.github.com> Co-authored-by: ShadowJobs <794878115@qq.com> Co-authored-by: LinYing <linying@momenta.ai> Co-authored-by: Benjamin <benjaminx@gmail.com> Co-authored-by: LiuBodong <liubodong2010@126.com> Co-authored-by: huangzhuo1949 <167434202+huangzhuo1949@users.noreply.github.com> Co-authored-by: huangzhuo <huangzhuo1@xiaomi.com> Co-authored-by: csurong <csurong1@gmail.com> Co-authored-by: 傻笑zz <43721571+shaxiaozz@users.noreply.github.com> Co-authored-by: L8ng <straydragonl@foxmail.com> Co-authored-by: Bowen Liang <liangbowen@gf.com.cn> Co-authored-by: Novice Lee <novicelee@NoviPro.local> Co-authored-by: GuanMu <ballmanjq@gmail.com> Co-authored-by: LittleFish-15 <58618983+LittleFish-15@users.noreply.github.com> Co-authored-by: 诗浓 <844670992@qq.com> Co-authored-by: luckylhb90 <luckylhb90@gmail.com> Co-authored-by: hobo.l <hobo.l@binance.com> Co-authored-by: Gen Sato <52241300+halogen22@users.noreply.github.com> Co-authored-by: twwu <twwu@dify.ai> Co-authored-by: StoneFancyX <53338920+StoneFancyX@users.noreply.github.com> Co-authored-by: StoneFancyX <kindbin@qq.com> Co-authored-by: Naoki KOBAYASHI <naotama@gmail.com> Co-authored-by: kurokobo <kuro664@gmail.com> Co-authored-by: cyflhn <cyflhn@163.com> Co-authored-by: Yingchun Lai <laiyingchun@apache.org> 
Co-authored-by: jimmyfen <757343258@qq.com> Co-authored-by: Xuetao Song <xuetaomagicsong@gmail.com> Co-authored-by: Panpan <wurui.dev@gmail.com> Co-authored-by: wyy-holding <59436937+wyy-holding@users.noreply.github.com> Co-authored-by: リイノ Lin <sorphwer@gmail.com> Co-authored-by: Ning <accelerator314@gmail.com> Co-authored-by: Linh Nguyen <55907715+batman0911@users.noreply.github.com> Co-authored-by: Junjie.M <118170653@qq.com> Co-authored-by: Ron <svcvit@gmail.com> Co-authored-by: Novice <novice12185727@gmail.com> Co-authored-by: NanoNova <kid1412621@gmail.com> Co-authored-by: JaydenZhou <380774082@qq.com> Co-authored-by: dotdotdot <823150982@qq.com> Co-authored-by: Good Wood <slm_1990@126.com> Co-authored-by: Ryosei Karaki <38310693+karamaru-alpha@users.noreply.github.com> Co-authored-by: chenhuan0728 <54611342+chenhuan0728@users.noreply.github.com> Co-authored-by: chenhuan <huan.chen0728@foxmail> Co-authored-by: lenbo <islenbo@qq.com> Co-authored-by: Jiang <65766008+AlwaysBluer@users.noreply.github.com> Co-authored-by: jiangzhijie <jiangzhijie.jzj@alibaba-inc.com> Co-authored-by: Yongtao Huang <yongtaoh2022@gmail.com> Co-authored-by: zhangkun-21 <sephiroth0932@gmail.com> Co-authored-by: hsiong <37357447+hsiong@users.noreply.github.com> Co-authored-by: 李远军 <4842@9ji.com> Co-authored-by: yourchanges <yourchanges@gmail.com> Co-authored-by: David <guyuezhuying@126.com> Co-authored-by: liuzhenghua <1090179900@qq.com> Co-authored-by: taokuizu <taokuizu@qq.com> Co-authored-by: Hanqing Zhao <sherry9277@gmail.com> Co-authored-by: JimintheBox <gjwlals111@gmail.com> Co-authored-by: wlleiiwang <1025164922@qq.com> Co-authored-by: wlleiiwang <wlleiiwang@tencent.com> Co-authored-by: Alex <32982705+AlexYuan997@users.noreply.github.com> Co-authored-by: yuanlong <yuanlong@boco.com.cn> Co-authored-by: wanttobeamaster <45583625+wanttobeamaster@users.noreply.github.com> Co-authored-by: xiaozhiqing.xzq <xiaozhiqing.xzq@alibaba-inc.com> Co-authored-by: Chenhe Gu <guchenhe@gmail.com> 
Co-authored-by: tyounami <vkbo@qq.com> Co-authored-by: bo.zhao <bo.zhao@iglooinsure.com> Co-authored-by: ClSlaid <cailue@apache.org> Co-authored-by: adru <106513264+adpanru@users.noreply.github.com> Co-authored-by: horochx <32632779+horochx@users.noreply.github.com>
		
			
				
	
	
		
			834 lines
		
	
	
		
			32 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			834 lines
		
	
	
		
			32 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import json
 | |
| import logging
 | |
| import os
 | |
| import queue
 | |
| import threading
 | |
| import time
 | |
| from datetime import timedelta
 | |
| from typing import Any, Optional, Union
 | |
| from uuid import UUID, uuid4
 | |
| 
 | |
| from cachetools import LRUCache
 | |
| from flask import current_app
 | |
| from sqlalchemy import select
 | |
| from sqlalchemy.orm import Session
 | |
| 
 | |
| from core.helper.encrypter import decrypt_token, encrypt_token, obfuscated_token
 | |
| from core.ops.entities.config_entity import (
 | |
|     OPS_FILE_PATH,
 | |
|     LangfuseConfig,
 | |
|     LangSmithConfig,
 | |
|     OpikConfig,
 | |
|     TracingProviderEnum,
 | |
|     WeaveConfig,
 | |
| )
 | |
| from core.ops.entities.trace_entity import (
 | |
|     DatasetRetrievalTraceInfo,
 | |
|     GenerateNameTraceInfo,
 | |
|     MessageTraceInfo,
 | |
|     ModerationTraceInfo,
 | |
|     SuggestedQuestionTraceInfo,
 | |
|     TaskData,
 | |
|     ToolTraceInfo,
 | |
|     TraceTaskName,
 | |
|     WorkflowTraceInfo,
 | |
| )
 | |
| from core.ops.langfuse_trace.langfuse_trace import LangFuseDataTrace
 | |
| from core.ops.langsmith_trace.langsmith_trace import LangSmithDataTrace
 | |
| from core.ops.opik_trace.opik_trace import OpikDataTrace
 | |
| from core.ops.utils import get_message_data
 | |
| from core.ops.weave_trace.weave_trace import WeaveDataTrace
 | |
| from extensions.ext_database import db
 | |
| from extensions.ext_storage import storage
 | |
| from models.model import App, AppModelConfig, Conversation, Message, MessageFile, TraceAppConfig
 | |
| from models.workflow import WorkflowAppLog, WorkflowRun
 | |
| from tasks.ops_trace_task import process_trace_tasks
 | |
| 
 | |
| 
 | |
def build_opik_trace_instance(config: OpikConfig):
    """Create and return an ``OpikDataTrace`` client for the given Opik config."""
    opik_client = OpikDataTrace(config)
    return opik_client
 | |
| 
 | |
| 
 | |
# Registry of supported tracing providers, keyed by ``TracingProviderEnum`` value.
# Each entry holds:
#   config_class   - config model class built from the key/value pairs below
#   secret_keys    - fields encrypted before storage and obfuscated for display
#   other_keys     - plain fields copied through as-is (missing ones become "")
#   trace_instance - callable that builds a trace client from a config_class instance
provider_config_map: dict[str, dict[str, Any]] = {
    TracingProviderEnum.LANGFUSE.value: dict(
        config_class=LangfuseConfig,
        secret_keys=["public_key", "secret_key"],
        other_keys=["host", "project_key"],
        trace_instance=LangFuseDataTrace,
    ),
    TracingProviderEnum.LANGSMITH.value: dict(
        config_class=LangSmithConfig,
        secret_keys=["api_key"],
        other_keys=["project", "endpoint"],
        trace_instance=LangSmithDataTrace,
    ),
    TracingProviderEnum.OPIK.value: dict(
        config_class=OpikConfig,
        secret_keys=["api_key"],
        other_keys=["project", "url", "workspace"],
        # Resolved at call time through the module-level factory.
        trace_instance=lambda config: build_opik_trace_instance(config),
    ),
    TracingProviderEnum.WEAVE.value: dict(
        config_class=WeaveConfig,
        secret_keys=["api_key"],
        other_keys=["project", "entity", "endpoint"],
        trace_instance=WeaveDataTrace,
    ),
}
 | |
| 
 | |
| 
 | |
class OpsTraceManager:
    """Manage per-app tracing configuration and the trace clients built from it.

    Provider details (config model, secret/plain keys, client factory) are read
    from ``provider_config_map``; helpers that index it directly raise ``KeyError``
    for an unknown provider.
    """

    # Trace clients keyed by ``str(decrypted_config)`` so that identical configs
    # share one client; at most 128 entries are kept.
    ops_trace_instances_cache: LRUCache = LRUCache(maxsize=128)

    @classmethod
    def encrypt_tracing_config(
        cls, tenant_id: str, tracing_provider: str, tracing_config: dict, current_trace_config=None
    ):
        """
        Encrypt the secret fields of a tracing config before it is stored.

        :param tenant_id: tenant id passed to ``encrypt_token`` for every secret
        :param tracing_provider: tracing provider (key of ``provider_config_map``)
        :param tracing_config: tracing config dictionary to be encrypted
        :param current_trace_config: currently stored tracing config. A submitted
            secret containing ``*`` (a masked value) is replaced by the value stored
            here; ``None`` is treated as an empty config.
        :return: encrypted tracing configuration (``config_class(...).model_dump()``)
        :raises KeyError: if ``tracing_provider`` is not in ``provider_config_map``
        """
        # With the default ``None`` the ``.get`` below would raise AttributeError as
        # soon as a masked secret is submitted; treat it as "nothing stored yet".
        if current_trace_config is None:
            current_trace_config = {}

        config_class, secret_keys, other_keys = (
            provider_config_map[tracing_provider]["config_class"],
            provider_config_map[tracing_provider]["secret_keys"],
            provider_config_map[tracing_provider]["other_keys"],
        )

        new_config = {}
        for key in secret_keys:
            if key not in tracing_config:
                continue
            if "*" in tracing_config[key]:
                # Masked value: the secret was not edited, so keep the stored value
                # (presumably already encrypted), falling back to the submitted one.
                new_config[key] = current_trace_config.get(key, tracing_config[key])
            else:
                new_config[key] = encrypt_token(tenant_id, tracing_config[key])

        # Non-secret fields are copied through unchanged, defaulting to "".
        for key in other_keys:
            new_config[key] = tracing_config.get(key, "")

        encrypted_config = config_class(**new_config)
        return encrypted_config.model_dump()

    @classmethod
    def decrypt_tracing_config(cls, tenant_id: str, tracing_provider: str, tracing_config: dict):
        """
        Decrypt the secret fields of a stored tracing config.

        :param tenant_id: tenant id passed to ``decrypt_token`` for every secret
        :param tracing_provider: tracing provider (key of ``provider_config_map``)
        :param tracing_config: stored (encrypted) tracing config
        :return: decrypted config as ``config_class(...).model_dump()``; secrets absent
            from ``tracing_config`` are omitted, missing plain keys become ""
        :raises KeyError: if ``tracing_provider`` is not in ``provider_config_map``
        """
        config_class, secret_keys, other_keys = (
            provider_config_map[tracing_provider]["config_class"],
            provider_config_map[tracing_provider]["secret_keys"],
            provider_config_map[tracing_provider]["other_keys"],
        )
        new_config = {}
        for key in secret_keys:
            if key in tracing_config:
                new_config[key] = decrypt_token(tenant_id, tracing_config[key])

        for key in other_keys:
            new_config[key] = tracing_config.get(key, "")

        return config_class(**new_config).model_dump()

    @classmethod
    def obfuscated_decrypt_token(cls, tracing_provider: str, decrypt_tracing_config: dict):
        """
        Obfuscate the secret fields of an already-decrypted tracing config for display.

        :param tracing_provider: tracing provider (key of ``provider_config_map``)
        :param decrypt_tracing_config: decrypted tracing config
        :return: config as ``config_class(...).model_dump()`` with each present secret
            passed through ``obfuscated_token``; missing plain keys become ""
        :raises KeyError: if ``tracing_provider`` is not in ``provider_config_map``
        """
        config_class, secret_keys, other_keys = (
            provider_config_map[tracing_provider]["config_class"],
            provider_config_map[tracing_provider]["secret_keys"],
            provider_config_map[tracing_provider]["other_keys"],
        )
        new_config = {}
        for key in secret_keys:
            if key in decrypt_tracing_config:
                new_config[key] = obfuscated_token(decrypt_tracing_config[key])

        for key in other_keys:
            new_config[key] = decrypt_tracing_config.get(key, "")
        return config_class(**new_config).model_dump()

    @classmethod
    def get_decrypted_tracing_config(cls, app_id: str, tracing_provider: str):
        """
        Load an app's stored config for one provider and decrypt it.

        :param app_id: app id
        :param tracing_provider: tracing provider
        :return: decrypted config dict, or ``None`` if no ``TraceAppConfig`` row exists
        :raises ValueError: if the app itself does not exist
        """
        trace_config_data: Optional[TraceAppConfig] = (
            db.session.query(TraceAppConfig)
            .filter(TraceAppConfig.app_id == app_id, TraceAppConfig.tracing_provider == tracing_provider)
            .first()
        )

        if not trace_config_data:
            return None

        # Secrets are encrypted per tenant, so the app's tenant id is required.
        app = db.session.query(App).filter(App.id == app_id).first()
        if not app:
            raise ValueError("App not found")

        tenant_id = app.tenant_id
        decrypt_tracing_config = cls.decrypt_tracing_config(
            tenant_id, tracing_provider, trace_config_data.tracing_config
        )

        return decrypt_tracing_config

    @classmethod
    def get_ops_trace_instance(
        cls,
        app_id: Optional[Union[UUID, str]] = None,
    ):
        """
        Return the trace client for an app, building and caching it on first use.

        :param app_id: app id (a ``UUID`` is converted to ``str``)
        :return: trace client, or ``None`` when ``app_id`` is None, the app is missing,
            tracing is unset/disabled, the provider is unknown, or no config is stored
        """
        if isinstance(app_id, UUID):
            app_id = str(app_id)

        if app_id is None:
            return None

        app: Optional[App] = db.session.query(App).filter(App.id == app_id).first()

        if app is None:
            return None

        # ``App.tracing`` holds JSON with "enabled" and "tracing_provider".
        app_ops_trace_config = json.loads(app.tracing) if app.tracing else None
        if app_ops_trace_config is None:
            return None
        if not app_ops_trace_config.get("enabled"):
            return None

        tracing_provider = app_ops_trace_config.get("tracing_provider")
        if tracing_provider is None or tracing_provider not in provider_config_map:
            return None

        decrypt_trace_config = cls.get_decrypted_tracing_config(app_id, tracing_provider)
        if not decrypt_trace_config:
            return None

        trace_instance, config_class = (
            provider_config_map[tracing_provider]["trace_instance"],
            provider_config_map[tracing_provider]["config_class"],
        )
        # The decrypted config (including secrets) is the cache key, so apps with
        # identical configs share a client and a changed config gets a new one.
        decrypt_trace_config_key = str(decrypt_trace_config)
        tracing_instance = cls.ops_trace_instances_cache.get(decrypt_trace_config_key)
        if tracing_instance is None:
            tracing_instance = trace_instance(config_class(**decrypt_trace_config))
            cls.ops_trace_instances_cache[decrypt_trace_config_key] = tracing_instance
            logging.info("new tracing_instance for app_id: %s", app_id)
        return tracing_instance

    @classmethod
    def get_app_config_through_message_id(cls, message_id: str):
        """
        Resolve the model config used by the conversation that owns a message.

        :param message_id: message id
        :return: the ``AppModelConfig`` referenced by the conversation (or ``None`` if that
            row is missing); otherwise, when the conversation has no config id, its
            ``override_model_configs``; ``None`` if the message or conversation is missing
        """
        app_model_config = None
        message_data = db.session.query(Message).filter(Message.id == message_id).first()
        if not message_data:
            return None
        conversation_id = message_data.conversation_id
        conversation_data = db.session.query(Conversation).filter(Conversation.id == conversation_id).first()
        if not conversation_data:
            return None

        if conversation_data.app_model_config_id:
            app_model_config = (
                db.session.query(AppModelConfig)
                .filter(AppModelConfig.id == conversation_data.app_model_config_id)
                .first()
            )
        elif conversation_data.app_model_config_id is None and conversation_data.override_model_configs:
            app_model_config = conversation_data.override_model_configs

        return app_model_config

    @classmethod
    def update_app_tracing_config(cls, app_id: str, enabled: bool, tracing_provider: str):
        """
        Store an app's tracing switch and provider as JSON in ``App.tracing`` and commit.

        :param app_id: app id
        :param enabled: whether tracing is enabled
        :param tracing_provider: tracing provider, or ``None`` to clear it
        :raises ValueError: if the provider is unknown or the app does not exist
        """
        if tracing_provider not in provider_config_map and tracing_provider is not None:
            raise ValueError(f"Invalid tracing provider: {tracing_provider}")

        app_config: Optional[App] = db.session.query(App).filter(App.id == app_id).first()
        if not app_config:
            raise ValueError("App not found")
        app_config.tracing = json.dumps(
            {
                "enabled": enabled,
                "tracing_provider": tracing_provider,
            }
        )
        db.session.commit()

    @classmethod
    def get_app_tracing_config(cls, app_id: str):
        """
        Return an app's tracing switch and provider.

        :param app_id: app id
        :return: parsed ``App.tracing`` JSON, or ``{"enabled": False, "tracing_provider": None}``
            when it is empty
        :raises ValueError: if the app does not exist
        """
        app: Optional[App] = db.session.query(App).filter(App.id == app_id).first()
        if not app:
            raise ValueError("App not found")
        if not app.tracing:
            return {"enabled": False, "tracing_provider": None}
        app_trace_config = json.loads(app.tracing)
        return app_trace_config

    @staticmethod
    def check_trace_config_is_effective(tracing_config: dict, tracing_provider: str):
        """
        Build a trace client from a config and return the result of its ``api_check()``.

        :param tracing_config: tracing config
        :param tracing_provider: tracing provider
        :return: whatever the provider client's ``api_check()`` returns
        """
        config_type, trace_instance = (
            provider_config_map[tracing_provider]["config_class"],
            provider_config_map[tracing_provider]["trace_instance"],
        )
        tracing_config = config_type(**tracing_config)
        return trace_instance(tracing_config).api_check()

    @staticmethod
    def get_trace_config_project_key(tracing_config: dict, tracing_provider: str):
        """
        Build a trace client from a config and return its ``get_project_key()``.

        :param tracing_config: tracing config
        :param tracing_provider: tracing provider
        :return: the provider client's project key
        """
        config_type, trace_instance = (
            provider_config_map[tracing_provider]["config_class"],
            provider_config_map[tracing_provider]["trace_instance"],
        )
        tracing_config = config_type(**tracing_config)
        return trace_instance(tracing_config).get_project_key()

    @staticmethod
    def get_trace_config_project_url(tracing_config: dict, tracing_provider: str):
        """
        Build a trace client from a config and return its ``get_project_url()``.

        :param tracing_config: tracing config
        :param tracing_provider: tracing provider
        :return: the provider client's project URL
        """
        config_type, trace_instance = (
            provider_config_map[tracing_provider]["config_class"],
            provider_config_map[tracing_provider]["trace_instance"],
        )
        tracing_config = config_type(**tracing_config)
        return trace_instance(tracing_config).get_project_url()
 | |
| 
 | |
| 
 | |
| class TraceTask:
 | |
|     def __init__(
 | |
|         self,
 | |
|         trace_type: Any,
 | |
|         message_id: Optional[str] = None,
 | |
|         workflow_run: Optional[WorkflowRun] = None,
 | |
|         conversation_id: Optional[str] = None,
 | |
|         user_id: Optional[str] = None,
 | |
|         timer: Optional[Any] = None,
 | |
|         **kwargs,
 | |
|     ):
 | |
|         self.trace_type = trace_type
 | |
|         self.message_id = message_id
 | |
|         self.workflow_run_id = workflow_run.id if workflow_run else None
 | |
|         self.conversation_id = conversation_id
 | |
|         self.user_id = user_id
 | |
|         self.timer = timer
 | |
|         self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")
 | |
|         self.app_id = None
 | |
| 
 | |
|         self.kwargs = kwargs
 | |
| 
 | |
|     def execute(self):
 | |
|         return self.preprocess()
 | |
| 
 | |
|     def preprocess(self):
 | |
|         preprocess_map = {
 | |
|             TraceTaskName.CONVERSATION_TRACE: lambda: self.conversation_trace(**self.kwargs),
 | |
|             TraceTaskName.WORKFLOW_TRACE: lambda: self.workflow_trace(
 | |
|                 workflow_run_id=self.workflow_run_id, conversation_id=self.conversation_id, user_id=self.user_id
 | |
|             ),
 | |
|             TraceTaskName.MESSAGE_TRACE: lambda: self.message_trace(message_id=self.message_id),
 | |
|             TraceTaskName.MODERATION_TRACE: lambda: self.moderation_trace(
 | |
|                 message_id=self.message_id, timer=self.timer, **self.kwargs
 | |
|             ),
 | |
|             TraceTaskName.SUGGESTED_QUESTION_TRACE: lambda: self.suggested_question_trace(
 | |
|                 message_id=self.message_id, timer=self.timer, **self.kwargs
 | |
|             ),
 | |
|             TraceTaskName.DATASET_RETRIEVAL_TRACE: lambda: self.dataset_retrieval_trace(
 | |
|                 message_id=self.message_id, timer=self.timer, **self.kwargs
 | |
|             ),
 | |
|             TraceTaskName.TOOL_TRACE: lambda: self.tool_trace(
 | |
|                 message_id=self.message_id, timer=self.timer, **self.kwargs
 | |
|             ),
 | |
|             TraceTaskName.GENERATE_NAME_TRACE: lambda: self.generate_name_trace(
 | |
|                 conversation_id=self.conversation_id, timer=self.timer, **self.kwargs
 | |
|             ),
 | |
|         }
 | |
| 
 | |
|         return preprocess_map.get(self.trace_type, lambda: None)()
 | |
| 
 | |
|     # process methods for different trace types
 | |
|     def conversation_trace(self, **kwargs):
 | |
|         return kwargs
 | |
| 
 | |
|     def workflow_trace(
 | |
|         self,
 | |
|         *,
 | |
|         workflow_run_id: str | None,
 | |
|         conversation_id: str | None,
 | |
|         user_id: str | None,
 | |
|     ):
 | |
|         if not workflow_run_id:
 | |
|             return {}
 | |
| 
 | |
|         with Session(db.engine) as session:
 | |
|             workflow_run_stmt = select(WorkflowRun).where(WorkflowRun.id == workflow_run_id)
 | |
|             workflow_run = session.scalars(workflow_run_stmt).first()
 | |
|             if not workflow_run:
 | |
|                 raise ValueError("Workflow run not found")
 | |
| 
 | |
|             workflow_id = workflow_run.workflow_id
 | |
|             tenant_id = workflow_run.tenant_id
 | |
|             workflow_run_id = workflow_run.id
 | |
|             workflow_run_elapsed_time = workflow_run.elapsed_time
 | |
|             workflow_run_status = workflow_run.status
 | |
|             workflow_run_inputs = workflow_run.inputs_dict
 | |
|             workflow_run_outputs = workflow_run.outputs_dict
 | |
|             workflow_run_version = workflow_run.version
 | |
|             error = workflow_run.error or ""
 | |
| 
 | |
|             total_tokens = workflow_run.total_tokens
 | |
| 
 | |
|             file_list = workflow_run_inputs.get("sys.file") or []
 | |
|             query = workflow_run_inputs.get("query") or workflow_run_inputs.get("sys.query") or ""
 | |
| 
 | |
|             # get workflow_app_log_id
 | |
|             workflow_app_log_data_stmt = select(WorkflowAppLog.id).where(
 | |
|                 WorkflowAppLog.tenant_id == tenant_id,
 | |
|                 WorkflowAppLog.app_id == workflow_run.app_id,
 | |
|                 WorkflowAppLog.workflow_run_id == workflow_run.id,
 | |
|             )
 | |
|             workflow_app_log_id = session.scalar(workflow_app_log_data_stmt)
 | |
|             # get message_id
 | |
|             message_id = None
 | |
|             if conversation_id:
 | |
|                 message_data_stmt = select(Message.id).where(
 | |
|                     Message.conversation_id == conversation_id,
 | |
|                     Message.workflow_run_id == workflow_run_id,
 | |
|                 )
 | |
|                 message_id = session.scalar(message_data_stmt)
 | |
| 
 | |
|             metadata = {
 | |
|                 "workflow_id": workflow_id,
 | |
|                 "conversation_id": conversation_id,
 | |
|                 "workflow_run_id": workflow_run_id,
 | |
|                 "tenant_id": tenant_id,
 | |
|                 "elapsed_time": workflow_run_elapsed_time,
 | |
|                 "status": workflow_run_status,
 | |
|                 "version": workflow_run_version,
 | |
|                 "total_tokens": total_tokens,
 | |
|                 "file_list": file_list,
 | |
|                 "triggered_from": workflow_run.triggered_from,
 | |
|                 "user_id": user_id,
 | |
|             }
 | |
| 
 | |
|             workflow_trace_info = WorkflowTraceInfo(
 | |
|                 workflow_data=workflow_run.to_dict(),
 | |
|                 conversation_id=conversation_id,
 | |
|                 workflow_id=workflow_id,
 | |
|                 tenant_id=tenant_id,
 | |
|                 workflow_run_id=workflow_run_id,
 | |
|                 workflow_run_elapsed_time=workflow_run_elapsed_time,
 | |
|                 workflow_run_status=workflow_run_status,
 | |
|                 workflow_run_inputs=workflow_run_inputs,
 | |
|                 workflow_run_outputs=workflow_run_outputs,
 | |
|                 workflow_run_version=workflow_run_version,
 | |
|                 error=error,
 | |
|                 total_tokens=total_tokens,
 | |
|                 file_list=file_list,
 | |
|                 query=query,
 | |
|                 metadata=metadata,
 | |
|                 workflow_app_log_id=workflow_app_log_id,
 | |
|                 message_id=message_id,
 | |
|                 start_time=workflow_run.created_at,
 | |
|                 end_time=workflow_run.finished_at,
 | |
|             )
 | |
|         return workflow_trace_info
 | |
| 
 | |
|     def message_trace(self, message_id: str | None):
 | |
|         if not message_id:
 | |
|             return {}
 | |
|         message_data = get_message_data(message_id)
 | |
|         if not message_data:
 | |
|             return {}
 | |
|         conversation_mode_stmt = select(Conversation.mode).where(Conversation.id == message_data.conversation_id)
 | |
|         conversation_mode = db.session.scalars(conversation_mode_stmt).all()
 | |
|         if not conversation_mode or len(conversation_mode) == 0:
 | |
|             return {}
 | |
|         conversation_mode = conversation_mode[0]
 | |
|         created_at = message_data.created_at
 | |
|         inputs = message_data.message
 | |
| 
 | |
|         # get message file data
 | |
|         message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first()
 | |
|         file_list = []
 | |
|         if message_file_data and message_file_data.url is not None:
 | |
|             file_url = f"{self.file_base_url}/{message_file_data.url}" if message_file_data else ""
 | |
|             file_list.append(file_url)
 | |
| 
 | |
|         metadata = {
 | |
|             "conversation_id": message_data.conversation_id,
 | |
|             "ls_provider": message_data.model_provider,
 | |
|             "ls_model_name": message_data.model_id,
 | |
|             "status": message_data.status,
 | |
|             "from_end_user_id": message_data.from_end_user_id,
 | |
|             "from_account_id": message_data.from_account_id,
 | |
|             "agent_based": message_data.agent_based,
 | |
|             "workflow_run_id": message_data.workflow_run_id,
 | |
|             "from_source": message_data.from_source,
 | |
|             "message_id": message_id,
 | |
|         }
 | |
| 
 | |
|         message_tokens = message_data.message_tokens
 | |
| 
 | |
|         message_trace_info = MessageTraceInfo(
 | |
|             message_id=message_id,
 | |
|             message_data=message_data.to_dict(),
 | |
|             conversation_model=conversation_mode,
 | |
|             message_tokens=message_tokens,
 | |
|             answer_tokens=message_data.answer_tokens,
 | |
|             total_tokens=message_tokens + message_data.answer_tokens,
 | |
|             error=message_data.error or "",
 | |
|             inputs=inputs,
 | |
|             outputs=message_data.answer,
 | |
|             file_list=file_list,
 | |
|             start_time=created_at,
 | |
|             end_time=created_at + timedelta(seconds=message_data.provider_response_latency),
 | |
|             metadata=metadata,
 | |
|             message_file_data=message_file_data,
 | |
|             conversation_mode=conversation_mode,
 | |
|         )
 | |
| 
 | |
|         return message_trace_info
 | |
| 
 | |
|     def moderation_trace(self, message_id, timer, **kwargs):
 | |
|         moderation_result = kwargs.get("moderation_result")
 | |
|         if not moderation_result:
 | |
|             return {}
 | |
|         inputs = kwargs.get("inputs")
 | |
|         message_data = get_message_data(message_id)
 | |
|         if not message_data:
 | |
|             return {}
 | |
|         metadata = {
 | |
|             "message_id": message_id,
 | |
|             "action": moderation_result.action,
 | |
|             "preset_response": moderation_result.preset_response,
 | |
|             "query": moderation_result.query,
 | |
|         }
 | |
| 
 | |
|         # get workflow_app_log_id
 | |
|         workflow_app_log_id = None
 | |
|         if message_data.workflow_run_id:
 | |
|             workflow_app_log_data = (
 | |
|                 db.session.query(WorkflowAppLog).filter_by(workflow_run_id=message_data.workflow_run_id).first()
 | |
|             )
 | |
|             workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
 | |
| 
 | |
|         moderation_trace_info = ModerationTraceInfo(
 | |
|             message_id=workflow_app_log_id or message_id,
 | |
|             inputs=inputs,
 | |
|             message_data=message_data.to_dict(),
 | |
|             flagged=moderation_result.flagged,
 | |
|             action=moderation_result.action,
 | |
|             preset_response=moderation_result.preset_response,
 | |
|             query=moderation_result.query,
 | |
|             start_time=timer.get("start"),
 | |
|             end_time=timer.get("end"),
 | |
|             metadata=metadata,
 | |
|         )
 | |
| 
 | |
|         return moderation_trace_info
 | |
| 
 | |
|     def suggested_question_trace(self, message_id, timer, **kwargs):
 | |
|         suggested_question = kwargs.get("suggested_question", [])
 | |
|         message_data = get_message_data(message_id)
 | |
|         if not message_data:
 | |
|             return {}
 | |
|         metadata = {
 | |
|             "message_id": message_id,
 | |
|             "ls_provider": message_data.model_provider,
 | |
|             "ls_model_name": message_data.model_id,
 | |
|             "status": message_data.status,
 | |
|             "from_end_user_id": message_data.from_end_user_id,
 | |
|             "from_account_id": message_data.from_account_id,
 | |
|             "agent_based": message_data.agent_based,
 | |
|             "workflow_run_id": message_data.workflow_run_id,
 | |
|             "from_source": message_data.from_source,
 | |
|         }
 | |
| 
 | |
|         # get workflow_app_log_id
 | |
|         workflow_app_log_id = None
 | |
|         if message_data.workflow_run_id:
 | |
|             workflow_app_log_data = (
 | |
|                 db.session.query(WorkflowAppLog).filter_by(workflow_run_id=message_data.workflow_run_id).first()
 | |
|             )
 | |
|             workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
 | |
| 
 | |
|         suggested_question_trace_info = SuggestedQuestionTraceInfo(
 | |
|             message_id=workflow_app_log_id or message_id,
 | |
|             message_data=message_data.to_dict(),
 | |
|             inputs=message_data.message,
 | |
|             outputs=message_data.answer,
 | |
|             start_time=timer.get("start"),
 | |
|             end_time=timer.get("end"),
 | |
|             metadata=metadata,
 | |
|             total_tokens=message_data.message_tokens + message_data.answer_tokens,
 | |
|             status=message_data.status,
 | |
|             error=message_data.error,
 | |
|             from_account_id=message_data.from_account_id,
 | |
|             agent_based=message_data.agent_based,
 | |
|             from_source=message_data.from_source,
 | |
|             model_provider=message_data.model_provider,
 | |
|             model_id=message_data.model_id,
 | |
|             suggested_question=suggested_question,
 | |
|             level=message_data.status,
 | |
|             status_message=message_data.error,
 | |
|         )
 | |
| 
 | |
|         return suggested_question_trace_info
 | |
| 
 | |
|     def dataset_retrieval_trace(self, message_id, timer, **kwargs):
 | |
|         documents = kwargs.get("documents")
 | |
|         message_data = get_message_data(message_id)
 | |
|         if not message_data:
 | |
|             return {}
 | |
| 
 | |
|         metadata = {
 | |
|             "message_id": message_id,
 | |
|             "ls_provider": message_data.model_provider,
 | |
|             "ls_model_name": message_data.model_id,
 | |
|             "status": message_data.status,
 | |
|             "from_end_user_id": message_data.from_end_user_id,
 | |
|             "from_account_id": message_data.from_account_id,
 | |
|             "agent_based": message_data.agent_based,
 | |
|             "workflow_run_id": message_data.workflow_run_id,
 | |
|             "from_source": message_data.from_source,
 | |
|         }
 | |
| 
 | |
|         dataset_retrieval_trace_info = DatasetRetrievalTraceInfo(
 | |
|             message_id=message_id,
 | |
|             inputs=message_data.query or message_data.inputs,
 | |
|             documents=[doc.model_dump() for doc in documents] if documents else [],
 | |
|             start_time=timer.get("start"),
 | |
|             end_time=timer.get("end"),
 | |
|             metadata=metadata,
 | |
|             message_data=message_data.to_dict(),
 | |
|         )
 | |
| 
 | |
|         return dataset_retrieval_trace_info
 | |
| 
 | |
|     def tool_trace(self, message_id, timer, **kwargs):
 | |
|         tool_name = kwargs.get("tool_name", "")
 | |
|         tool_inputs = kwargs.get("tool_inputs", {})
 | |
|         tool_outputs = kwargs.get("tool_outputs", {})
 | |
|         message_data = get_message_data(message_id)
 | |
|         if not message_data:
 | |
|             return {}
 | |
|         tool_config = {}
 | |
|         time_cost = 0
 | |
|         error = None
 | |
|         tool_parameters = {}
 | |
|         created_time = message_data.created_at
 | |
|         end_time = message_data.updated_at
 | |
|         agent_thoughts = message_data.agent_thoughts
 | |
|         for agent_thought in agent_thoughts:
 | |
|             if tool_name in agent_thought.tools:
 | |
|                 created_time = agent_thought.created_at
 | |
|                 tool_meta_data = agent_thought.tool_meta.get(tool_name, {})
 | |
|                 tool_config = tool_meta_data.get("tool_config", {})
 | |
|                 time_cost = tool_meta_data.get("time_cost", 0)
 | |
|                 end_time = created_time + timedelta(seconds=time_cost)
 | |
|                 error = tool_meta_data.get("error", "")
 | |
|                 tool_parameters = tool_meta_data.get("tool_parameters", {})
 | |
|         metadata = {
 | |
|             "message_id": message_id,
 | |
|             "tool_name": tool_name,
 | |
|             "tool_inputs": tool_inputs,
 | |
|             "tool_outputs": tool_outputs,
 | |
|             "tool_config": tool_config,
 | |
|             "time_cost": time_cost,
 | |
|             "error": error,
 | |
|             "tool_parameters": tool_parameters,
 | |
|         }
 | |
| 
 | |
|         file_url = ""
 | |
|         message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first()
 | |
|         if message_file_data:
 | |
|             message_file_id = message_file_data.id if message_file_data else None
 | |
|             type = message_file_data.type
 | |
|             created_by_role = message_file_data.created_by_role
 | |
|             created_user_id = message_file_data.created_by
 | |
|             file_url = f"{self.file_base_url}/{message_file_data.url}"
 | |
| 
 | |
|             metadata.update(
 | |
|                 {
 | |
|                     "message_file_id": message_file_id,
 | |
|                     "created_by_role": created_by_role,
 | |
|                     "created_user_id": created_user_id,
 | |
|                     "type": type,
 | |
|                 }
 | |
|             )
 | |
| 
 | |
|         tool_trace_info = ToolTraceInfo(
 | |
|             message_id=message_id,
 | |
|             message_data=message_data.to_dict(),
 | |
|             tool_name=tool_name,
 | |
|             start_time=timer.get("start") if timer else created_time,
 | |
|             end_time=timer.get("end") if timer else end_time,
 | |
|             tool_inputs=tool_inputs,
 | |
|             tool_outputs=tool_outputs,
 | |
|             metadata=metadata,
 | |
|             message_file_data=message_file_data,
 | |
|             error=error,
 | |
|             inputs=message_data.message,
 | |
|             outputs=message_data.answer,
 | |
|             tool_config=tool_config,
 | |
|             time_cost=time_cost,
 | |
|             tool_parameters=tool_parameters,
 | |
|             file_url=file_url,
 | |
|         )
 | |
| 
 | |
|         return tool_trace_info
 | |
| 
 | |
|     def generate_name_trace(self, conversation_id, timer, **kwargs):
 | |
|         generate_conversation_name = kwargs.get("generate_conversation_name")
 | |
|         inputs = kwargs.get("inputs")
 | |
|         tenant_id = kwargs.get("tenant_id")
 | |
|         if not tenant_id:
 | |
|             return {}
 | |
|         start_time = timer.get("start")
 | |
|         end_time = timer.get("end")
 | |
| 
 | |
|         metadata = {
 | |
|             "conversation_id": conversation_id,
 | |
|             "tenant_id": tenant_id,
 | |
|         }
 | |
| 
 | |
|         generate_name_trace_info = GenerateNameTraceInfo(
 | |
|             conversation_id=conversation_id,
 | |
|             inputs=inputs,
 | |
|             outputs=generate_conversation_name,
 | |
|             start_time=start_time,
 | |
|             end_time=end_time,
 | |
|             metadata=metadata,
 | |
|             tenant_id=tenant_id,
 | |
|         )
 | |
| 
 | |
|         return generate_name_trace_info
 | |
| 
 | |
| 
 | |
# Process-wide state shared by every TraceQueueManager instance.
trace_manager_timer: Optional[threading.Timer] = None  # pending flush timer, if any
trace_manager_queue: queue.Queue = queue.Queue()  # trace tasks waiting to be flushed
# Flush period (seconds, passed to threading.Timer) and max tasks collected per flush;
# both can be overridden through the environment.
trace_manager_interval = int(os.environ.get("TRACE_QUEUE_MANAGER_INTERVAL", "5"))
trace_manager_batch_size = int(os.environ.get("TRACE_QUEUE_MANAGER_BATCH_SIZE", "100"))
 | |
| 
 | |
| 
 | |
class TraceQueueManager:
    """Buffer trace tasks in a process-wide queue and flush them to Celery on a timer.

    Tasks are put on the module-level ``trace_manager_queue``. A module-level
    ``threading.Timer`` (``trace_manager_timer``) fires after ``trace_manager_interval``
    seconds and ships up to ``trace_manager_batch_size`` queued tasks via
    ``send_to_celery``. ``start_timer`` only creates a new timer when none is alive.
    """

    def __init__(self, app_id=None, user_id=None):
        global trace_manager_timer

        self.app_id = app_id
        self.user_id = user_id
        self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id)
        # Keep the concrete Flask app (not the proxy) so send_to_celery, which runs on the
        # timer thread, can push an app context there.
        self.flask_app = current_app._get_current_object()  # type: ignore
        if trace_manager_timer is None:
            self.start_timer()

    def add_trace_task(self, trace_task: TraceTask):
        """Queue *trace_task* when tracing is configured for this app; always (re)arm the timer."""
        try:
            if self.trace_instance:
                trace_task.app_id = self.app_id
                trace_manager_queue.put(trace_task)
        except Exception:
            logging.exception("Error adding trace task, trace_type %s", trace_task.trace_type)
        finally:
            self.start_timer()

    def collect_tasks(self):
        """Remove and return up to ``trace_manager_batch_size`` queued tasks without blocking."""
        tasks: list[TraceTask] = []
        while len(tasks) < trace_manager_batch_size:
            try:
                task = trace_manager_queue.get_nowait()
            except queue.Empty:
                # Catching Empty instead of pre-checking empty() avoids a check-then-act race:
                # if another thread drains the queue first, the tasks already dequeued here are
                # returned rather than lost to an escaping exception.
                break
            tasks.append(task)
            trace_manager_queue.task_done()
        return tasks

    def run(self):
        """Timer callback: collect one batch and hand it to Celery, logging any failure.

        NOTE: this does not re-arm the timer; tasks left beyond one batch are flushed on a
        later tick started by the next ``add_trace_task`` call.
        """
        try:
            tasks = self.collect_tasks()
            if tasks:
                self.send_to_celery(tasks)
        except Exception:
            logging.exception("Error processing trace tasks")

    def start_timer(self):
        """Start a new flush timer unless one already exists and is still alive."""
        global trace_manager_timer
        if trace_manager_timer is None or not trace_manager_timer.is_alive():
            trace_manager_timer = threading.Timer(trace_manager_interval, self.run)
            trace_manager_timer.name = f"trace_manager_timer_{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}"
            # Non-daemon: the interpreter waits for a pending flush before exiting.
            trace_manager_timer.daemon = False
            trace_manager_timer.start()

    def send_to_celery(self, tasks: list[TraceTask]):
        """Execute each task, save its payload to storage and dispatch ``process_trace_tasks``.

        Tasks without an ``app_id`` are skipped. A failure in one task is logged and does not
        stop the rest of the batch; every task here has already been removed from the queue.
        """
        with self.flask_app.app_context():
            for task in tasks:
                if task.app_id is None:
                    continue
                try:
                    file_id = uuid4().hex
                    trace_info = task.execute()
                    task_data = TaskData(
                        app_id=task.app_id,
                        trace_info_type=type(trace_info).__name__,
                        trace_info=trace_info.model_dump() if trace_info else None,
                    )
                    file_path = f"{OPS_FILE_PATH}{task.app_id}/{file_id}.json"
                    storage.save(file_path, task_data.model_dump_json().encode("utf-8"))
                    file_info = {
                        "file_id": file_id,
                        "app_id": task.app_id,
                    }
                    process_trace_tasks.delay(file_info)
                except Exception:
                    logging.exception("Error sending trace task to celery, trace_type %s", task.trace_type)
 |