mirror of
https://github.com/infiniflow/ragflow.git
synced 2025-11-23 21:45:39 +00:00
### What problem does this PR solve? Add `canvas_category` field for UserCanvas and CanvasTemplate. ### Type of change - [x] New Feature (non-breaking change which adds functionality)
475 lines
18 KiB
Python
475 lines
18 KiB
Python
#
|
|
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
import json
|
|
import logging
|
|
import re
|
|
import sys
|
|
from functools import partial
|
|
|
|
import trio
|
|
from flask import request, Response
|
|
from flask_login import login_required, current_user
|
|
|
|
from agent.component import LLM
|
|
from api.db import CanvasCategory, FileType
|
|
from api.db.services.canvas_service import CanvasTemplateService, UserCanvasService, API4ConversationService
|
|
from api.db.services.document_service import DocumentService
|
|
from api.db.services.file_service import FileService
|
|
from api.db.services.user_service import TenantService
|
|
from api.db.services.user_canvas_version import UserCanvasVersionService
|
|
from api.settings import RetCode
|
|
from api.utils import get_uuid
|
|
from api.utils.api_utils import get_json_result, server_error_response, validate_request, get_data_error_result
|
|
from agent.canvas import Canvas
|
|
from peewee import MySQLDatabase, PostgresqlDatabase
|
|
from api.db.db_models import APIToken
|
|
import time
|
|
|
|
from api.utils.file_utils import filename_type, read_potential_broken_pdf
|
|
from rag.utils.redis_conn import REDIS_CONN
|
|
|
|
|
|
@manager.route('/templates', methods=['GET']) # noqa: F821
|
|
@login_required
|
|
def templates():
|
|
return get_json_result(data=[c.to_dict() for c in CanvasTemplateService.query(canvas_category=CanvasCategory.Agent)])
|
|
|
|
|
|
@manager.route('/list', methods=['GET']) # noqa: F821
|
|
@login_required
|
|
def canvas_list():
|
|
return get_json_result(data=sorted([c.to_dict() for c in \
|
|
UserCanvasService.query(user_id=current_user.id, canvas_category=CanvasCategory.Agent)], key=lambda x: x["update_time"]*-1)
|
|
)
|
|
|
|
|
|
@manager.route('/rm', methods=['POST']) # noqa: F821
|
|
@validate_request("canvas_ids")
|
|
@login_required
|
|
def rm():
|
|
for i in request.json["canvas_ids"]:
|
|
if not UserCanvasService.accessible(i, current_user.id):
|
|
return get_json_result(
|
|
data=False, message='Only owner of canvas authorized for this operation.',
|
|
code=RetCode.OPERATING_ERROR)
|
|
UserCanvasService.delete_by_id(i)
|
|
return get_json_result(data=True)
|
|
|
|
|
|
@manager.route('/set', methods=['POST']) # noqa: F821
|
|
@validate_request("dsl", "title")
|
|
@login_required
|
|
def save():
|
|
req = request.json
|
|
if not isinstance(req["dsl"], str):
|
|
req["dsl"] = json.dumps(req["dsl"], ensure_ascii=False)
|
|
req["dsl"] = json.loads(req["dsl"])
|
|
if "id" not in req:
|
|
req["user_id"] = current_user.id
|
|
if UserCanvasService.query(user_id=current_user.id, title=req["title"].strip(), canvas_category=CanvasCategory.Agent):
|
|
return get_data_error_result(message=f"{req['title'].strip()} already exists.")
|
|
req["id"] = get_uuid()
|
|
if not UserCanvasService.save(**req):
|
|
return get_data_error_result(message="Fail to save canvas.")
|
|
else:
|
|
if not UserCanvasService.accessible(req["id"], current_user.id):
|
|
return get_json_result(
|
|
data=False, message='Only owner of canvas authorized for this operation.',
|
|
code=RetCode.OPERATING_ERROR)
|
|
UserCanvasService.update_by_id(req["id"], req)
|
|
# save version
|
|
UserCanvasVersionService.insert(user_canvas_id=req["id"], dsl=req["dsl"], title="{0}_{1}".format(req["title"], time.strftime("%Y_%m_%d_%H_%M_%S")))
|
|
UserCanvasVersionService.delete_all_versions(req["id"])
|
|
return get_json_result(data=req)
|
|
|
|
|
|
@manager.route('/get/<canvas_id>', methods=['GET']) # noqa: F821
|
|
@login_required
|
|
def get(canvas_id):
|
|
if not UserCanvasService.accessible(canvas_id, current_user.id):
|
|
return get_data_error_result(message="canvas not found.")
|
|
e, c = UserCanvasService.get_by_tenant_id(canvas_id)
|
|
return get_json_result(data=c)
|
|
|
|
|
|
@manager.route('/getsse/<canvas_id>', methods=['GET']) # type: ignore # noqa: F821
|
|
def getsse(canvas_id):
|
|
token = request.headers.get('Authorization').split()
|
|
if len(token) != 2:
|
|
return get_data_error_result(message='Authorization is not valid!"')
|
|
token = token[1]
|
|
objs = APIToken.query(beta=token)
|
|
if not objs:
|
|
return get_data_error_result(message='Authentication error: API key is invalid!"')
|
|
tenant_id = objs[0].tenant_id
|
|
if not UserCanvasService.query(user_id=tenant_id, id=canvas_id):
|
|
return get_json_result(
|
|
data=False,
|
|
message='Only owner of canvas authorized for this operation.',
|
|
code=RetCode.OPERATING_ERROR
|
|
)
|
|
e, c = UserCanvasService.get_by_id(canvas_id)
|
|
if not e or c.user_id != tenant_id:
|
|
return get_data_error_result(message="canvas not found.")
|
|
return get_json_result(data=c.to_dict())
|
|
|
|
|
|
@manager.route('/completion', methods=['POST']) # noqa: F821
|
|
@validate_request("id")
|
|
@login_required
|
|
def run():
|
|
req = request.json
|
|
query = req.get("query", "")
|
|
files = req.get("files", [])
|
|
inputs = req.get("inputs", {})
|
|
user_id = req.get("user_id", current_user.id)
|
|
if not UserCanvasService.accessible(req["id"], current_user.id):
|
|
return get_json_result(
|
|
data=False, message='Only owner of canvas authorized for this operation.',
|
|
code=RetCode.OPERATING_ERROR)
|
|
|
|
e, cvs = UserCanvasService.get_by_id(req["id"])
|
|
if not e:
|
|
return get_data_error_result(message="canvas not found.")
|
|
|
|
if not isinstance(cvs.dsl, str):
|
|
cvs.dsl = json.dumps(cvs.dsl, ensure_ascii=False)
|
|
|
|
try:
|
|
canvas = Canvas(cvs.dsl, current_user.id, req["id"])
|
|
except Exception as e:
|
|
return server_error_response(e)
|
|
|
|
def sse():
|
|
nonlocal canvas, user_id
|
|
try:
|
|
for ans in canvas.run(query=query, files=files, user_id=user_id, inputs=inputs):
|
|
yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n"
|
|
|
|
cvs.dsl = json.loads(str(canvas))
|
|
UserCanvasService.update_by_id(req["id"], cvs.to_dict())
|
|
except Exception as e:
|
|
logging.exception(e)
|
|
yield "data:" + json.dumps({"code": 500, "message": str(e), "data": False}, ensure_ascii=False) + "\n\n"
|
|
|
|
resp = Response(sse(), mimetype="text/event-stream")
|
|
resp.headers.add_header("Cache-control", "no-cache")
|
|
resp.headers.add_header("Connection", "keep-alive")
|
|
resp.headers.add_header("X-Accel-Buffering", "no")
|
|
resp.headers.add_header("Content-Type", "text/event-stream; charset=utf-8")
|
|
return resp
|
|
|
|
|
|
@manager.route('/reset', methods=['POST']) # noqa: F821
|
|
@validate_request("id")
|
|
@login_required
|
|
def reset():
|
|
req = request.json
|
|
if not UserCanvasService.accessible(req["id"], current_user.id):
|
|
return get_json_result(
|
|
data=False, message='Only owner of canvas authorized for this operation.',
|
|
code=RetCode.OPERATING_ERROR)
|
|
try:
|
|
e, user_canvas = UserCanvasService.get_by_id(req["id"])
|
|
if not e:
|
|
return get_data_error_result(message="canvas not found.")
|
|
|
|
canvas = Canvas(json.dumps(user_canvas.dsl), current_user.id)
|
|
canvas.reset()
|
|
req["dsl"] = json.loads(str(canvas))
|
|
UserCanvasService.update_by_id(req["id"], {"dsl": req["dsl"]})
|
|
return get_json_result(data=req["dsl"])
|
|
except Exception as e:
|
|
return server_error_response(e)
|
|
|
|
|
|
@manager.route("/upload/<canvas_id>", methods=["POST"]) # noqa: F821
|
|
def upload(canvas_id):
|
|
e, cvs = UserCanvasService.get_by_tenant_id(canvas_id)
|
|
if not e:
|
|
return get_data_error_result(message="canvas not found.")
|
|
|
|
user_id = cvs["user_id"]
|
|
def structured(filename, filetype, blob, content_type):
|
|
nonlocal user_id
|
|
if filetype == FileType.PDF.value:
|
|
blob = read_potential_broken_pdf(blob)
|
|
|
|
location = get_uuid()
|
|
FileService.put_blob(user_id, location, blob)
|
|
|
|
return {
|
|
"id": location,
|
|
"name": filename,
|
|
"size": sys.getsizeof(blob),
|
|
"extension": filename.split(".")[-1].lower(),
|
|
"mime_type": content_type,
|
|
"created_by": user_id,
|
|
"created_at": time.time(),
|
|
"preview_url": None
|
|
}
|
|
|
|
if request.args.get("url"):
|
|
from crawl4ai import (
|
|
AsyncWebCrawler,
|
|
BrowserConfig,
|
|
CrawlerRunConfig,
|
|
DefaultMarkdownGenerator,
|
|
PruningContentFilter,
|
|
CrawlResult
|
|
)
|
|
try:
|
|
url = request.args.get("url")
|
|
filename = re.sub(r"\?.*", "", url.split("/")[-1])
|
|
async def adownload():
|
|
browser_config = BrowserConfig(
|
|
headless=True,
|
|
verbose=False,
|
|
)
|
|
async with AsyncWebCrawler(config=browser_config) as crawler:
|
|
crawler_config = CrawlerRunConfig(
|
|
markdown_generator=DefaultMarkdownGenerator(
|
|
content_filter=PruningContentFilter()
|
|
),
|
|
pdf=True,
|
|
screenshot=False
|
|
)
|
|
result: CrawlResult = await crawler.arun(
|
|
url=url,
|
|
config=crawler_config
|
|
)
|
|
return result
|
|
page = trio.run(adownload())
|
|
if page.pdf:
|
|
if filename.split(".")[-1].lower() != "pdf":
|
|
filename += ".pdf"
|
|
return get_json_result(data=structured(filename, "pdf", page.pdf, page.response_headers["content-type"]))
|
|
|
|
return get_json_result(data=structured(filename, "html", str(page.markdown).encode("utf-8"), page.response_headers["content-type"], user_id))
|
|
|
|
except Exception as e:
|
|
return server_error_response(e)
|
|
|
|
file = request.files['file']
|
|
try:
|
|
DocumentService.check_doc_health(user_id, file.filename)
|
|
return get_json_result(data=structured(file.filename, filename_type(file.filename), file.read(), file.content_type))
|
|
except Exception as e:
|
|
return server_error_response(e)
|
|
|
|
|
|
@manager.route('/input_form', methods=['GET']) # noqa: F821
|
|
@login_required
|
|
def input_form():
|
|
cvs_id = request.args.get("id")
|
|
cpn_id = request.args.get("component_id")
|
|
try:
|
|
e, user_canvas = UserCanvasService.get_by_id(cvs_id)
|
|
if not e:
|
|
return get_data_error_result(message="canvas not found.")
|
|
if not UserCanvasService.query(user_id=current_user.id, id=cvs_id):
|
|
return get_json_result(
|
|
data=False, message='Only owner of canvas authorized for this operation.',
|
|
code=RetCode.OPERATING_ERROR)
|
|
|
|
canvas = Canvas(json.dumps(user_canvas.dsl), current_user.id)
|
|
return get_json_result(data=canvas.get_component_input_form(cpn_id))
|
|
except Exception as e:
|
|
return server_error_response(e)
|
|
|
|
|
|
@manager.route('/debug', methods=['POST']) # noqa: F821
|
|
@validate_request("id", "component_id", "params")
|
|
@login_required
|
|
def debug():
|
|
req = request.json
|
|
if not UserCanvasService.accessible(req["id"], current_user.id):
|
|
return get_json_result(
|
|
data=False, message='Only owner of canvas authorized for this operation.',
|
|
code=RetCode.OPERATING_ERROR)
|
|
try:
|
|
e, user_canvas = UserCanvasService.get_by_id(req["id"])
|
|
canvas = Canvas(json.dumps(user_canvas.dsl), current_user.id)
|
|
canvas.reset()
|
|
canvas.message_id = get_uuid()
|
|
component = canvas.get_component(req["component_id"])["obj"]
|
|
component.reset()
|
|
|
|
if isinstance(component, LLM):
|
|
component.set_debug_inputs(req["params"])
|
|
component.invoke(**{k: o["value"] for k,o in req["params"].items()})
|
|
outputs = component.output()
|
|
for k in outputs.keys():
|
|
if isinstance(outputs[k], partial):
|
|
txt = ""
|
|
for c in outputs[k]():
|
|
txt += c
|
|
outputs[k] = txt
|
|
return get_json_result(data=outputs)
|
|
except Exception as e:
|
|
return server_error_response(e)
|
|
|
|
|
|
@manager.route('/test_db_connect', methods=['POST']) # noqa: F821
|
|
@validate_request("db_type", "database", "username", "host", "port", "password")
|
|
@login_required
|
|
def test_db_connect():
|
|
req = request.json
|
|
try:
|
|
if req["db_type"] in ["mysql", "mariadb"]:
|
|
db = MySQLDatabase(req["database"], user=req["username"], host=req["host"], port=req["port"],
|
|
password=req["password"])
|
|
elif req["db_type"] == 'postgresql':
|
|
db = PostgresqlDatabase(req["database"], user=req["username"], host=req["host"], port=req["port"],
|
|
password=req["password"])
|
|
elif req["db_type"] == 'mssql':
|
|
import pyodbc
|
|
connection_string = (
|
|
f"DRIVER={{ODBC Driver 17 for SQL Server}};"
|
|
f"SERVER={req['host']},{req['port']};"
|
|
f"DATABASE={req['database']};"
|
|
f"UID={req['username']};"
|
|
f"PWD={req['password']};"
|
|
)
|
|
db = pyodbc.connect(connection_string)
|
|
cursor = db.cursor()
|
|
cursor.execute("SELECT 1")
|
|
cursor.close()
|
|
else:
|
|
return server_error_response("Unsupported database type.")
|
|
if req["db_type"] != 'mssql':
|
|
db.connect()
|
|
db.close()
|
|
|
|
return get_json_result(data="Database Connection Successful!")
|
|
except Exception as e:
|
|
return server_error_response(e)
|
|
|
|
|
|
#api get list version dsl of canvas
|
|
@manager.route('/getlistversion/<canvas_id>', methods=['GET']) # noqa: F821
|
|
@login_required
|
|
def getlistversion(canvas_id):
|
|
try:
|
|
list =sorted([c.to_dict() for c in UserCanvasVersionService.list_by_canvas_id(canvas_id)], key=lambda x: x["update_time"]*-1)
|
|
return get_json_result(data=list)
|
|
except Exception as e:
|
|
return get_data_error_result(message=f"Error getting history files: {e}")
|
|
|
|
|
|
#api get version dsl of canvas
|
|
@manager.route('/getversion/<version_id>', methods=['GET']) # noqa: F821
|
|
@login_required
|
|
def getversion( version_id):
|
|
try:
|
|
|
|
e, version = UserCanvasVersionService.get_by_id(version_id)
|
|
if version:
|
|
return get_json_result(data=version.to_dict())
|
|
except Exception as e:
|
|
return get_json_result(data=f"Error getting history file: {e}")
|
|
|
|
|
|
@manager.route('/listteam', methods=['GET']) # noqa: F821
|
|
@login_required
|
|
def list_canvas():
|
|
keywords = request.args.get("keywords", "")
|
|
page_number = int(request.args.get("page", 1))
|
|
items_per_page = int(request.args.get("page_size", 150))
|
|
orderby = request.args.get("orderby", "create_time")
|
|
desc = request.args.get("desc", True)
|
|
try:
|
|
tenants = TenantService.get_joined_tenants_by_user_id(current_user.id)
|
|
canvas, total = UserCanvasService.get_by_tenant_ids(
|
|
[m["tenant_id"] for m in tenants], current_user.id, page_number,
|
|
items_per_page, orderby, desc, keywords, canvas_category=CanvasCategory.Agent)
|
|
return get_json_result(data={"canvas": canvas, "total": total})
|
|
except Exception as e:
|
|
return server_error_response(e)
|
|
|
|
|
|
@manager.route('/setting', methods=['POST']) # noqa: F821
|
|
@validate_request("id", "title", "permission")
|
|
@login_required
|
|
def setting():
|
|
req = request.json
|
|
req["user_id"] = current_user.id
|
|
|
|
if not UserCanvasService.accessible(req["id"], current_user.id):
|
|
return get_json_result(
|
|
data=False, message='Only owner of canvas authorized for this operation.',
|
|
code=RetCode.OPERATING_ERROR)
|
|
|
|
e,flow = UserCanvasService.get_by_id(req["id"])
|
|
if not e:
|
|
return get_data_error_result(message="canvas not found.")
|
|
flow = flow.to_dict()
|
|
flow["title"] = req["title"]
|
|
if req["description"]:
|
|
flow["description"] = req["description"]
|
|
if req["permission"]:
|
|
flow["permission"] = req["permission"]
|
|
if req["avatar"]:
|
|
flow["avatar"] = req["avatar"]
|
|
|
|
num= UserCanvasService.update_by_id(req["id"], flow)
|
|
return get_json_result(data=num)
|
|
|
|
|
|
@manager.route('/trace', methods=['GET']) # noqa: F821
|
|
def trace():
|
|
cvs_id = request.args.get("canvas_id")
|
|
msg_id = request.args.get("message_id")
|
|
try:
|
|
bin = REDIS_CONN.get(f"{cvs_id}-{msg_id}-logs")
|
|
if not bin:
|
|
return get_json_result(data={})
|
|
|
|
return get_json_result(data=json.loads(bin.encode("utf-8")))
|
|
except Exception as e:
|
|
logging.exception(e)
|
|
|
|
|
|
@manager.route('/<canvas_id>/sessions', methods=['GET']) # noqa: F821
|
|
@login_required
|
|
def sessions(canvas_id):
|
|
tenant_id = current_user.id
|
|
if not UserCanvasService.accessible(canvas_id, tenant_id):
|
|
return get_json_result(
|
|
data=False, message='Only owner of canvas authorized for this operation.',
|
|
code=RetCode.OPERATING_ERROR)
|
|
|
|
user_id = request.args.get("user_id")
|
|
page_number = int(request.args.get("page", 1))
|
|
items_per_page = int(request.args.get("page_size", 30))
|
|
keywords = request.args.get("keywords")
|
|
from_date = request.args.get("from_date")
|
|
to_date = request.args.get("to_date")
|
|
orderby = request.args.get("orderby", "update_time")
|
|
if request.args.get("desc") == "False" or request.args.get("desc") == "false":
|
|
desc = False
|
|
else:
|
|
desc = True
|
|
# dsl defaults to True in all cases except for False and false
|
|
include_dsl = request.args.get("dsl") != "False" and request.args.get("dsl") != "false"
|
|
total, sess = API4ConversationService.get_list(canvas_id, tenant_id, page_number, items_per_page, orderby, desc,
|
|
None, user_id, include_dsl, keywords, from_date, to_date)
|
|
try:
|
|
return get_json_result(data={"total": total, "sessions": sess})
|
|
except Exception as e:
|
|
return server_error_response(e)
|
|
|