# LightRAG/lightrag/base.py

import os
from dataclasses import dataclass, field
from enum import Enum
from typing import (
    Any,
    Literal,
    Optional,
    TypedDict,
    TypeVar,
    Union,
)

import numpy as np

from .utils import EmbeddingFunc
from .types import KnowledgeGraph


class TextChunkSchema(TypedDict):
    tokens: int
    content: str
    full_doc_id: str
    chunk_order_index: int
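
# Illustrative example (hypothetical values, not taken from the pipeline):
# a single chunk record shaped as described by TextChunkSchema.
#
#   chunk: TextChunkSchema = {
#       "tokens": 120,
#       "content": "LightRAG combines graph and vector retrieval ...",
#       "full_doc_id": "doc-abc123",
#       "chunk_order_index": 0,
#   }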

T = TypeVar("T")

@dataclass
class QueryParam:
    """Configuration parameters for query execution in LightRAG."""

    mode: Literal["local", "global", "hybrid", "naive", "mix"] = "global"
    """Specifies the retrieval mode:
    - "local": Focuses on context-dependent information.
    - "global": Utilizes global knowledge.
    - "hybrid": Combines local and global retrieval methods.
    - "naive": Performs a basic search without advanced techniques.
    - "mix": Integrates knowledge graph and vector retrieval.
    """

    only_need_context: bool = False
    """If True, only returns the retrieved context without generating a response."""

    only_need_prompt: bool = False
    """If True, only returns the generated prompt without producing a response."""

    response_type: str = "Multiple Paragraphs"
    """Defines the response format. Examples: 'Multiple Paragraphs', 'Single Paragraph', 'Bullet Points'."""

    stream: bool = False
    """If True, enables streaming output for real-time responses."""

    top_k: int = int(os.getenv("TOP_K", "60"))
    """Number of top items to retrieve. Represents entities in 'local' mode and relationships in 'global' mode."""

    max_token_for_text_unit: int = 4000
    """Maximum number of tokens allowed for each retrieved text chunk."""

    max_token_for_global_context: int = 4000
    """Maximum number of tokens allocated for relationship descriptions in global retrieval."""

    max_token_for_local_context: int = 4000
    """Maximum number of tokens allocated for entity descriptions in local retrieval."""

    hl_keywords: list[str] = field(default_factory=list)
    """List of high-level keywords to prioritize in retrieval."""

    ll_keywords: list[str] = field(default_factory=list)
    """List of low-level keywords to refine retrieval focus."""

    conversation_history: list[dict[str, Any]] = field(default_factory=list)
    """Stores past conversation history to maintain context.
    Format: [{"role": "user/assistant", "content": "message"}].
    """

    history_turns: int = 3
    """Number of complete conversation turns (user-assistant pairs) to consider in the response context."""
@dataclass
class StorageNameSpace:
    namespace: str
    global_config: dict[str, Any]

    async def index_done_callback(self) -> None:
        """Commit the storage operations after indexing"""
        pass

    async def query_done_callback(self) -> None:
        """Commit the storage operations after querying"""
        pass

@dataclass
class BaseVectorStorage(StorageNameSpace):
    embedding_func: EmbeddingFunc
    meta_fields: set[str] = field(default_factory=set)

    async def query(self, query: str, top_k: int) -> list[dict[str, Any]]:
        raise NotImplementedError

    async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
        """Use the 'content' field of each value for embedding and the key as id.
        If embedding_func is None, use the 'embedding' field of the value instead.
        """
        raise NotImplementedError
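
# Illustrative sketch (not part of LightRAG): a naive in-memory backend
# showing how the interface above might be satisfied. It assumes, per the
# upsert() docstring, that each upserted value carries a 'content' field,
# and that embedding_func can be awaited with a list of texts and returns
# one vector per text. All names with a leading underscore are hypothetical.
@dataclass
class _NaiveInMemoryVectorStorage(BaseVectorStorage):
    _vectors: dict[str, Any] = field(default_factory=dict)
    _payloads: dict[str, dict[str, Any]] = field(default_factory=dict)

    async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
        # Embed every value's 'content' and store the vector under its key.
        contents = [value["content"] for value in data.values()]
        embeddings = await self.embedding_func(contents)
        for key, vector in zip(data.keys(), embeddings):
            self._vectors[key] = np.asarray(vector, dtype=np.float32)
            self._payloads[key] = data[key]

    async def query(self, query: str, top_k: int) -> list[dict[str, Any]]:
        # Rank stored items by cosine similarity against the embedded query.
        query_vec = np.asarray((await self.embedding_func([query]))[0], dtype=np.float32)
        query_norm = np.linalg.norm(query_vec) + 1e-10
        scored = []
        for key, vector in self._vectors.items():
            score = float(query_vec @ vector) / (query_norm * (np.linalg.norm(vector) + 1e-10))
            scored.append((score, key))
        scored.sort(key=lambda item: item[0], reverse=True)
        return [{"id": key, **self._payloads[key]} for _, key in scored[:top_k]]
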
@dataclass
class BaseKVStorage(StorageNameSpace):
    embedding_func: EmbeddingFunc

    async def get_by_id(self, id: str) -> Union[dict[str, Any], None]:
        raise NotImplementedError

    async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
        raise NotImplementedError

    async def filter_keys(self, data: set[str]) -> set[str]:
        """Return the keys from the given set that do not yet exist in storage."""
        raise NotImplementedError

    async def upsert(self, data: dict[str, Any]) -> None:
        raise NotImplementedError

    async def drop(self) -> None:
        raise NotImplementedError
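
# Illustrative sketch (not part of LightRAG): a dict-backed key-value store
# demonstrating the semantics documented above, in particular that
# filter_keys() returns the ids that are NOT yet stored. The class name and
# its private field are hypothetical.
@dataclass
class _NaiveInMemoryKVStorage(BaseKVStorage):
    _data: dict[str, Any] = field(default_factory=dict)

    async def get_by_id(self, id: str) -> Union[dict[str, Any], None]:
        return self._data.get(id)

    async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
        return [self._data[i] for i in ids if i in self._data]

    async def filter_keys(self, data: set[str]) -> set[str]:
        return {key for key in data if key not in self._data}

    async def upsert(self, data: dict[str, Any]) -> None:
        self._data.update(data)

    async def drop(self) -> None:
        self._data.clear()
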
@dataclass
class BaseGraphStorage(StorageNameSpace):
    embedding_func: EmbeddingFunc | None = None

    async def has_node(self, node_id: str) -> bool:
        """Check if a node exists in the graph."""
        raise NotImplementedError

    async def has_edge(self, source_node_id: str, target_node_id: str) -> bool:
        """Check if an edge exists in the graph."""
        raise NotImplementedError

    async def node_degree(self, node_id: str) -> int:
        """Get the degree of a node."""
        raise NotImplementedError

    async def edge_degree(self, src_id: str, tgt_id: str) -> int:
        """Get the degree of an edge."""
        raise NotImplementedError

    async def get_node(self, node_id: str) -> Union[dict[str, str], None]:
        """Get a node by its id."""
        raise NotImplementedError

    async def get_edge(
        self, source_node_id: str, target_node_id: str
    ) -> Union[dict[str, str], None]:
        """Get an edge by its source and target node ids."""
        raise NotImplementedError

    async def get_node_edges(
        self, source_node_id: str
    ) -> Union[list[tuple[str, str]], None]:
        """Get all edges connected to a node."""
        raise NotImplementedError

    async def upsert_node(self, node_id: str, node_data: dict[str, str]) -> None:
        """Upsert a node into the graph."""
        raise NotImplementedError

    async def upsert_edge(
        self, source_node_id: str, target_node_id: str, edge_data: dict[str, str]
    ) -> None:
        """Upsert an edge into the graph."""
        raise NotImplementedError

    async def delete_node(self, node_id: str) -> None:
        """Delete a node from the graph."""
        raise NotImplementedError

    async def embed_nodes(
        self, algorithm: str
    ) -> tuple[np.ndarray[Any, Any], list[str]]:
        """Embed nodes using an algorithm."""
        raise NotImplementedError("Node embedding is not used in lightrag.")

    async def get_all_labels(self) -> list[str]:
        """Get all labels in the graph."""
        raise NotImplementedError

    async def get_knowledge_graph(
        self, node_label: str, max_depth: int = 5
    ) -> KnowledgeGraph:
        """Get a knowledge graph of a node."""
        raise NotImplementedError
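
# Illustrative usage (hypothetical names and data, not taken from the
# indexing pipeline): a concrete graph backend implementing the interface
# above would be driven roughly like this.
#
#   await graph.upsert_node("Ada Lovelace", {"entity_type": "person"})
#   await graph.upsert_node("Analytical Engine", {"entity_type": "technology"})
#   await graph.upsert_edge(
#       "Ada Lovelace",
#       "Analytical Engine",
#       {"description": "wrote the first published program for"},
#   )
#   assert await graph.has_edge("Ada Lovelace", "Analytical Engine")
#   subgraph = await graph.get_knowledge_graph("Ada Lovelace", max_depth=2)
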
class DocStatus(str, Enum):
    """Document processing status enum"""

    PENDING = "pending"
    PROCESSING = "processing"
    PROCESSED = "processed"
    FAILED = "failed"

@dataclass
class DocProcessingStatus:
    """Document processing status data structure"""

    content: str
    """Original content of the document"""
    content_summary: str
    """First 100 chars of document content, used for preview"""
    content_length: int
    """Total length of document"""
    status: DocStatus
    """Current processing status"""
    created_at: str
    """ISO format timestamp when document was created"""
    updated_at: str
    """ISO format timestamp when document was last updated"""
    chunks_count: Optional[int] = None
    """Number of chunks after splitting, used for processing"""
    error: Optional[str] = None
    """Error message if failed"""
    metadata: dict[str, Any] = field(default_factory=dict)
    """Additional metadata"""

class DocStatusStorage(BaseKVStorage):
    """Base class for document status storage"""

    async def get_status_counts(self) -> dict[str, int]:
        """Get counts of documents in each status"""
        raise NotImplementedError

    async def get_failed_docs(self) -> dict[str, DocProcessingStatus]:
        """Get all failed documents"""
        raise NotImplementedError

    async def get_pending_docs(self) -> dict[str, DocProcessingStatus]:
        """Get all pending documents"""
        raise NotImplementedError

    async def get_processing_docs(self) -> dict[str, DocProcessingStatus]:
        """Get all processing documents"""
        raise NotImplementedError

    async def get_processed_docs(self) -> dict[str, DocProcessingStatus]:
        """Get all processed documents"""
        raise NotImplementedError

    async def update_doc_status(self, data: dict[str, Any]) -> None:
        """Update the status of a document. By default this simply calls upsert()."""
        await self.upsert(data)
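
# Illustrative usage (hypothetical backend and document ids): tracking a
# document through the statuses defined in DocStatus.
#
#   await doc_status.upsert(
#       {"doc-1": {"status": DocStatus.PENDING, "content_summary": "..."}}
#   )
#   await doc_status.update_doc_status(
#       {"doc-1": {"status": DocStatus.PROCESSED, "chunks_count": 12}}
#   )
#   counts = await doc_status.get_status_counts()  # e.g. {"processed": 1}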