Test: Add web API test suite for knowledge base operations (#8254)

### What problem does this PR solve?

- Implement RAGFlowWebApiAuth class for web API authentication
- Add comprehensive test cases for KB CRUD operations
- Set up common fixtures and utilities in conftest.py
- Add helper functions in common.py for web API requests

The changes establish a complete testing framework for knowledge base
management via web API endpoints.

### Type of change

- [x] Add test case
This commit is contained in:
Liu An 2025-06-13 16:39:10 +08:00 committed by GitHub
parent 8f9e7a6f6f
commit 64af09ce7b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 1030 additions and 5 deletions

View File

@ -23,3 +23,12 @@ class RAGFlowHttpApiAuth(AuthBase):
def __call__(self, r): def __call__(self, r):
r.headers["Authorization"] = f"Bearer {self._token}" r.headers["Authorization"] = f"Bearer {self._token}"
return r return r
class RAGFlowWebApiAuth(AuthBase):
def __init__(self, token):
self._token = token
def __call__(self, r):
r.headers["Authorization"] = self._token
return r

View File

@ -99,14 +99,14 @@ class TestCapability:
class TestDatasetsDelete: class TestDatasetsDelete:
@pytest.mark.p1 @pytest.mark.p1
@pytest.mark.parametrize( @pytest.mark.parametrize(
"func, expected_code, expected_message, remaining", "func, expected_code, remaining",
[ [
(lambda r: {"ids": r[:1]}, 0, "", 2), (lambda r: {"ids": r[:1]}, 0, 2),
(lambda r: {"ids": r}, 0, "", 0), (lambda r: {"ids": r}, 0, 0),
], ],
ids=["single_dataset", "multiple_datasets"], ids=["single_dataset", "multiple_datasets"],
) )
def test_ids(self, HttpApiAuth, add_datasets_func, func, expected_code, expected_message, remaining): def test_ids(self, HttpApiAuth, add_datasets_func, func, expected_code, remaining):
dataset_ids = add_datasets_func dataset_ids = add_datasets_func
if callable(func): if callable(func):
payload = func(dataset_ids) payload = func(dataset_ids)

View File

@ -116,7 +116,7 @@ def clear_session_with_chat_assistants(request, add_chat_assistants):
@pytest.fixture(scope="class") @pytest.fixture(scope="class")
def add_dataset(request: FixtureRequest, client: RAGFlow): def add_dataset(request: FixtureRequest, client: RAGFlow) -> DataSet:
def cleanup(): def cleanup():
client.delete_datasets(ids=None) client.delete_datasets(ids=None)

View File

@ -0,0 +1,93 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import requests
from configs import HOST_ADDRESS
HEADERS = {"Content-Type": "application/json"}
KB_APP_URL = "/v1/kb"
# FILE_API_URL = "/api/v1/datasets/{dataset_id}/documents"
# FILE_CHUNK_API_URL = "/api/v1/datasets/{dataset_id}/chunks"
# CHUNK_API_URL = "/api/v1/datasets/{dataset_id}/documents/{document_id}/chunks"
# CHAT_ASSISTANT_API_URL = "/api/v1/chats"
# SESSION_WITH_CHAT_ASSISTANT_API_URL = "/api/v1/chats/{chat_id}/sessions"
# SESSION_WITH_AGENT_API_URL = "/api/v1/agents/{agent_id}/sessions"
# DATASET MANAGEMENT
def create_kb(auth, payload=None, *, headers=HEADERS, data=None):
res = requests.post(url=f"{HOST_ADDRESS}{KB_APP_URL}/create", headers=headers, auth=auth, json=payload, data=data)
return res.json()
def list_kbs(auth, params=None, payload=None, *, headers=HEADERS, data=None):
if payload is None:
payload = {}
res = requests.post(url=f"{HOST_ADDRESS}{KB_APP_URL}/list", headers=headers, auth=auth, params=params, json=payload, data=data)
return res.json()
def update_kb(auth, payload=None, *, headers=HEADERS, data=None):
res = requests.post(url=f"{HOST_ADDRESS}{KB_APP_URL}/update", headers=headers, auth=auth, json=payload, data=data)
return res.json()
def rm_kb(auth, payload=None, *, headers=HEADERS, data=None):
res = requests.post(url=f"{HOST_ADDRESS}{KB_APP_URL}/rm", headers=headers, auth=auth, json=payload, data=data)
return res.json()
def detail_kb(auth, params=None, *, headers=HEADERS):
res = requests.get(url=f"{HOST_ADDRESS}{KB_APP_URL}/detail", headers=headers, auth=auth, params=params)
return res.json()
def list_tags_from_kbs(auth, params=None, *, headers=HEADERS):
res = requests.get(url=f"{HOST_ADDRESS}{KB_APP_URL}/tags", headers=headers, auth=auth, params=params)
return res.json()
def list_tags(auth, dataset_id, params=None, *, headers=HEADERS):
res = requests.get(url=f"{HOST_ADDRESS}{KB_APP_URL}/{dataset_id}/tags", headers=headers, auth=auth, params=params)
return res.json()
def rm_tags(auth, dataset_id, payload=None, *, headers=HEADERS, data=None):
res = requests.post(url=f"{HOST_ADDRESS}{KB_APP_URL}/{dataset_id}/rm_tags", headers=headers, auth=auth, json=payload, data=data)
return res.json()
def rename_tags(auth, dataset_id, payload=None, *, headers=HEADERS, data=None):
res = requests.post(url=f"{HOST_ADDRESS}{KB_APP_URL}/{dataset_id}/rename_tags", headers=headers, auth=auth, json=payload, data=data)
return res.json()
def knowledge_graph(auth, dataset_id, params=None, *, headers=HEADERS):
res = requests.get(url=f"{HOST_ADDRESS}{KB_APP_URL}/{dataset_id}/knowledge_graph", headers=headers, auth=auth, params=params)
return res.json()
def delete_knowledge_graph(auth, dataset_id, payload=None, *, headers=HEADERS, data=None):
res = requests.delete(url=f"{HOST_ADDRESS}{KB_APP_URL}/{dataset_id}/delete_knowledge_graph", headers=headers, auth=auth, json=payload, data=data)
return res.json()
def batch_create_datasets(auth, num):
ids = []
for i in range(num):
res = create_kb(auth, {"name": f"kb_{i}"})
ids.append(res["data"]["kb_id"])
return ids

View File

@ -0,0 +1,100 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import pytest
from common import (
batch_create_datasets,
)
from configs import HOST_ADDRESS, VERSION
from libs.auth import RAGFlowWebApiAuth
from pytest import FixtureRequest
from ragflow_sdk import RAGFlow
from utils.file_utils import (
create_docx_file,
create_eml_file,
create_excel_file,
create_html_file,
create_image_file,
create_json_file,
create_md_file,
create_pdf_file,
create_ppt_file,
create_txt_file,
)
@pytest.fixture
def generate_test_files(request: FixtureRequest, tmp_path):
file_creators = {
"docx": (tmp_path / "ragflow_test.docx", create_docx_file),
"excel": (tmp_path / "ragflow_test.xlsx", create_excel_file),
"ppt": (tmp_path / "ragflow_test.pptx", create_ppt_file),
"image": (tmp_path / "ragflow_test.png", create_image_file),
"pdf": (tmp_path / "ragflow_test.pdf", create_pdf_file),
"txt": (tmp_path / "ragflow_test.txt", create_txt_file),
"md": (tmp_path / "ragflow_test.md", create_md_file),
"json": (tmp_path / "ragflow_test.json", create_json_file),
"eml": (tmp_path / "ragflow_test.eml", create_eml_file),
"html": (tmp_path / "ragflow_test.html", create_html_file),
}
files = {}
for file_type, (file_path, creator_func) in file_creators.items():
if request.param in ["", file_type]:
creator_func(file_path)
files[file_type] = file_path
return files
@pytest.fixture(scope="class")
def ragflow_tmp_dir(request, tmp_path_factory):
class_name = request.cls.__name__
return tmp_path_factory.mktemp(class_name)
@pytest.fixture(scope="session")
def WebApiAuth(auth):
return RAGFlowWebApiAuth(auth)
@pytest.fixture(scope="session")
def client(token: str) -> RAGFlow:
return RAGFlow(api_key=token, base_url=HOST_ADDRESS, version=VERSION)
@pytest.fixture(scope="function")
def clear_datasets(request: FixtureRequest, client: RAGFlow):
def cleanup():
client.delete_datasets(ids=None)
request.addfinalizer(cleanup)
@pytest.fixture(scope="class")
def add_dataset(request: FixtureRequest, client: RAGFlow, WebApiAuth: RAGFlowWebApiAuth) -> str:
def cleanup():
client.delete_datasets(ids=None)
request.addfinalizer(cleanup)
return batch_create_datasets(WebApiAuth, 1)[0]
@pytest.fixture(scope="function")
def add_dataset_func(request: FixtureRequest, client: RAGFlow, WebApiAuth: RAGFlowWebApiAuth) -> str:
def cleanup():
client.delete_datasets(ids=None)
request.addfinalizer(cleanup)
return batch_create_datasets(WebApiAuth, 1)[0]

View File

@ -0,0 +1,38 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import pytest
from common import batch_create_datasets
from libs.auth import RAGFlowWebApiAuth
from pytest import FixtureRequest
from ragflow_sdk import RAGFlow
@pytest.fixture(scope="class")
def add_datasets(request: FixtureRequest, client: RAGFlow, WebApiAuth: RAGFlowWebApiAuth) -> list[str]:
def cleanup():
client.delete_datasets(ids=None)
request.addfinalizer(cleanup)
return batch_create_datasets(WebApiAuth, 5)
@pytest.fixture(scope="function")
def add_datasets_func(request: FixtureRequest, client: RAGFlow, WebApiAuth: RAGFlowWebApiAuth) -> list[str]:
def cleanup():
client.delete_datasets(ids=None)
request.addfinalizer(cleanup)
return batch_create_datasets(WebApiAuth, 3)

View File

@ -0,0 +1,109 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from concurrent.futures import ThreadPoolExecutor, as_completed
import pytest
from common import create_kb
from configs import DATASET_NAME_LIMIT, INVALID_API_TOKEN
from hypothesis import example, given, settings
from libs.auth import RAGFlowWebApiAuth
from utils.hypothesis_utils import valid_names
@pytest.mark.usefixtures("clear_datasets")
class TestAuthorization:
@pytest.mark.p1
@pytest.mark.parametrize(
"invalid_auth, expected_code, expected_message",
[
(None, 401, "<Unauthorized '401: Unauthorized'>"),
(RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "<Unauthorized '401: Unauthorized'>"),
],
ids=["empty_auth", "invalid_api_token"],
)
def test_auth_invalid(self, invalid_auth, expected_code, expected_message):
res = create_kb(invalid_auth, {"name": "auth_test"})
assert res["code"] == expected_code, res
assert res["message"] == expected_message, res
@pytest.mark.usefixtures("clear_datasets")
class TestCapability:
@pytest.mark.p3
def test_create_kb_1k(self, WebApiAuth):
for i in range(1_000):
payload = {"name": f"dataset_{i}"}
res = create_kb(WebApiAuth, payload)
assert res["code"] == 0, f"Failed to create dataset {i}"
@pytest.mark.p3
def test_create_kb_concurrent(self, WebApiAuth):
count = 100
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(create_kb, WebApiAuth, {"name": f"dataset_{i}"}) for i in range(count)]
responses = list(as_completed(futures))
assert len(responses) == count, responses
assert all(future.result()["code"] == 0 for future in futures)
@pytest.mark.usefixtures("clear_datasets")
class TestDatasetCreate:
@pytest.mark.p1
@given(name=valid_names())
@example("a" * 128)
@settings(max_examples=20)
def test_name(self, WebApiAuth, name):
res = create_kb(WebApiAuth, {"name": name})
assert res["code"] == 0, res
@pytest.mark.p2
@pytest.mark.parametrize(
"name, expected_message",
[
("", "Dataset name can't be empty."),
(" ", "Dataset name can't be empty."),
("a" * (DATASET_NAME_LIMIT + 1), "Dataset name length is 129 which is large than 128"),
(0, "Dataset name must be string."),
(None, "Dataset name must be string."),
],
ids=["empty_name", "space_name", "too_long_name", "invalid_name", "None_name"],
)
def test_name_invalid(self, WebApiAuth, name, expected_message):
payload = {"name": name}
res = create_kb(WebApiAuth, payload)
assert res["code"] == 102, res
assert expected_message in res["message"], res
@pytest.mark.p3
def test_name_duplicated(self, WebApiAuth):
name = "duplicated_name"
payload = {"name": name}
res = create_kb(WebApiAuth, payload)
assert res["code"] == 0, res
res = create_kb(WebApiAuth, payload)
assert res["code"] == 0, res
@pytest.mark.p3
def test_name_case_insensitive(self, WebApiAuth):
name = "CaseInsensitive"
payload = {"name": name.upper()}
res = create_kb(WebApiAuth, payload)
assert res["code"] == 0, res
payload = {"name": name.lower()}
res = create_kb(WebApiAuth, payload)
assert res["code"] == 0, res

