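"""Shared helpers for Dify's Aliyun trace integration: user id resolution, OTel span
status mapping, and GenAI span attribute/message formatting."""
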
import json
from collections.abc import Mapping
from typing import Any

from opentelemetry.trace import Link, Status, StatusCode

from core.ops.aliyun_trace.entities.semconv import (
    GEN_AI_FRAMEWORK,
    GEN_AI_SESSION_ID,
    GEN_AI_SPAN_KIND,
    GEN_AI_USER_ID,
    INPUT_VALUE,
    OUTPUT_VALUE,
    GenAISpanKind,
)
from core.rag.models.document import Document
from core.workflow.entities import WorkflowNodeExecution
from core.workflow.enums import WorkflowNodeExecutionStatus
from extensions.ext_database import db
from models import EndUser

# Constants
DEFAULT_JSON_ENSURE_ASCII = False
DEFAULT_FRAMEWORK_NAME = "dify"


def get_user_id_from_message_data(message_data) -> str:
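    """Resolve the user id for a message: prefer the end user's session id, fall back to the account id."""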
    user_id = message_data.from_account_id
    if message_data.from_end_user_id:
        end_user_data: EndUser | None = (
            db.session.query(EndUser).where(EndUser.id == message_data.from_end_user_id).first()
        )
        if end_user_data is not None:
            user_id = end_user_data.session_id
    return user_id


def create_status_from_error(error: str | None) -> Status:
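    """Map an optional error message to an OTel span Status (ERROR with the message, else OK)."""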
    if error:
        return Status(StatusCode.ERROR, error)
    return Status(StatusCode.OK)


def get_workflow_node_status(node_execution: WorkflowNodeExecution) -> Status:
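    """Translate a workflow node execution status into an OTel span Status."""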
    if node_execution.status == WorkflowNodeExecutionStatus.SUCCEEDED:
        return Status(StatusCode.OK)
    if node_execution.status in [WorkflowNodeExecutionStatus.FAILED, WorkflowNodeExecutionStatus.EXCEPTION]:
        return Status(StatusCode.ERROR, str(node_execution.error))
    return Status(StatusCode.UNSET)


def create_links_from_trace_id(trace_id: str | None) -> list[Link]:
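    """Build span links from an optional trace id string (empty list when no id is given)."""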
    # Imported locally (likely to sidestep a circular dependency with the exporter).
    from core.ops.aliyun_trace.data_exporter.traceclient import create_link

    links = []
    if trace_id:
        links.append(create_link(trace_id_str=trace_id))
    return links


def extract_retrieval_documents(documents: list[Document]) -> list[dict[str, Any]]:
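    """Flatten retrieval Documents into plain dicts of content, selected metadata ids, and score."""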
    documents_data = []
    for document in documents:
        document_data = {
            "content": document.page_content,
            "metadata": {
                "dataset_id": document.metadata.get("dataset_id"),
                "doc_id": document.metadata.get("doc_id"),
                "document_id": document.metadata.get("document_id"),
            },
            "score": document.metadata.get("score"),
        }
        documents_data.append(document_data)
    return documents_data


def serialize_json_data(data: Any, ensure_ascii: bool = DEFAULT_JSON_ENSURE_ASCII) -> str:
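    """JSON-encode data; non-ASCII characters are kept verbatim by default."""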
    return json.dumps(data, ensure_ascii=ensure_ascii)


def create_common_span_attributes(
    session_id: str = "",
    user_id: str = "",
    span_kind: str = GenAISpanKind.CHAIN.value,
    framework: str = DEFAULT_FRAMEWORK_NAME,
    inputs: str = "",
    outputs: str = "",
) -> dict[str, Any]:
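    """Assemble the GenAI span attributes common to all span kinds."""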
    return {
        GEN_AI_SESSION_ID: session_id,
        GEN_AI_USER_ID: user_id,
        GEN_AI_SPAN_KIND: span_kind,
        GEN_AI_FRAMEWORK: framework,
        INPUT_VALUE: inputs,
        OUTPUT_VALUE: outputs,
    }


def format_retrieval_documents(retrieval_documents: list) -> list:
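    """Convert retrieval document dicts into the semantic-convention document shape; malformed input yields []."""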
    try:
        if not isinstance(retrieval_documents, list):
            return []

        semantic_documents = []
        for doc in retrieval_documents:
            if not isinstance(doc, dict):
                continue

            metadata = doc.get("metadata", {})
            content = doc.get("content", "")
            title = doc.get("title", "")
            score = metadata.get("score", 0.0)
            document_id = metadata.get("document_id", "")

            semantic_metadata = {}
            if title:
                semantic_metadata["title"] = title
            if metadata.get("source"):
                semantic_metadata["source"] = metadata["source"]
            elif metadata.get("_source"):
                semantic_metadata["source"] = metadata["_source"]
            if metadata.get("doc_metadata"):
                doc_metadata = metadata["doc_metadata"]
                if isinstance(doc_metadata, dict):
                    semantic_metadata.update(doc_metadata)

            semantic_doc = {
                "document": {"content": content, "metadata": semantic_metadata, "score": score, "id": document_id}
            }
            semantic_documents.append(semantic_doc)

        return semantic_documents
    except Exception:
        return []


def format_input_messages(process_data: Mapping[str, Any]) -> str:
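    """Serialize the prompts in a node's process_data into a JSON list of role/parts messages, skipping invalid entries."""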
    try:
        if not isinstance(process_data, dict):
            return serialize_json_data([])

        prompts = process_data.get("prompts", [])
        if not prompts:
            return serialize_json_data([])

        valid_roles = {"system", "user", "assistant", "tool"}
        input_messages = []
        for prompt in prompts:
            if not isinstance(prompt, dict):
                continue

            role = prompt.get("role", "")
            text = prompt.get("text", "")

            if not role or role not in valid_roles:
                continue

            if text:
                message = {"role": role, "parts": [{"type": "text", "content": text}]}
                input_messages.append(message)

        return serialize_json_data(input_messages)
    except Exception:
        return serialize_json_data([])


def format_output_messages(outputs: Mapping[str, Any]) -> str:
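    """Serialize model output into a one-element JSON message list, normalizing unknown finish reasons to 'stop'."""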
    try:
        if not isinstance(outputs, dict):
            return serialize_json_data([])

        text = outputs.get("text", "")
        finish_reason = outputs.get("finish_reason", "")

        if not text:
            return serialize_json_data([])

        valid_finish_reasons = {"stop", "length", "content_filter", "tool_call", "error"}
        if finish_reason not in valid_finish_reasons:
            finish_reason = "stop"

        output_message = {
            "role": "assistant",
            "parts": [{"type": "text", "content": text}],
            "finish_reason": finish_reason,
        }

        return serialize_json_data([output_message])
    except Exception:
        return serialize_json_data([])