View File

@ -0,0 +1,53 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import pytest
from common import (
detail_kb,
)
from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowWebApiAuth
class TestAuthorization:
@pytest.mark.p1
@pytest.mark.parametrize(
"invalid_auth, expected_code, expected_message",
[
(None, 401, "<Unauthorized '401: Unauthorized'>"),
(RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "<Unauthorized '401: Unauthorized'>"),
],
)
def test_auth_invalid(self, invalid_auth, expected_code, expected_message):
res = detail_kb(invalid_auth)
assert res["code"] == expected_code, res
assert res["message"] == expected_message, res
class TestDatasetsDetail:
@pytest.mark.p1
def test_kb_id(self, WebApiAuth, add_dataset):
kb_id = add_dataset
payload = {"kb_id": kb_id}
res = detail_kb(WebApiAuth, payload)
assert res["code"] == 0, res
assert res["data"]["name"] == "kb_0"
@pytest.mark.p2
def test_id_wrong_uuid(self, WebApiAuth):
payload = {"kb_id": "d94a8dc02c9711f0930f7fbc369eab6d"}
res = detail_kb(WebApiAuth, payload)
assert res["code"] == 103, res
assert "Only owner of knowledgebase authorized for this operation." in res["message"], res

View File

@ -0,0 +1,184 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from concurrent.futures import ThreadPoolExecutor, as_completed
import pytest
from common import list_kbs
from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowWebApiAuth
from utils import is_sorted
class TestAuthorization:
@pytest.mark.p1
@pytest.mark.parametrize(
"invalid_auth, expected_code, expected_message",
[
(None, 401, "<Unauthorized '401: Unauthorized'>"),
(RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "<Unauthorized '401: Unauthorized'>"),
],
)
def test_auth_invalid(self, invalid_auth, expected_code, expected_message):
res = list_kbs(invalid_auth)
assert res["code"] == expected_code, res
assert res["message"] == expected_message, res
class TestCapability:
@pytest.mark.p3
def test_concurrent_list(self, WebApiAuth):
count = 100
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(list_kbs, WebApiAuth) for i in range(count)]
responses = list(as_completed(futures))
assert len(responses) == count, responses
assert all(future.result()["code"] == 0 for future in futures)
@pytest.mark.usefixtures("add_datasets")
class TestDatasetsList:
@pytest.mark.p1
def test_params_unset(self, WebApiAuth):
res = list_kbs(WebApiAuth, None)
assert res["code"] == 0, res
assert len(res["data"]["kbs"]) == 5, res
@pytest.mark.p2
def test_params_empty(self, WebApiAuth):
res = list_kbs(WebApiAuth, {})
assert res["code"] == 0, res
assert len(res["data"]["kbs"]) == 5, res
@pytest.mark.p1
@pytest.mark.parametrize(
"params, expected_page_size",
[
({"page": 2, "page_size": 2}, 2),
({"page": 3, "page_size": 2}, 1),
({"page": 4, "page_size": 2}, 0),
({"page": "2", "page_size": 2}, 2),
({"page": 1, "page_size": 10}, 5),
],
ids=["normal_middle_page", "normal_last_partial_page", "beyond_max_page", "string_page_number", "full_data_single_page"],
)
def test_page(self, WebApiAuth, params, expected_page_size):
res = list_kbs(WebApiAuth, params)
assert res["code"] == 0, res
assert len(res["data"]["kbs"]) == expected_page_size, res
@pytest.mark.skip
@pytest.mark.p2
@pytest.mark.parametrize(
"params, expected_code, expected_message",
[
({"page": 0}, 101, "Input should be greater than or equal to 1"),
({"page": "a"}, 101, "Input should be a valid integer, unable to parse string as an integer"),
],
ids=["page_0", "page_a"],
)
def test_page_invalid(self, WebApiAuth, params, expected_code, expected_message):
res = list_kbs(WebApiAuth, params=params)
assert res["code"] == expected_code, res
assert expected_message in res["message"], res
@pytest.mark.p2
def test_page_none(self, WebApiAuth):
params = {"page": None}
res = list_kbs(WebApiAuth, params)
assert res["code"] == 0, res
assert len(res["data"]["kbs"]) == 5, res
@pytest.mark.p1
@pytest.mark.parametrize(
"params, expected_page_size",
[
({"page": 1, "page_size": 1}, 1),
({"page": 1, "page_size": 3}, 3),
({"page": 1, "page_size": 5}, 5),
({"page": 1, "page_size": 6}, 5),
({"page": 1, "page_size": "1"}, 1),
],
ids=["min_valid_page_size", "medium_page_size", "page_size_equals_total", "page_size_exceeds_total", "string_type_page_size"],
)
def test_page_size(self, WebApiAuth, params, expected_page_size):
res = list_kbs(WebApiAuth, params)
assert res["code"] == 0, res
assert len(res["data"]["kbs"]) == expected_page_size, res
@pytest.mark.skip
@pytest.mark.p2
@pytest.mark.parametrize(
"params, expected_code, expected_message",
[
({"page_size": 0}, 101, "Input should be greater than or equal to 1"),
({"page_size": "a"}, 101, "Input should be a valid integer, unable to parse string as an integer"),
],
)
def test_page_size_invalid(self, WebApiAuth, params, expected_code, expected_message):
res = list_kbs(WebApiAuth, params)
assert res["code"] == expected_code, res
assert expected_message in res["message"], res
@pytest.mark.p2
def test_page_size_none(self, WebApiAuth):
params = {"page_size": None}
res = list_kbs(WebApiAuth, params)
assert res["code"] == 0, res
assert len(res["data"]["kbs"]) == 5, res
@pytest.mark.p2
@pytest.mark.parametrize(
"params, assertions",
[
({"orderby": "update_time"}, lambda r: (is_sorted(r["data"]["kbs"], "update_time", True))),
],
ids=["orderby_update_time"],
)
def test_orderby(self, WebApiAuth, params, assertions):
res = list_kbs(WebApiAuth, params)
assert res["code"] == 0, res
if callable(assertions):
assert assertions(res), res
@pytest.mark.p2
@pytest.mark.parametrize(
"params, assertions",
[
({"desc": "True"}, lambda r: (is_sorted(r["data"]["kbs"], "update_time", True))),
({"desc": "False"}, lambda r: (is_sorted(r["data"]["kbs"], "update_time", False))),
],
ids=["desc=True", "desc=False"],
)
def test_desc(self, WebApiAuth, params, assertions):
res = list_kbs(WebApiAuth, params)
assert res["code"] == 0, res
if callable(assertions):
assert assertions(res), res
@pytest.mark.p2
@pytest.mark.parametrize(
"params, expected_page_size",
[
({"parser_id": "naive"}, 5),
({"parser_id": "qa"}, 0),
],
ids=["naive", "dqa"],
)
def test_parser_id(self, WebApiAuth, params, expected_page_size):
res = list_kbs(WebApiAuth, params)
assert res["code"] == 0, res
assert len(res["data"]["kbs"]) == expected_page_size, res

View File

@ -0,0 +1,61 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import pytest
from common import (
list_kbs,
rm_kb,
)
from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowWebApiAuth
class TestAuthorization:
@pytest.mark.p1
@pytest.mark.parametrize(
"invalid_auth, expected_code, expected_message",
[
(None, 401, "<Unauthorized '401: Unauthorized'>"),
(RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "<Unauthorized '401: Unauthorized'>"),
],
)
def test_auth_invalid(self, invalid_auth, expected_code, expected_message):
res = rm_kb(invalid_auth)
assert res["code"] == expected_code, res
assert res["message"] == expected_message, res
class TestDatasetsDelete:
@pytest.mark.p1
def test_kb_id(self, WebApiAuth, add_datasets_func):
kb_ids = add_datasets_func
payload = {"kb_id": kb_ids[0]}
res = rm_kb(WebApiAuth, payload)
assert res["code"] == 0, res
res = list_kbs(WebApiAuth)
assert len(res["data"]["kbs"]) == 2, res
@pytest.mark.p2
@pytest.mark.usefixtures("add_dataset_func")
def test_id_wrong_uuid(self, WebApiAuth):
payload = {"kb_id": "d94a8dc02c9711f0930f7fbc369eab6d"}
res = rm_kb(WebApiAuth, payload)
assert res["code"] == 109, res
assert "No authorization." in res["message"], res
res = list_kbs(WebApiAuth)
assert len(res["data"]["kbs"]) == 1, res

View File

@ -0,0 +1,378 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
import pytest
from common import update_kb
from configs import DATASET_NAME_LIMIT, INVALID_API_TOKEN
from hypothesis import HealthCheck, example, given, settings
from libs.auth import RAGFlowWebApiAuth
from utils import encode_avatar
from utils.file_utils import create_image_file
from utils.hypothesis_utils import valid_names
class TestAuthorization:
@pytest.mark.p1
@pytest.mark.parametrize(
"invalid_auth, expected_code, expected_message",
[
(None, 401, "<Unauthorized '401: Unauthorized'>"),
(RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "<Unauthorized '401: Unauthorized'>"),
],
ids=["empty_auth", "invalid_api_token"],
)
def test_auth_invalid(self, invalid_auth, expected_code, expected_message):
res = update_kb(invalid_auth, "dataset_id")
assert res["code"] == expected_code, res
assert res["message"] == expected_message, res
class TestCapability:
@pytest.mark.p3
def test_update_dateset_concurrent(self, WebApiAuth, add_dataset_func):
dataset_id = add_dataset_func
count = 100
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [
executor.submit(
update_kb,
WebApiAuth,
{
"kb_id": dataset_id,
"name": f"dataset_{i}",
"description": "",
"parser_id": "naive",
},
)
for i in range(count)
]
responses = list(as_completed(futures))
assert len(responses) == count, responses
assert all(future.result()["code"] == 0 for future in futures)
class TestDatasetUpdate:
@pytest.mark.p3
def test_dataset_id_not_uuid(self, WebApiAuth):
payload = {"name": "not uuid", "description": "", "parser_id": "naive", "kb_id": "not_uuid"}
res = update_kb(WebApiAuth, payload)
assert res["code"] == 109, res
assert "No authorization." in res["message"], res
@pytest.mark.p1
@given(name=valid_names())
@example("a" * 128)
@settings(max_examples=20, suppress_health_check=[HealthCheck.function_scoped_fixture])
def test_name(self, WebApiAuth, add_dataset_func, name):
dataset_id = add_dataset_func
payload = {"name": name, "description": "", "parser_id": "naive", "kb_id": dataset_id}
res = update_kb(WebApiAuth, payload)
assert res["code"] == 0, res
assert res["data"]["name"] == name, res
@pytest.mark.p2
@pytest.mark.parametrize(
"name, expected_message",
[
("", "Dataset name can't be empty."),
(" ", "Dataset name can't be empty."),
("a" * (DATASET_NAME_LIMIT + 1), "Dataset name length is 129 which is large than 128"),
(0, "Dataset name must be string."),
(None, "Dataset name must be string."),
],
ids=["empty_name", "space_name", "too_long_name", "invalid_name", "None_name"],
)
def test_name_invalid(self, WebApiAuth, add_dataset_func, name, expected_message):
kb_id = add_dataset_func
payload = {"name": name, "description": "", "parser_id": "naive", "kb_id": kb_id}
res = update_kb(WebApiAuth, payload)
assert res["code"] == 102, res
assert expected_message in res["message"], res
@pytest.mark.p3
def test_name_duplicated(self, WebApiAuth, add_datasets_func):
kb_id = add_datasets_func[0]
name = "kb_1"
payload = {"name": name, "description": "", "parser_id": "naive", "kb_id": kb_id}
res = update_kb(WebApiAuth, payload)
assert res["code"] == 102, res
assert res["message"] == "Duplicated knowledgebase name.", res
@pytest.mark.p3
def test_name_case_insensitive(self, WebApiAuth, add_datasets_func):
kb_id = add_datasets_func[0]
name = "KB_1"
payload = {"name": name, "description": "", "parser_id": "naive", "kb_id": kb_id}
res = update_kb(WebApiAuth, payload)
assert res["code"] == 102, res
assert res["message"] == "Duplicated knowledgebase name.", res
@pytest.mark.p2
def test_avatar(self, WebApiAuth, add_dataset_func, tmp_path):
kb_id = add_dataset_func
fn = create_image_file(tmp_path / "ragflow_test.png")
payload = {
"name": "avatar",
"description": "",
"parser_id": "naive",
"kb_id": kb_id,
"avatar": f"data:image/png;base64,{encode_avatar(fn)}",
}
res = update_kb(WebApiAuth, payload)
assert res["code"] == 0, res
assert res["data"]["avatar"] == f"data:image/png;base64,{encode_avatar(fn)}", res
@pytest.mark.p2
def test_description(self, WebApiAuth, add_dataset_func):
kb_id = add_dataset_func
payload = {"name": "description", "description": "description", "parser_id": "naive", "kb_id": kb_id}
res = update_kb(WebApiAuth, payload)
assert res["code"] == 0, res
assert res["data"]["description"] == "description", res
@pytest.mark.p1
@pytest.mark.parametrize(
"embedding_model",
[
"BAAI/bge-large-zh-v1.5@BAAI",
"maidalun1020/bce-embedding-base_v1@Youdao",
"embedding-3@ZHIPU-AI",
],
ids=["builtin_baai", "builtin_youdao", "tenant_zhipu"],
)
def test_embedding_model(self, WebApiAuth, add_dataset_func, embedding_model):
kb_id = add_dataset_func
payload = {"name": "embedding_model", "description": "", "parser_id": "naive", "kb_id": kb_id, "embd_id": embedding_model}
res = update_kb(WebApiAuth, payload)
assert res["code"] == 0, res
assert res["data"]["embd_id"] == embedding_model, res
@pytest.mark.p1
@pytest.mark.parametrize(
"permission",
[
"me",
"team",
],
ids=["me", "team"],
)
def test_permission(self, WebApiAuth, add_dataset_func, permission):
kb_id = add_dataset_func
payload = {"name": "permission", "description": "", "parser_id": "naive", "kb_id": kb_id, "permission": permission}
res = update_kb(WebApiAuth, payload)
assert res["code"] == 0, res
assert res["data"]["permission"] == permission.lower().strip(), res
@pytest.mark.p1
@pytest.mark.parametrize(
"chunk_method",
[
"naive",
"book",
"email",
"laws",
"manual",
"one",
"paper",
"picture",
"presentation",
"qa",
"table",
pytest.param("tag", marks=pytest.mark.skipif(os.getenv("DOC_ENGINE") == "infinity", reason="Infinity does not support parser_id=tag")),
],
ids=["naive", "book", "email", "laws", "manual", "one", "paper", "picture", "presentation", "qa", "table", "tag"],
)
def test_chunk_method(self, WebApiAuth, add_dataset_func, chunk_method):
kb_id = add_dataset_func
payload = {"name": "chunk_method", "description": "", "parser_id": chunk_method, "kb_id": kb_id}
res = update_kb(WebApiAuth, payload)
assert res["code"] == 0, res
assert res["data"]["parser_id"] == chunk_method, res
@pytest.mark.p1
@pytest.mark.skipif(os.getenv("DOC_ENGINE") != "infinity", reason="Infinity does not support parser_id=tag")
def test_chunk_method_tag_with_infinity(self, WebApiAuth, add_dataset_func):
kb_id = add_dataset_func
payload = {"name": "chunk_method", "description": "", "parser_id": "tag", "kb_id": kb_id}
res = update_kb(WebApiAuth, payload)
assert res["code"] == 103, res
assert res["message"] == "The chunking method Tag has not been supported by Infinity yet.", res
@pytest.mark.skipif(os.getenv("DOC_ENGINE") == "infinity", reason="#8208")
@pytest.mark.p2
@pytest.mark.parametrize("pagerank", [0, 50, 100], ids=["min", "mid", "max"])
def test_pagerank(self, WebApiAuth, add_dataset_func, pagerank):
kb_id = add_dataset_func
payload = {"name": "pagerank", "description": "", "parser_id": "naive", "kb_id": kb_id, "pagerank": pagerank}
res = update_kb(WebApiAuth, payload)
assert res["code"] == 0, res
assert res["data"]["pagerank"] == pagerank, res
@pytest.mark.skipif(os.getenv("DOC_ENGINE") == "infinity", reason="#8208")
@pytest.mark.p2
def test_pagerank_set_to_0(self, WebApiAuth, add_dataset_func):
kb_id = add_dataset_func
payload = {"name": "pagerank", "description": "", "parser_id": "naive", "kb_id": kb_id, "pagerank": 50}
res = update_kb(WebApiAuth, payload)
assert res["code"] == 0, res
assert res["data"]["pagerank"] == 50, res
payload = {"name": "pagerank", "description": "", "parser_id": "naive", "kb_id": kb_id, "pagerank": 0}
res = update_kb(WebApiAuth, payload)
assert res["code"] == 0, res
assert res["data"]["pagerank"] == 0, res
@pytest.mark.skipif(os.getenv("DOC_ENGINE") != "infinity", reason="#8208")
@pytest.mark.p2
def test_pagerank_infinity(self, WebApiAuth, add_dataset_func):
kb_id = add_dataset_func
payload = {"name": "pagerank", "description": "", "parser_id": "naive", "kb_id": kb_id, "pagerank": 50}
res = update_kb(WebApiAuth, payload)
assert res["code"] == 102, res
assert res["message"] == "'pagerank' can only be set when doc_engine is elasticsearch", res
@pytest.mark.p1
@pytest.mark.parametrize(
"parser_config",
[
{"auto_keywords": 0},
{"auto_keywords": 16},
{"auto_keywords": 32},
{"auto_questions": 0},
{"auto_questions": 5},
{"auto_questions": 10},
{"chunk_token_num": 1},
{"chunk_token_num": 1024},
{"chunk_token_num": 2048},
{"delimiter": "\n"},
{"delimiter": " "},
{"html4excel": True},
{"html4excel": False},
{"layout_recognize": "DeepDOC"},
{"layout_recognize": "Plain Text"},
{"tag_kb_ids": ["1", "2"]},
{"topn_tags": 1},
{"topn_tags": 5},
{"topn_tags": 10},
{"filename_embd_weight": 0.1},
{"filename_embd_weight": 0.5},
{"filename_embd_weight": 1.0},
{"task_page_size": 1},
{"task_page_size": None},
{"pages": [[1, 100]]},
{"pages": None},
{"graphrag": {"use_graphrag": True}},
{"graphrag": {"use_graphrag": False}},
{"graphrag": {"entity_types": ["age", "sex", "height", "weight"]}},
{"graphrag": {"method": "general"}},
{"graphrag": {"method": "light"}},
{"graphrag": {"community": True}},
{"graphrag": {"community": False}},
{"graphrag": {"resolution": True}},
{"graphrag": {"resolution": False}},
{"raptor": {"use_raptor": True}},
{"raptor": {"use_raptor": False}},
{"raptor": {"prompt": "Who are you?"}},
{"raptor": {"max_token": 1}},
{"raptor": {"max_token": 1024}},
{"raptor": {"max_token": 2048}},
{"raptor": {"threshold": 0.0}},
{"raptor": {"threshold": 0.5}},
{"raptor": {"threshold": 1.0}},
{"raptor": {"max_cluster": 1}},
{"raptor": {"max_cluster": 512}},
{"raptor": {"max_cluster": 1024}},
{"raptor": {"random_seed": 0}},
],
ids=[
"auto_keywords_min",
"auto_keywords_mid",
"auto_keywords_max",
"auto_questions_min",
"auto_questions_mid",
"auto_questions_max",
"chunk_token_num_min",
"chunk_token_num_mid",
"chunk_token_num_max",
"delimiter",
"delimiter_space",
"html4excel_true",
"html4excel_false",
"layout_recognize_DeepDOC",
"layout_recognize_navie",
"tag_kb_ids",
"topn_tags_min",
"topn_tags_mid",
"topn_tags_max",
"filename_embd_weight_min",
"filename_embd_weight_mid",
"filename_embd_weight_max",
"task_page_size_min",
"task_page_size_None",
"pages",
"pages_none",
"graphrag_true",
"graphrag_false",
"graphrag_entity_types",
"graphrag_method_general",
"graphrag_method_light",
"graphrag_community_true",
"graphrag_community_false",
"graphrag_resolution_true",
"graphrag_resolution_false",
"raptor_true",
"raptor_false",
"raptor_prompt",
"raptor_max_token_min",
"raptor_max_token_mid",
"raptor_max_token_max",
"raptor_threshold_min",
"raptor_threshold_mid",
"raptor_threshold_max",
"raptor_max_cluster_min",
"raptor_max_cluster_mid",
"raptor_max_cluster_max",
"raptor_random_seed_min",
],
)
def test_parser_config(self, WebApiAuth, add_dataset_func, parser_config):
kb_id = add_dataset_func
payload = {"name": "parser_config", "description": "", "parser_id": "naive", "kb_id": kb_id, "parser_config": parser_config}
res = update_kb(WebApiAuth, payload)
assert res["code"] == 0, res
assert res["data"]["parser_config"] == parser_config, res
@pytest.mark.p2
@pytest.mark.parametrize(
"payload",
[
{"id": "id"},
{"tenant_id": "e57c1966f99211efb41e9e45646e0111"},
{"created_by": "created_by"},
{"create_date": "Tue, 11 Mar 2025 13:37:23 GMT"},
{"create_time": 1741671443322},
{"update_date": "Tue, 11 Mar 2025 13:37:23 GMT"},
{"update_time": 1741671443339},
],
)
def test_field_unsupported(self, WebApiAuth, add_dataset_func, payload):
kb_id = add_dataset_func
full_payload = {"name": "field_unsupported", "description": "", "parser_id": "naive", "kb_id": kb_id, **payload}
res = update_kb(WebApiAuth, full_payload)
assert res["code"] == 101, res
assert "isn't allowed" in res["message"], res