zrguo 2025-06-05 17:37:11 +08:00
parent 962974589a
commit cc9040d70c
8 changed files with 673 additions and 517 deletions

View File

@ -257,7 +257,7 @@ The processors support different types of content:
- `ImageModalProcessor`: Processes images with captions and footnotes
- `TableModalProcessor`: Processes tables with captions and footnotes
- `EquationModalProcessor`: Processes mathematical equations in LaTeX format
- `GenericModalProcessor`: A base processor that can be extended for custom content types
> **Note**: A complete working example can be found in `examples/modalprocessors_example.py`. You can run it using:
> ```bash
@ -357,4 +357,4 @@ description, entity_info = await equation_processor.process_multimodal_content(
)
```
</details>
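As a quick illustration of the extension point named above, here is a minimal sketch of subclassing `GenericModalProcessor` for a custom content type; the `AudioModalProcessor` name, the `audio` type, and the pass-through body are assumptions for illustration, not code from this commit:

```python
from lightrag.modalprocessors import GenericModalProcessor


class AudioModalProcessor(GenericModalProcessor):
    """Hypothetical processor for a custom 'audio' content type."""

    async def process_multimodal_content(
        self, modal_content, content_type="audio", file_path="", entity_name=None
    ):
        # A real subclass would build a type-specific prompt here before
        # delegating; the generic pipeline already handles entity creation.
        return await super().process_multimodal_content(
            modal_content=modal_content,
            content_type=content_type,
            file_path=file_path,
            entity_name=entity_name,
        )
```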

View File

@ -256,7 +256,7 @@ The MinerU configuration file `magic-pdf.json` supports a variety of customization options, including:
- `ImageModalProcessor`: Processes images with captions and footnotes
- `TableModalProcessor`: Processes tables with captions and footnotes
- `EquationModalProcessor`: Processes mathematical equations in LaTeX format
- `GenericModalProcessor`: A base processor that can be extended for custom content types
> **Note**: A complete working example can be found in `examples/modalprocessors_example.py`. You can run it using:
> ```bash
@ -355,4 +355,4 @@ description, entity_info = await equation_processor.process_multimodal_content(
entity_name="质能方程"
)
```
</details>

View File

@ -10,13 +10,15 @@ This example shows how to:
import os
import argparse
from pathlib import Path
from lightrag.mineru_parser import MineruParser
def parse_document(file_path: str, output_dir: str = None, method: str = "auto", stats: bool = False):
def parse_document(
file_path: str, output_dir: str = None, method: str = "auto", stats: bool = False
):
"""
Parse a document using MinerU parser
Args:
file_path: Path to the document
output_dir: Output directory for parsed results
@ -26,22 +28,20 @@ def parse_document(file_path: str, output_dir: str = None, method: str = "auto",
try:
# Parse the document
content_list, md_content = MineruParser.parse_document(
file_path=file_path,
parse_method=method,
output_dir=output_dir
file_path=file_path, parse_method=method, output_dir=output_dir
)
# Display statistics if requested
if stats:
print("\nDocument Statistics:")
print(f"Total content blocks: {len(content_list)}")
# Count different types of content
content_types = {}
for item in content_list:
content_type = item.get('type', 'unknown')
content_type = item.get("type", "unknown")
content_types[content_type] = content_types.get(content_type, 0) + 1
print("\nContent Type Distribution:")
for content_type, count in content_types.items():
print(f"- {content_type}: {count}")
@ -52,17 +52,22 @@ def parse_document(file_path: str, output_dir: str = None, method: str = "auto",
print(f"Error parsing document: {str(e)}")
return None, None
def main():
"""Main function to run the example"""
parser = argparse.ArgumentParser(description='MinerU Parser Example')
parser.add_argument('file_path', help='Path to the document to parse')
parser.add_argument('--output', '-o', help='Output directory path')
parser.add_argument('--method', '-m',
choices=['auto', 'ocr', 'txt'],
default='auto',
help='Parsing method (auto, ocr, txt)')
parser.add_argument('--stats', action='store_true',
help='Display content statistics')
parser = argparse.ArgumentParser(description="MinerU Parser Example")
parser.add_argument("file_path", help="Path to the document to parse")
parser.add_argument("--output", "-o", help="Output directory path")
parser.add_argument(
"--method",
"-m",
choices=["auto", "ocr", "txt"],
default="auto",
help="Parsing method (auto, ocr, txt)",
)
parser.add_argument(
"--stats", action="store_true", help="Display content statistics"
)
args = parser.parse_args()
@ -72,11 +77,9 @@ def main():
# Parse document
content_list, md_content = parse_document(
args.file_path,
args.output,
args.method,
args.stats
args.file_path, args.output, args.method, args.stats
)
if __name__ == '__main__':
main()
if __name__ == "__main__":
main()
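For reference, a programmatic equivalent of the example script above, assuming MinerU is installed and with `sample.pdf` standing in for a real document:

```python
from lightrag.mineru_parser import MineruParser

# Parse a document and tally block types, mirroring the --stats flag above
content_list, md_content = MineruParser.parse_document(
    file_path="sample.pdf", parse_method="auto", output_dir="./output"
)

content_types = {}
for item in content_list:
    content_type = item.get("type", "unknown")
    content_types[content_type] = content_types.get(content_type, 0) + 1

print(f"Total content blocks: {len(content_list)}")
for content_type, count in content_types.items():
    print(f"- {content_type}: {count}")
```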

View File

@ -8,94 +8,112 @@ import asyncio
import argparse
from lightrag.llm.openai import openai_complete_if_cache, openai_embed
from lightrag.kg.shared_storage import initialize_pipeline_status
from pathlib import Path
from lightrag import LightRAG
from lightrag.modalprocessors import (
ImageModalProcessor,
TableModalProcessor,
EquationModalProcessor,
GenericModalProcessor
)
WORKING_DIR = "./rag_storage"
def get_llm_model_func(api_key: str, base_url: str = None):
return lambda prompt, system_prompt=None, history_messages=[], **kwargs: openai_complete_if_cache(
"gpt-4o-mini",
prompt,
system_prompt=system_prompt,
history_messages=history_messages,
api_key=api_key,
base_url=base_url,
**kwargs,
)
def get_vision_model_func(api_key: str, base_url: str = None):
return lambda prompt, system_prompt=None, history_messages=[], image_data=None, **kwargs: openai_complete_if_cache(
"gpt-4o",
"",
def get_llm_model_func(api_key: str, base_url: str = None):
return (
lambda prompt,
system_prompt=None,
history_messages=[],
messages=[
{"role": "system", "content": system_prompt} if system_prompt else None,
{"role": "user", "content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_data}"
}
}
]} if image_data else {"role": "user", "content": prompt}
],
api_key=api_key,
base_url=base_url,
**kwargs,
) if image_data else openai_complete_if_cache(
"gpt-4o-mini",
prompt,
system_prompt=system_prompt,
history_messages=history_messages,
api_key=api_key,
base_url=base_url,
**kwargs,
**kwargs: openai_complete_if_cache(
"gpt-4o-mini",
prompt,
system_prompt=system_prompt,
history_messages=history_messages,
api_key=api_key,
base_url=base_url,
**kwargs,
)
)
def get_vision_model_func(api_key: str, base_url: str = None):
return (
lambda prompt,
system_prompt=None,
history_messages=[],
image_data=None,
**kwargs: openai_complete_if_cache(
"gpt-4o",
"",
system_prompt=None,
history_messages=[],
messages=[
{"role": "system", "content": system_prompt} if system_prompt else None,
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_data}"
},
},
],
}
if image_data
else {"role": "user", "content": prompt},
],
api_key=api_key,
base_url=base_url,
**kwargs,
)
if image_data
else openai_complete_if_cache(
"gpt-4o-mini",
prompt,
system_prompt=system_prompt,
history_messages=history_messages,
api_key=api_key,
base_url=base_url,
**kwargs,
)
)
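The reformatted factory above preserves the original behavior: a single callable that sends a multimodal `gpt-4o` message when `image_data` is present and falls back to text-only `gpt-4o-mini` otherwise. A hedged sketch of how it is consumed; the key and base64 payload are placeholders:

```python
import asyncio


async def demo():
    # Placeholder credentials; base_url defaults to the OpenAI endpoint
    vision_model_func = get_vision_model_func(api_key="sk-...")

    # With image_data set, the lambda routes to gpt-4o with an image_url
    # content part; without it, the plain gpt-4o-mini text path is used.
    return await vision_model_func(
        "Describe this figure.", image_data="<base64-encoded JPEG>"
    )


asyncio.run(demo())
```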
async def process_image_example(lightrag: LightRAG, vision_model_func):
"""Example of processing an image"""
# Create image processor
image_processor = ImageModalProcessor(
lightrag=lightrag,
modal_caption_func=vision_model_func
lightrag=lightrag, modal_caption_func=vision_model_func
)
# Prepare image content
image_content = {
"img_path": "image.jpg",
"img_caption": ["Example image caption"],
"img_footnote": ["Example image footnote"]
"img_footnote": ["Example image footnote"],
}
# Process image
description, entity_info = await image_processor.process_multimodal_content(
modal_content=image_content,
content_type="image",
file_path="image_example.jpg",
entity_name="Example Image"
entity_name="Example Image",
)
print("Image Processing Results:")
print(f"Description: {description}")
print(f"Entity Info: {entity_info}")
async def process_table_example(lightrag: LightRAG, llm_model_func):
"""Example of processing a table"""
# Create table processor
table_processor = TableModalProcessor(
lightrag=lightrag,
modal_caption_func=llm_model_func
lightrag=lightrag, modal_caption_func=llm_model_func
)
# Prepare table content
table_content = {
"table_body": """
@ -105,47 +123,45 @@ async def process_table_example(lightrag: LightRAG, llm_model_func):
| Mary | 30 | Designer |
""",
"table_caption": ["Employee Information Table"],
"table_footnote": ["Data updated as of 2024"]
"table_footnote": ["Data updated as of 2024"],
}
# Process table
description, entity_info = await table_processor.process_multimodal_content(
modal_content=table_content,
content_type="table",
file_path="table_example.md",
entity_name="Employee Table"
entity_name="Employee Table",
)
print("\nTable Processing Results:")
print(f"Description: {description}")
print(f"Entity Info: {entity_info}")
async def process_equation_example(lightrag: LightRAG, llm_model_func):
"""Example of processing a mathematical equation"""
# Create equation processor
equation_processor = EquationModalProcessor(
lightrag=lightrag,
modal_caption_func=llm_model_func
lightrag=lightrag, modal_caption_func=llm_model_func
)
# Prepare equation content
equation_content = {
"text": "E = mc^2",
"text_format": "LaTeX"
}
equation_content = {"text": "E = mc^2", "text_format": "LaTeX"}
# Process equation
description, entity_info = await equation_processor.process_multimodal_content(
modal_content=equation_content,
content_type="equation",
file_path="equation_example.txt",
entity_name="Mass-Energy Equivalence"
entity_name="Mass-Energy Equivalence",
)
print("\nEquation Processing Results:")
print(f"Description: {description}")
print(f"Entity Info: {entity_info}")
async def initialize_rag(api_key: str, base_url: str = None):
rag = LightRAG(
working_dir=WORKING_DIR,
@ -155,7 +171,10 @@ async def initialize_rag(api_key: str, base_url: str = None):
api_key=api_key,
base_url=base_url,
),
llm_model_func=lambda prompt, system_prompt=None, history_messages=[], **kwargs: openai_complete_if_cache(
llm_model_func=lambda prompt,
system_prompt=None,
history_messages=[],
**kwargs: openai_complete_if_cache(
"gpt-4o-mini",
prompt,
system_prompt=system_prompt,
@ -171,30 +190,35 @@ async def initialize_rag(api_key: str, base_url: str = None):
return rag
def main():
"""Main function to run the example"""
parser = argparse.ArgumentParser(description='Modal Processors Example')
parser.add_argument('--api-key', required=True, help='OpenAI API key')
parser.add_argument('--base-url', help='Optional base URL for API')
parser.add_argument('--working-dir', '-w', default=WORKING_DIR, help='Working directory path')
parser = argparse.ArgumentParser(description="Modal Processors Example")
parser.add_argument("--api-key", required=True, help="OpenAI API key")
parser.add_argument("--base-url", help="Optional base URL for API")
parser.add_argument(
"--working-dir", "-w", default=WORKING_DIR, help="Working directory path"
)
args = parser.parse_args()
# Run examples
asyncio.run(main_async(args.api_key, args.base_url))
async def main_async(api_key: str, base_url: str = None):
# Initialize LightRAG
lightrag = await initialize_rag(api_key, base_url)
# Get model functions
llm_model_func = get_llm_model_func(api_key, base_url)
vision_model_func = get_vision_model_func(api_key, base_url)
# Run examples
await process_image_example(lightrag, vision_model_func)
await process_table_example(lightrag, llm_model_func)
await process_equation_example(lightrag, llm_model_func)
if __name__ == "__main__":
main()

View File

@ -11,15 +11,20 @@ This example shows how to:
import os
import argparse
import asyncio
from pathlib import Path
from lightrag.mineru_parser import MineruParser
from lightrag.llm.openai import openai_complete_if_cache, openai_embed
from lightrag.raganything import RAGAnything
async def process_with_rag(file_path: str, output_dir: str, api_key: str, base_url: str = None, working_dir: str = None):
async def process_with_rag(
file_path: str,
output_dir: str,
api_key: str,
base_url: str = None,
working_dir: str = None,
):
"""
Process document with RAGAnything
Args:
file_path: Path to the document
output_dir: Output directory for RAG results
@ -30,7 +35,10 @@ async def process_with_rag(file_path: str, output_dir: str, api_key: str, base_u
# Initialize RAGAnything
rag = RAGAnything(
working_dir=working_dir,
llm_model_func=lambda prompt, system_prompt=None, history_messages=[], **kwargs: openai_complete_if_cache(
llm_model_func=lambda prompt,
system_prompt=None,
history_messages=[],
**kwargs: openai_complete_if_cache(
"gpt-4o-mini",
prompt,
system_prompt=system_prompt,
@ -39,27 +47,40 @@ async def process_with_rag(file_path: str, output_dir: str, api_key: str, base_u
base_url=base_url,
**kwargs,
),
vision_model_func=lambda prompt, system_prompt=None, history_messages=[], image_data=None, **kwargs: openai_complete_if_cache(
vision_model_func=lambda prompt,
system_prompt=None,
history_messages=[],
image_data=None,
**kwargs: openai_complete_if_cache(
"gpt-4o",
"",
system_prompt=None,
history_messages=[],
messages=[
{"role": "system", "content": system_prompt} if system_prompt else None,
{"role": "user", "content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_data}"
}
}
]} if image_data else {"role": "user", "content": prompt}
{"role": "system", "content": system_prompt}
if system_prompt
else None,
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_data}"
},
},
],
}
if image_data
else {"role": "user", "content": prompt},
],
api_key=api_key,
base_url=base_url,
**kwargs,
) if image_data else openai_complete_if_cache(
)
if image_data
else openai_complete_if_cache(
"gpt-4o-mini",
prompt,
system_prompt=system_prompt,
@ -75,21 +96,19 @@ async def process_with_rag(file_path: str, output_dir: str, api_key: str, base_u
base_url=base_url,
),
embedding_dim=3072,
max_token_size=8192
max_token_size=8192,
)
# Process document
await rag.process_document_complete(
file_path=file_path,
output_dir=output_dir,
parse_method="auto"
file_path=file_path, output_dir=output_dir, parse_method="auto"
)
# Example queries
queries = [
"What is the main content of the document?",
"Describe the images and figures in the document",
"Tell me about the experimental results and data tables"
"Tell me about the experimental results and data tables",
]
print("\nQuerying processed document:")
@ -101,14 +120,21 @@ async def process_with_rag(file_path: str, output_dir: str, api_key: str, base_u
except Exception as e:
print(f"Error processing with RAG: {str(e)}")
def main():
"""Main function to run the example"""
parser = argparse.ArgumentParser(description='MinerU RAG Example')
parser.add_argument('file_path', help='Path to the document to process')
parser.add_argument('--working_dir', '-w', default="./rag_storage", help='Working directory path')
parser.add_argument('--output', '-o', default="./output", help='Output directory path')
parser.add_argument('--api-key', required=True, help='OpenAI API key for RAG processing')
parser.add_argument('--base-url', help='Optional base URL for API')
parser = argparse.ArgumentParser(description="MinerU RAG Example")
parser.add_argument("file_path", help="Path to the document to process")
parser.add_argument(
"--working_dir", "-w", default="./rag_storage", help="Working directory path"
)
parser.add_argument(
"--output", "-o", default="./output", help="Output directory path"
)
parser.add_argument(
"--api-key", required=True, help="OpenAI API key for RAG processing"
)
parser.add_argument("--base-url", help="Optional base URL for API")
args = parser.parse_args()
@ -117,13 +143,12 @@ def main():
os.makedirs(args.output, exist_ok=True)
# Process with RAG
asyncio.run(process_with_rag(
args.file_path,
args.output,
args.api_key,
args.base_url,
args.working_dir
))
asyncio.run(
process_with_rag(
args.file_path, args.output, args.api_key, args.base_url, args.working_dir
)
)
if __name__ == '__main__':
main()
if __name__ == "__main__":
main()
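Putting the pieces of this example together, a condensed end-to-end sketch; the API key, embedding model name, and document path are assumptions for illustration:

```python
import asyncio

from lightrag.llm.openai import openai_complete_if_cache, openai_embed
from lightrag.raganything import RAGAnything

API_KEY = "sk-..."  # placeholder


async def run():
    rag = RAGAnything(
        working_dir="./rag_storage",
        llm_model_func=lambda prompt,
        system_prompt=None,
        history_messages=[],
        **kwargs: openai_complete_if_cache(
            "gpt-4o-mini",
            prompt,
            system_prompt=system_prompt,
            history_messages=history_messages,
            api_key=API_KEY,
            **kwargs,
        ),
        # Embedding wrapper as in the example; the model name is an assumption
        embedding_func=lambda texts: openai_embed(
            texts, model="text-embedding-3-large", api_key=API_KEY
        ),
        embedding_dim=3072,
        max_token_size=8192,
    )
    await rag.process_document_complete(
        file_path="sample.pdf", output_dir="./output", parse_method="auto"
    )


asyncio.run(run())
```

The vision function is omitted here on purpose: per `_initialize_processors` later in this commit, the image processor falls back to `llm_model_func` when `vision_model_func` is not supplied.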

View File

@ -1,4 +1,4 @@
# type: ignore
"""
MinerU Document Parser Utility
@ -14,7 +14,18 @@ import os
import json
import argparse
from pathlib import Path
from typing import Dict, List, Optional, Union, Tuple, Any, TypeVar, cast, TYPE_CHECKING, ClassVar
from typing import (
Dict,
List,
Optional,
Union,
Tuple,
Any,
TypeVar,
cast,
TYPE_CHECKING,
ClassVar,
)
# Type stubs for magic_pdf
FileBasedDataWriter = Any
@ -28,20 +39,27 @@ read_local_office = Any
read_local_images = Any
if TYPE_CHECKING:
from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader
from magic_pdf.data.data_reader_writer import (
FileBasedDataWriter,
FileBasedDataReader,
)
from magic_pdf.data.dataset import PymuDocDataset
from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
from magic_pdf.config.enums import SupportedPdfParseMethod
from magic_pdf.data.read_api import read_local_office, read_local_images
else:
# MinerU imports
from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader
from magic_pdf.data.data_reader_writer import (
FileBasedDataWriter,
FileBasedDataReader,
)
from magic_pdf.data.dataset import PymuDocDataset
from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
from magic_pdf.config.enums import SupportedPdfParseMethod
from magic_pdf.data.read_api import read_local_office, read_local_images
T = TypeVar('T')
T = TypeVar("T")
class MineruParser:
"""
@ -58,7 +76,11 @@ class MineruParser:
pass
@staticmethod
def safe_write(writer: Any, content: Union[str, bytes, Dict[str, Any], List[Any]], filename: str) -> None:
def safe_write(
writer: Any,
content: Union[str, bytes, Dict[str, Any], List[Any]],
filename: str,
) -> None:
"""
Safely write content to a file, ensuring the filename is valid
@ -80,15 +102,22 @@ class MineruParser:
writer.write(content, filename)
except TypeError:
# If the writer expects bytes, convert string to bytes
writer.write(content.encode('utf-8'), filename)
writer.write(content.encode("utf-8"), filename)
else:
# For dict/list content, always encode as JSON string first
if isinstance(content, (dict, list)):
try:
writer.write(json.dumps(content, ensure_ascii=False, indent=4), filename)
writer.write(
json.dumps(content, ensure_ascii=False, indent=4), filename
)
except TypeError:
# If the writer expects bytes, convert JSON string to bytes
writer.write(json.dumps(content, ensure_ascii=False, indent=4).encode('utf-8'), filename)
writer.write(
json.dumps(content, ensure_ascii=False, indent=4).encode(
"utf-8"
),
filename,
)
else:
# Regular content (assumed to be bytes or compatible)
writer.write(content, filename)
@ -97,7 +126,7 @@ class MineruParser:
def parse_pdf(
pdf_path: Union[str, Path],
output_dir: Optional[str] = None,
use_ocr: bool = False
use_ocr: bool = False,
) -> Tuple[List[Dict[str, Any]], str]:
"""
Parse PDF document
@ -150,9 +179,15 @@ class MineruParser:
# Draw visualizations
try:
infer_result.draw_model(os.path.join(local_md_dir, f"{name_without_suff}_model.pdf")) # type: ignore
pipe_result.draw_layout(os.path.join(local_md_dir, f"{name_without_suff}_layout.pdf")) # type: ignore
pipe_result.draw_span(os.path.join(local_md_dir, f"{name_without_suff}_spans.pdf")) # type: ignore
infer_result.draw_model(
os.path.join(local_md_dir, f"{name_without_suff}_model.pdf")
) # type: ignore
pipe_result.draw_layout(
os.path.join(local_md_dir, f"{name_without_suff}_layout.pdf")
) # type: ignore
pipe_result.draw_span(
os.path.join(local_md_dir, f"{name_without_suff}_spans.pdf")
) # type: ignore
except Exception as e:
print(f"Warning: Failed to draw visualizations: {str(e)}")
@ -162,7 +197,9 @@ class MineruParser:
# Save files using dump methods (consistent with API)
pipe_result.dump_md(md_writer, f"{name_without_suff}.md", image_dir) # type: ignore
pipe_result.dump_content_list(md_writer, f"{name_without_suff}_content_list.json", image_dir) # type: ignore
pipe_result.dump_content_list(
md_writer, f"{name_without_suff}_content_list.json", image_dir
) # type: ignore
pipe_result.dump_middle_json(md_writer, f"{name_without_suff}_middle.json") # type: ignore
# Save model result - convert JSON string to bytes before writing
@ -171,16 +208,24 @@ class MineruParser:
try:
# Try to write to a file manually to avoid FileBasedDataWriter issues
model_file_path = os.path.join(local_md_dir, f"{name_without_suff}_model.json")
with open(model_file_path, 'w', encoding='utf-8') as f:
model_file_path = os.path.join(
local_md_dir, f"{name_without_suff}_model.json"
)
with open(model_file_path, "w", encoding="utf-8") as f:
f.write(json_str)
except Exception as e:
print(f"Warning: Failed to save model result using file write: {str(e)}")
print(
f"Warning: Failed to save model result using file write: {str(e)}"
)
try:
# If direct file write fails, try using the writer with bytes encoding
md_writer.write(json_str.encode('utf-8'), f"{name_without_suff}_model.json") # type: ignore
md_writer.write(
json_str.encode("utf-8"), f"{name_without_suff}_model.json"
) # type: ignore
except Exception as e2:
print(f"Warning: Failed to save model result using writer: {str(e2)}")
print(
f"Warning: Failed to save model result using writer: {str(e2)}"
)
return cast(Tuple[List[Dict[str, Any]], str], (content_list, md_content))
@ -190,8 +235,7 @@ class MineruParser:
@staticmethod
def parse_office_doc(
doc_path: Union[str, Path],
output_dir: Optional[str] = None
doc_path: Union[str, Path], output_dir: Optional[str] = None
) -> Tuple[List[Dict[str, Any]], str]:
"""
Parse office document (Word, PPT, etc.)
@ -231,9 +275,9 @@ class MineruParser:
# Apply chain of operations according to API documentation
# This follows the pattern shown in MS-Office example in the API docs
ds.apply(doc_analyze, ocr=True)\
.pipe_txt_mode(image_writer)\
.dump_md(md_writer, f"{name_without_suff}.md", image_dir) # type: ignore
ds.apply(doc_analyze, ocr=True).pipe_txt_mode(image_writer).dump_md(
md_writer, f"{name_without_suff}.md", image_dir
) # type: ignore
# Re-execute for getting the content data
infer_result = ds.apply(doc_analyze, ocr=True) # type: ignore
@ -244,7 +288,9 @@ class MineruParser:
content_list = pipe_result.get_content_list(image_dir) # type: ignore
# Save additional output files
pipe_result.dump_content_list(md_writer, f"{name_without_suff}_content_list.json", image_dir) # type: ignore
pipe_result.dump_content_list(
md_writer, f"{name_without_suff}_content_list.json", image_dir
) # type: ignore
pipe_result.dump_middle_json(md_writer, f"{name_without_suff}_middle.json") # type: ignore
# Save model result - convert JSON string to bytes before writing
@ -253,16 +299,24 @@ class MineruParser:
try:
# Try to write to a file manually to avoid FileBasedDataWriter issues
model_file_path = os.path.join(local_md_dir, f"{name_without_suff}_model.json")
with open(model_file_path, 'w', encoding='utf-8') as f:
model_file_path = os.path.join(
local_md_dir, f"{name_without_suff}_model.json"
)
with open(model_file_path, "w", encoding="utf-8") as f:
f.write(json_str)
except Exception as e:
print(f"Warning: Failed to save model result using file write: {str(e)}")
print(
f"Warning: Failed to save model result using file write: {str(e)}"
)
try:
# If direct file write fails, try using the writer with bytes encoding
md_writer.write(json_str.encode('utf-8'), f"{name_without_suff}_model.json") # type: ignore
md_writer.write(
json_str.encode("utf-8"), f"{name_without_suff}_model.json"
) # type: ignore
except Exception as e2:
print(f"Warning: Failed to save model result using writer: {str(e2)}")
print(
f"Warning: Failed to save model result using writer: {str(e2)}"
)
return cast(Tuple[List[Dict[str, Any]], str], (content_list, md_content))
@ -272,8 +326,7 @@ class MineruParser:
@staticmethod
def parse_image(
image_path: Union[str, Path],
output_dir: Optional[str] = None
image_path: Union[str, Path], output_dir: Optional[str] = None
) -> Tuple[List[Dict[str, Any]], str]:
"""
Parse image document
@ -313,9 +366,9 @@ class MineruParser:
# Apply chain of operations according to API documentation
# This follows the pattern shown in Image example in the API docs
ds.apply(doc_analyze, ocr=True)\
.pipe_ocr_mode(image_writer)\
.dump_md(md_writer, f"{name_without_suff}.md", image_dir) # type: ignore
ds.apply(doc_analyze, ocr=True).pipe_ocr_mode(image_writer).dump_md(
md_writer, f"{name_without_suff}.md", image_dir
) # type: ignore
# Re-execute for getting the content data
infer_result = ds.apply(doc_analyze, ocr=True) # type: ignore
@ -326,7 +379,9 @@ class MineruParser:
content_list = pipe_result.get_content_list(image_dir) # type: ignore
# Save additional output files
pipe_result.dump_content_list(md_writer, f"{name_without_suff}_content_list.json", image_dir) # type: ignore
pipe_result.dump_content_list(
md_writer, f"{name_without_suff}_content_list.json", image_dir
) # type: ignore
pipe_result.dump_middle_json(md_writer, f"{name_without_suff}_middle.json") # type: ignore
# Save model result - convert JSON string to bytes before writing
@ -335,16 +390,24 @@ class MineruParser:
try:
# Try to write to a file manually to avoid FileBasedDataWriter issues
model_file_path = os.path.join(local_md_dir, f"{name_without_suff}_model.json")
with open(model_file_path, 'w', encoding='utf-8') as f:
model_file_path = os.path.join(
local_md_dir, f"{name_without_suff}_model.json"
)
with open(model_file_path, "w", encoding="utf-8") as f:
f.write(json_str)
except Exception as e:
print(f"Warning: Failed to save model result using file write: {str(e)}")
print(
f"Warning: Failed to save model result using file write: {str(e)}"
)
try:
# If direct file write fails, try using the writer with bytes encoding
md_writer.write(json_str.encode('utf-8'), f"{name_without_suff}_model.json") # type: ignore
md_writer.write(
json_str.encode("utf-8"), f"{name_without_suff}_model.json"
) # type: ignore
except Exception as e2:
print(f"Warning: Failed to save model result using writer: {str(e2)}")
print(
f"Warning: Failed to save model result using writer: {str(e2)}"
)
return cast(Tuple[List[Dict[str, Any]], str], (content_list, md_content))
@ -357,7 +420,7 @@ class MineruParser:
file_path: Union[str, Path],
parse_method: str = "auto",
output_dir: Optional[str] = None,
save_results: bool = True
save_results: bool = True,
) -> Tuple[List[Dict[str, Any]], str]:
"""
Parse document using MinerU based on file extension
@ -382,64 +445,59 @@ class MineruParser:
# Choose appropriate parser based on file type
if ext in [".pdf"]:
return MineruParser.parse_pdf(
file_path,
output_dir,
use_ocr=(parse_method == "ocr")
file_path, output_dir, use_ocr=(parse_method == "ocr")
)
elif ext in [".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif"]:
return MineruParser.parse_image(
file_path,
output_dir
)
return MineruParser.parse_image(file_path, output_dir)
elif ext in [".doc", ".docx", ".ppt", ".pptx"]:
return MineruParser.parse_office_doc(
file_path,
output_dir
)
return MineruParser.parse_office_doc(file_path, output_dir)
else:
# For unsupported file types, default to PDF parsing
print(f"Warning: Unsupported file extension '{ext}', trying generic PDF parser")
return MineruParser.parse_pdf(
file_path,
output_dir,
use_ocr=(parse_method == "ocr")
print(
f"Warning: Unsupported file extension '{ext}', trying generic PDF parser"
)
return MineruParser.parse_pdf(
file_path, output_dir, use_ocr=(parse_method == "ocr")
)
def main():
"""
Main function to run the MinerU parser from command line
"""
parser = argparse.ArgumentParser(description='Parse documents using MinerU')
parser.add_argument('file_path', help='Path to the document to parse')
parser.add_argument('--output', '-o', help='Output directory path')
parser.add_argument('--method', '-m',
choices=['auto', 'ocr', 'txt'],
default='auto',
help='Parsing method (auto, ocr, txt)')
parser.add_argument('--stats', action='store_true',
help='Display content statistics')
parser = argparse.ArgumentParser(description="Parse documents using MinerU")
parser.add_argument("file_path", help="Path to the document to parse")
parser.add_argument("--output", "-o", help="Output directory path")
parser.add_argument(
"--method",
"-m",
choices=["auto", "ocr", "txt"],
default="auto",
help="Parsing method (auto, ocr, txt)",
)
parser.add_argument(
"--stats", action="store_true", help="Display content statistics"
)
args = parser.parse_args()
try:
# Parse the document
content_list, md_content = MineruParser.parse_document(
file_path=args.file_path,
parse_method=args.method,
output_dir=args.output
file_path=args.file_path, parse_method=args.method, output_dir=args.output
)
# Display statistics if requested
if args.stats:
print("\nDocument Statistics:")
print(f"Total content blocks: {len(content_list)}")
# Count different types of content
content_types = {}
for item in content_list:
content_type = item.get('type', 'unknown')
content_type = item.get("type", "unknown")
content_types[content_type] = content_types.get(content_type, 0) + 1
print("\nContent Type Distribution:")
for content_type, count in content_types.items():
print(f"- {content_type}: {count}")
@ -450,5 +508,6 @@ def main():
return 0
if __name__ == '__main__':
if __name__ == "__main__":
exit(main())
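For context, the `content_list` these parsers return is a list of plain dicts keyed by `type`; the field names below match the modal-content examples elsewhere in this commit, with placeholder values:

```python
# Illustrative content_list entries as consumed by the modal processors
content_list = [
    {"type": "text", "text": "Plain paragraph content"},
    {
        "type": "image",
        "img_path": "images/fig1.jpg",
        "img_caption": ["Figure 1 caption"],
        "img_footnote": [],
    },
    {
        "type": "table",
        "table_body": "| Name | Age |\n|------|-----|\n| John | 25 |",
        "table_caption": ["Employee Information Table"],
        "table_footnote": [],
    },
    {"type": "equation", "text": "E = mc^2", "text_format": "LaTeX"},
]
```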

View File

@ -31,7 +31,7 @@ class BaseModalProcessor:
def __init__(self, lightrag: LightRAG, modal_caption_func):
"""Initialize base processor
Args:
lightrag: LightRAG instance
modal_caption_func: Function for generating descriptions
@ -65,8 +65,8 @@ class BaseModalProcessor:
raise NotImplementedError("Subclasses must implement this method")
async def _create_entity_and_chunk(
self, modal_chunk: str, entity_info: Dict[str, Any],
file_path: str) -> Tuple[str, Dict[str, Any]]:
self, modal_chunk: str, entity_info: Dict[str, Any], file_path: str
) -> Tuple[str, Dict[str, Any]]:
"""Create entity and text chunk"""
# Create chunk
chunk_id = compute_mdhash_id(str(modal_chunk), prefix="chunk-")
@ -93,16 +93,16 @@ class BaseModalProcessor:
"created_at": int(time.time()),
}
await self.knowledge_graph_inst.upsert_node(entity_info["entity_name"],
node_data)
await self.knowledge_graph_inst.upsert_node(
entity_info["entity_name"], node_data
)
# Insert entity into vector database
entity_vdb_data = {
compute_mdhash_id(entity_info["entity_name"], prefix="ent-"): {
"entity_name": entity_info["entity_name"],
"entity_type": entity_info["entity_type"],
"content":
f"{entity_info['entity_name']}\n{entity_info['summary']}",
"content": f"{entity_info['entity_name']}\n{entity_info['summary']}",
"source_id": chunk_id,
"file_path": file_path,
}
@ -110,8 +110,7 @@ class BaseModalProcessor:
await self.entities_vdb.upsert(entity_vdb_data)
# Process entity and relationship extraction
await self._process_chunk_for_extraction(chunk_id,
entity_info["entity_name"])
await self._process_chunk_for_extraction(chunk_id, entity_info["entity_name"])
# Ensure all storage updates are complete
await self._insert_done()
@ -120,11 +119,12 @@ class BaseModalProcessor:
"entity_name": entity_info["entity_name"],
"entity_type": entity_info["entity_type"],
"description": entity_info["summary"],
"chunk_id": chunk_id
"chunk_id": chunk_id,
}
async def _process_chunk_for_extraction(self, chunk_id: str,
modal_entity_name: str):
async def _process_chunk_for_extraction(
self, chunk_id: str, modal_entity_name: str
):
"""Process chunk for entity and relationship extraction"""
chunk_data = await self.text_chunks_db.get_by_id(chunk_id)
if not chunk_data:
@ -168,37 +168,27 @@ class BaseModalProcessor:
if entity_name != modal_entity_name: # Skip self-relationship
# Create belongs_to relationship
relation_data = {
"description":
f"Entity {entity_name} belongs to {modal_entity_name}",
"keywords":
"belongs_to,part_of,contained_in",
"source_id":
chunk_id,
"weight":
10.0,
"file_path":
chunk_data.get("file_path", "manual_creation"),
"description": f"Entity {entity_name} belongs to {modal_entity_name}",
"keywords": "belongs_to,part_of,contained_in",
"source_id": chunk_id,
"weight": 10.0,
"file_path": chunk_data.get("file_path", "manual_creation"),
}
await self.knowledge_graph_inst.upsert_edge(
entity_name, modal_entity_name, relation_data)
entity_name, modal_entity_name, relation_data
)
relation_id = compute_mdhash_id(entity_name +
modal_entity_name,
prefix="rel-")
relation_id = compute_mdhash_id(
entity_name + modal_entity_name, prefix="rel-"
)
relation_vdb_data = {
relation_id: {
"src_id":
entity_name,
"tgt_id":
modal_entity_name,
"keywords":
relation_data["keywords"],
"content":
f"{relation_data['keywords']}\t{entity_name}\n{modal_entity_name}\n{relation_data['description']}",
"source_id":
chunk_id,
"file_path":
chunk_data.get("file_path", "manual_creation"),
"src_id": entity_name,
"tgt_id": modal_entity_name,
"keywords": relation_data["keywords"],
"content": f"{relation_data['keywords']}\t{entity_name}\n{modal_entity_name}\n{relation_data['description']}",
"source_id": chunk_id,
"file_path": chunk_data.get("file_path", "manual_creation"),
}
}
await self.relationships_vdb.upsert(relation_vdb_data)
@ -215,16 +205,18 @@ class BaseModalProcessor:
)
async def _insert_done(self) -> None:
await asyncio.gather(*[
cast(StorageNameSpace, storage_inst).index_done_callback()
for storage_inst in [
self.text_chunks_db,
self.chunks_vdb,
self.entities_vdb,
self.relationships_vdb,
self.knowledge_graph_inst,
await asyncio.gather(
*[
cast(StorageNameSpace, storage_inst).index_done_callback()
for storage_inst in [
self.text_chunks_db,
self.chunks_vdb,
self.entities_vdb,
self.relationships_vdb,
self.knowledge_graph_inst,
]
]
])
)
class ImageModalProcessor(BaseModalProcessor):
@ -232,7 +224,7 @@ class ImageModalProcessor(BaseModalProcessor):
def __init__(self, lightrag: LightRAG, modal_caption_func):
"""Initialize image processor
Args:
lightrag: LightRAG instance
modal_caption_func: Function for generating descriptions (supporting image understanding)
@ -243,8 +235,7 @@ class ImageModalProcessor(BaseModalProcessor):
"""Encode image to base64"""
try:
with open(image_path, "rb") as image_file:
encoded_string = base64.b64encode(
image_file.read()).decode('utf-8')
encoded_string = base64.b64encode(image_file.read()).decode("utf-8")
return encoded_string
except Exception as e:
logger.error(f"Failed to encode image {image_path}: {e}")
@ -309,13 +300,12 @@ class ImageModalProcessor(BaseModalProcessor):
response = await self.modal_caption_func(
vision_prompt,
image_data=image_base64,
system_prompt=
"You are an expert image analyst. Provide detailed, accurate descriptions."
system_prompt="You are an expert image analyst. Provide detailed, accurate descriptions.",
)
else:
# Analyze based on existing text information
text_prompt = f"""Based on the following image information, provide analysis:
Image Path: {image_path}
Captions: {captions}
Footnotes: {footnotes}
@ -324,13 +314,11 @@ class ImageModalProcessor(BaseModalProcessor):
response = await self.modal_caption_func(
text_prompt,
system_prompt=
"You are an expert image analyst. Provide detailed analysis based on available information."
system_prompt="You are an expert image analyst. Provide detailed analysis based on available information.",
)
# Parse response
enhanced_caption, entity_info = self._parse_response(
response, entity_name)
enhanced_caption, entity_info = self._parse_response(response, entity_name)
# Build complete image content
modal_chunk = f"""
@ -341,27 +329,30 @@ class ImageModalProcessor(BaseModalProcessor):
Visual Analysis: {enhanced_caption}"""
return await self._create_entity_and_chunk(modal_chunk,
entity_info, file_path)
return await self._create_entity_and_chunk(
modal_chunk, entity_info, file_path
)
except Exception as e:
logger.error(f"Error processing image content: {e}")
# Fallback processing
fallback_entity = {
"entity_name": entity_name if entity_name else
f"image_{compute_mdhash_id(str(modal_content))}",
"entity_name": entity_name
if entity_name
else f"image_{compute_mdhash_id(str(modal_content))}",
"entity_type": "image",
"summary": f"Image content: {str(modal_content)[:100]}"
"summary": f"Image content: {str(modal_content)[:100]}",
}
return str(modal_content), fallback_entity
def _parse_response(self,
response: str,
entity_name: str = None) -> Tuple[str, Dict[str, Any]]:
def _parse_response(
self, response: str, entity_name: str = None
) -> Tuple[str, Dict[str, Any]]:
"""Parse model response"""
try:
response_data = json.loads(
re.search(r"\{.*\}", response, re.DOTALL).group(0))
re.search(r"\{.*\}", response, re.DOTALL).group(0)
)
description = response_data.get("detailed_description", "")
entity_data = response_data.get("entity_info", {})
@ -369,11 +360,14 @@ class ImageModalProcessor(BaseModalProcessor):
if not description or not entity_data:
raise ValueError("Missing required fields in response")
if not all(key in entity_data
for key in ["entity_name", "entity_type", "summary"]):
if not all(
key in entity_data for key in ["entity_name", "entity_type", "summary"]
):
raise ValueError("Missing required fields in entity_info")
entity_data["entity_name"] = entity_data["entity_name"] + f" ({entity_data['entity_type']})"
entity_data["entity_name"] = (
entity_data["entity_name"] + f" ({entity_data['entity_type']})"
)
if entity_name:
entity_data["entity_name"] = entity_name
@ -382,13 +376,11 @@ class ImageModalProcessor(BaseModalProcessor):
except (json.JSONDecodeError, AttributeError, ValueError) as e:
logger.error(f"Error parsing image analysis response: {e}")
fallback_entity = {
"entity_name":
entity_name
if entity_name else f"image_{compute_mdhash_id(response)}",
"entity_type":
"image",
"summary":
response[:100] + "..." if len(response) > 100 else response
"entity_name": entity_name
if entity_name
else f"image_{compute_mdhash_id(response)}",
"entity_type": "image",
"summary": response[:100] + "..." if len(response) > 100 else response,
}
return response, fallback_entity
@ -447,15 +439,15 @@ class TableModalProcessor(BaseModalProcessor):
response = await self.modal_caption_func(
table_prompt,
system_prompt=
"You are an expert data analyst. Provide detailed table analysis with specific insights."
system_prompt="You are an expert data analyst. Provide detailed table analysis with specific insights.",
)
# Parse response
enhanced_caption, entity_info = self._parse_table_response(
response, entity_name)
#TODO: Add Retry Mechanism
response, entity_name
)
# TODO: Add Retry Mechanism
# Build complete table content
modal_chunk = f"""Table Analysis:
@ -466,17 +458,16 @@ class TableModalProcessor(BaseModalProcessor):
Analysis: {enhanced_caption}"""
return await self._create_entity_and_chunk(modal_chunk, entity_info,
file_path)
return await self._create_entity_and_chunk(modal_chunk, entity_info, file_path)
def _parse_table_response(
self,
response: str,
entity_name: str = None) -> Tuple[str, Dict[str, Any]]:
self, response: str, entity_name: str = None
) -> Tuple[str, Dict[str, Any]]:
"""Parse table analysis response"""
try:
response_data = json.loads(
re.search(r"\{.*\}", response, re.DOTALL).group(0))
re.search(r"\{.*\}", response, re.DOTALL).group(0)
)
description = response_data.get("detailed_description", "")
entity_data = response_data.get("entity_info", {})
@ -484,11 +475,14 @@ class TableModalProcessor(BaseModalProcessor):
if not description or not entity_data:
raise ValueError("Missing required fields in response")
if not all(key in entity_data
for key in ["entity_name", "entity_type", "summary"]):
if not all(
key in entity_data for key in ["entity_name", "entity_type", "summary"]
):
raise ValueError("Missing required fields in entity_info")
entity_data["entity_name"] = entity_data["entity_name"] + f" ({entity_data['entity_type']})"
entity_data["entity_name"] = (
entity_data["entity_name"] + f" ({entity_data['entity_type']})"
)
if entity_name:
entity_data["entity_name"] = entity_name
@ -497,13 +491,11 @@ class TableModalProcessor(BaseModalProcessor):
except (json.JSONDecodeError, AttributeError, ValueError) as e:
logger.error(f"Error parsing table analysis response: {e}")
fallback_entity = {
"entity_name":
entity_name
if entity_name else f"table_{compute_mdhash_id(response)}",
"entity_type":
"table",
"summary":
response[:100] + "..." if len(response) > 100 else response
"entity_name": entity_name
if entity_name
else f"table_{compute_mdhash_id(response)}",
"entity_type": "table",
"summary": response[:100] + "..." if len(response) > 100 else response,
}
return response, fallback_entity
@ -559,13 +551,13 @@ class EquationModalProcessor(BaseModalProcessor):
response = await self.modal_caption_func(
equation_prompt,
system_prompt=
"You are an expert mathematician. Provide detailed mathematical analysis."
system_prompt="You are an expert mathematician. Provide detailed mathematical analysis.",
)
# Parse response
enhanced_caption, entity_info = self._parse_equation_response(
response, entity_name)
response, entity_name
)
# Build complete equation content
modal_chunk = f"""Mathematical Equation Analysis:
@ -574,17 +566,16 @@ class EquationModalProcessor(BaseModalProcessor):
Mathematical Analysis: {enhanced_caption}"""
return await self._create_entity_and_chunk(modal_chunk, entity_info,
file_path)
return await self._create_entity_and_chunk(modal_chunk, entity_info, file_path)
def _parse_equation_response(
self,
response: str,
entity_name: str = None) -> Tuple[str, Dict[str, Any]]:
self, response: str, entity_name: str = None
) -> Tuple[str, Dict[str, Any]]:
"""Parse equation analysis response"""
try:
response_data = json.loads(
re.search(r"\{.*\}", response, re.DOTALL).group(0))
re.search(r"\{.*\}", response, re.DOTALL).group(0)
)
description = response_data.get("detailed_description", "")
entity_data = response_data.get("entity_info", {})
@ -592,11 +583,14 @@ class EquationModalProcessor(BaseModalProcessor):
if not description or not entity_data:
raise ValueError("Missing required fields in response")
if not all(key in entity_data
for key in ["entity_name", "entity_type", "summary"]):
if not all(
key in entity_data for key in ["entity_name", "entity_type", "summary"]
):
raise ValueError("Missing required fields in entity_info")
entity_data["entity_name"] = entity_data["entity_name"] + f" ({entity_data['entity_type']})"
entity_data["entity_name"] = (
entity_data["entity_name"] + f" ({entity_data['entity_type']})"
)
if entity_name:
entity_data["entity_name"] = entity_name
@ -605,13 +599,11 @@ class EquationModalProcessor(BaseModalProcessor):
except (json.JSONDecodeError, AttributeError, ValueError) as e:
logger.error(f"Error parsing equation analysis response: {e}")
fallback_entity = {
"entity_name":
entity_name
if entity_name else f"equation_{compute_mdhash_id(response)}",
"entity_type":
"equation",
"summary":
response[:100] + "..." if len(response) > 100 else response
"entity_name": entity_name
if entity_name
else f"equation_{compute_mdhash_id(response)}",
"entity_type": "equation",
"summary": response[:100] + "..." if len(response) > 100 else response,
}
return response, fallback_entity
@ -651,13 +643,13 @@ class GenericModalProcessor(BaseModalProcessor):
response = await self.modal_caption_func(
generic_prompt,
system_prompt=
f"You are an expert content analyst specializing in {content_type} content."
system_prompt=f"You are an expert content analyst specializing in {content_type} content.",
)
# Parse response
enhanced_caption, entity_info = self._parse_generic_response(
response, entity_name, content_type)
response, entity_name, content_type
)
# Build complete content
modal_chunk = f"""{content_type.title()} Content Analysis:
@ -665,18 +657,16 @@ class GenericModalProcessor(BaseModalProcessor):
Analysis: {enhanced_caption}"""
return await self._create_entity_and_chunk(modal_chunk, entity_info,
file_path)
return await self._create_entity_and_chunk(modal_chunk, entity_info, file_path)
def _parse_generic_response(
self,
response: str,
entity_name: str = None,
content_type: str = "content") -> Tuple[str, Dict[str, Any]]:
self, response: str, entity_name: str = None, content_type: str = "content"
) -> Tuple[str, Dict[str, Any]]:
"""Parse generic analysis response"""
try:
response_data = json.loads(
re.search(r"\{.*\}", response, re.DOTALL).group(0))
re.search(r"\{.*\}", response, re.DOTALL).group(0)
)
description = response_data.get("detailed_description", "")
entity_data = response_data.get("entity_info", {})
@ -684,11 +674,14 @@ class GenericModalProcessor(BaseModalProcessor):
if not description or not entity_data:
raise ValueError("Missing required fields in response")
if not all(key in entity_data
for key in ["entity_name", "entity_type", "summary"]):
if not all(
key in entity_data for key in ["entity_name", "entity_type", "summary"]
):
raise ValueError("Missing required fields in entity_info")
entity_data["entity_name"] = entity_data["entity_name"] + f" ({entity_data['entity_type']})"
entity_data["entity_name"] = (
entity_data["entity_name"] + f" ({entity_data['entity_type']})"
)
if entity_name:
entity_data["entity_name"] = entity_name
@ -697,12 +690,10 @@ class GenericModalProcessor(BaseModalProcessor):
except (json.JSONDecodeError, AttributeError, ValueError) as e:
logger.error(f"Error parsing generic analysis response: {e}")
fallback_entity = {
"entity_name":
entity_name if entity_name else
f"{content_type}_{compute_mdhash_id(response)}",
"entity_type":
content_type,
"summary":
response[:100] + "..." if len(response) > 100 else response
"entity_name": entity_name
if entity_name
else f"{content_type}_{compute_mdhash_id(response)}",
"entity_type": content_type,
"summary": response[:100] + "..." if len(response) > 100 else response,
}
return response, fallback_entity
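All four `_parse_*` helpers above enforce the same contract on the model output: the first `{...}` block must parse as JSON containing `detailed_description` plus an `entity_info` carrying `entity_name`, `entity_type`, and `summary`. A minimal self-contained illustration; the response text is invented:

```python
import json
import re

# An example model response that passes the processors' validation
response = """Here is the analysis:
{
  "detailed_description": "A bar chart comparing quarterly revenue across 2023.",
  "entity_info": {
    "entity_name": "Quarterly Revenue Chart",
    "entity_type": "image",
    "summary": "Bar chart of revenue by quarter for 2023."
  }
}"""

# Same extraction the processors use: greedy match of the outermost braces
data = json.loads(re.search(r"\{.*\}", response, re.DOTALL).group(0))
assert data["detailed_description"]
assert all(
    key in data["entity_info"] for key in ("entity_name", "entity_type", "summary")
)
```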

View File

@ -26,15 +26,15 @@ from lightrag.mineru_parser import MineruParser
# Import specialized processors
from lightrag.modalprocessors import (
ImageModalProcessor,
TableModalProcessor,
EquationModalProcessor,
GenericModalProcessor
GenericModalProcessor,
)
class RAGAnything:
"""Multimodal Document Processing Pipeline - Complete document parsing and insertion pipeline"""
def __init__(
self,
lightrag: Optional[LightRAG] = None,
@ -43,11 +43,11 @@ class RAGAnything:
embedding_func: Optional[Callable] = None,
working_dir: str = "./rag_storage",
embedding_dim: int = 3072,
max_token_size: int = 8192
max_token_size: int = 8192,
):
"""
Initialize Multimodal Document Processing Pipeline
Args:
lightrag: Optional pre-initialized LightRAG instance
llm_model_func: LLM model function for text analysis
@ -63,64 +63,67 @@ class RAGAnything:
self.embedding_func = embedding_func
self.embedding_dim = embedding_dim
self.max_token_size = max_token_size
# Set up logging
setup_logger("RAGAnything")
self.logger = logging.getLogger("RAGAnything")
# Create working directory if needed
if not os.path.exists(working_dir):
os.makedirs(working_dir)
# Use provided LightRAG or mark for later initialization
self.lightrag = lightrag
self.modal_processors = {}
# If LightRAG is provided, initialize processors immediately
if self.lightrag is not None:
self._initialize_processors()
def _initialize_processors(self):
"""Initialize multimodal processors with appropriate model functions"""
if self.lightrag is None:
raise ValueError("LightRAG instance must be initialized before creating processors")
raise ValueError(
"LightRAG instance must be initialized before creating processors"
)
# Create different multimodal processors
self.modal_processors = {
"image": ImageModalProcessor(
lightrag=self.lightrag,
modal_caption_func=self.vision_model_func or self.llm_model_func
modal_caption_func=self.vision_model_func or self.llm_model_func,
),
"table": TableModalProcessor(
lightrag=self.lightrag,
modal_caption_func=self.llm_model_func
lightrag=self.lightrag, modal_caption_func=self.llm_model_func
),
"equation": EquationModalProcessor(
lightrag=self.lightrag,
modal_caption_func=self.llm_model_func
lightrag=self.lightrag, modal_caption_func=self.llm_model_func
),
"generic": GenericModalProcessor(
lightrag=self.lightrag,
modal_caption_func=self.llm_model_func
)
lightrag=self.lightrag, modal_caption_func=self.llm_model_func
),
}
self.logger.info("Multimodal processors initialized")
self.logger.info(f"Available processors: {list(self.modal_processors.keys())}")
async def _ensure_lightrag_initialized(self):
"""Ensure LightRAG instance is initialized, create if necessary"""
if self.lightrag is not None:
return
# Validate required functions
if self.llm_model_func is None:
raise ValueError("llm_model_func must be provided when LightRAG is not pre-initialized")
raise ValueError(
"llm_model_func must be provided when LightRAG is not pre-initialized"
)
if self.embedding_func is None:
raise ValueError("embedding_func must be provided when LightRAG is not pre-initialized")
raise ValueError(
"embedding_func must be provided when LightRAG is not pre-initialized"
)
from lightrag.kg.shared_storage import initialize_pipeline_status
# Create LightRAG instance with provided functions
self.lightrag = LightRAG(
working_dir=self.working_dir,
@ -134,88 +137,86 @@ class RAGAnything:
await self.lightrag.initialize_storages()
await initialize_pipeline_status()
# Initialize processors after LightRAG is ready
self._initialize_processors()
self.logger.info("LightRAG and multimodal processors initialized")
def parse_document(
self,
file_path: str,
output_dir: str = "./output",
parse_method: str = "auto",
display_stats: bool = True
display_stats: bool = True,
) -> Tuple[List[Dict[str, Any]], str]:
"""
Parse document using MinerU
Args:
file_path: Path to the file to parse
output_dir: Output directory
parse_method: Parse method ("auto", "ocr", "txt")
display_stats: Whether to display content statistics
Returns:
(content_list, md_content): Content list and markdown text
"""
self.logger.info(f"Starting document parsing: {file_path}")
file_path = Path(file_path)
if not file_path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
# Choose appropriate parsing method based on file extension
ext = file_path.suffix.lower()
try:
if ext in [".pdf"]:
self.logger.info(f"Detected PDF file, using PDF parser (OCR={parse_method == 'ocr'})...")
self.logger.info(
f"Detected PDF file, using PDF parser (OCR={parse_method == 'ocr'})..."
)
content_list, md_content = MineruParser.parse_pdf(
file_path,
output_dir,
use_ocr=(parse_method == "ocr")
file_path, output_dir, use_ocr=(parse_method == "ocr")
)
elif ext in [".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif"]:
self.logger.info("Detected image file, using image parser...")
content_list, md_content = MineruParser.parse_image(
file_path,
output_dir
file_path, output_dir
)
elif ext in [".doc", ".docx", ".ppt", ".pptx"]:
self.logger.info("Detected Office document, using Office parser...")
content_list, md_content = MineruParser.parse_office_doc(
file_path,
output_dir
file_path, output_dir
)
else:
# For other or unknown formats, use generic parser
self.logger.info(f"Using generic parser for {ext} file (method={parse_method})...")
content_list, md_content = MineruParser.parse_document(
file_path,
parse_method=parse_method,
output_dir=output_dir
self.logger.info(
f"Using generic parser for {ext} file (method={parse_method})..."
)
content_list, md_content = MineruParser.parse_document(
file_path, parse_method=parse_method, output_dir=output_dir
)
except Exception as e:
self.logger.error(f"Error during parsing with specific parser: {str(e)}")
self.logger.warning("Falling back to generic parser...")
# If specific parser fails, fall back to generic parser
content_list, md_content = MineruParser.parse_document(
file_path,
parse_method=parse_method,
output_dir=output_dir
file_path, parse_method=parse_method, output_dir=output_dir
)
self.logger.info(f"Parsing complete! Extracted {len(content_list)} content blocks")
self.logger.info(
f"Parsing complete! Extracted {len(content_list)} content blocks"
)
self.logger.info(f"Markdown text length: {len(md_content)} characters")
# Display content statistics if requested
if display_stats:
self.logger.info("\nContent Information:")
self.logger.info(f"* Total blocks in content_list: {len(content_list)}")
self.logger.info(f"* Markdown content length: {len(md_content)} characters")
# Count elements by type
block_types: Dict[str, int] = {}
for block in content_list:
@ -223,29 +224,31 @@ class RAGAnything:
block_type = block.get("type", "unknown")
if isinstance(block_type, str):
block_types[block_type] = block_types.get(block_type, 0) + 1
self.logger.info("* Content block types:")
for block_type, count in block_types.items():
self.logger.info(f" - {block_type}: {count}")
return content_list, md_content
def _separate_content(self, content_list: List[Dict[str, Any]]) -> Tuple[str, List[Dict[str, Any]]]:
def _separate_content(
self, content_list: List[Dict[str, Any]]
) -> Tuple[str, List[Dict[str, Any]]]:
"""
Separate text content and multimodal content
Args:
content_list: Content list from MinerU parsing
Returns:
(text_content, multimodal_items): Pure text content and multimodal items list
"""
text_parts = []
multimodal_items = []
for item in content_list:
content_type = item.get("type", "text")
if content_type == "text":
# Text content
text = item.get("text", "")
@ -254,27 +257,27 @@ class RAGAnything:
else:
# Multimodal content (image, table, equation, etc.)
multimodal_items.append(item)
# Merge all text content
text_content = "\n\n".join(text_parts)
self.logger.info(f"Content separation complete:")
self.logger.info("Content separation complete:")
self.logger.info(f" - Text content length: {len(text_content)} characters")
self.logger.info(f" - Multimodal items count: {len(multimodal_items)}")
# Count multimodal types
modal_types = {}
for item in multimodal_items:
modal_type = item.get("type", "unknown")
modal_types[modal_type] = modal_types.get(modal_type, 0) + 1
if modal_types:
self.logger.info(f" - Multimodal type distribution: {modal_types}")
return text_content, multimodal_items
async def _insert_text_content(
self,
input: str | list[str],
split_by_character: str | None = None,
split_by_character_only: bool = False,
@ -283,7 +286,7 @@ class RAGAnything:
):
"""
Insert pure text content into LightRAG
Args:
input: Single document string or list of document strings
split_by_character: if split_by_character is not None, split the string by character, if chunk longer than
@ -292,24 +295,26 @@ class RAGAnything:
split_by_character is None, this parameter is ignored.
ids: single string of the document ID or list of unique document IDs, if not provided, MD5 hash IDs will be generated
file_paths: single string of the file path or list of file paths, used for citation
"""
"""
self.logger.info("Starting text content insertion into LightRAG...")
# Use LightRAG's insert method with all parameters
await self.lightrag.ainsert(
input=input,
file_paths=file_paths,
split_by_character=split_by_character,
split_by_character_only=split_by_character_only,
ids=ids
ids=ids,
)
self.logger.info("Text content insertion complete")
async def _process_multimodal_content(self, multimodal_items: List[Dict[str, Any]], file_path: str):
async def _process_multimodal_content(
self, multimodal_items: List[Dict[str, Any]], file_path: str
):
"""
Process multimodal content (using specialized processors)
Args:
multimodal_items: List of multimodal items
file_path: File path (for reference)
@ -317,43 +322,52 @@ class RAGAnything:
if not multimodal_items:
self.logger.debug("No multimodal content to process")
return
self.logger.info("Starting multimodal content processing...")
file_name = os.path.basename(file_path)
for i, item in enumerate(multimodal_items):
try:
content_type = item.get("type", "unknown")
self.logger.info(f"Processing item {i+1}/{len(multimodal_items)}: {content_type} content")
self.logger.info(
f"Processing item {i+1}/{len(multimodal_items)}: {content_type} content"
)
# Select appropriate processor
processor = self._get_processor_for_type(content_type)
if processor:
enhanced_caption, entity_info = await processor.process_multimodal_content(
(
enhanced_caption,
entity_info,
) = await processor.process_multimodal_content(
modal_content=item,
content_type=content_type,
file_path=file_name
file_path=file_name,
)
self.logger.info(
f"{content_type} processing complete: {entity_info.get('entity_name', 'Unknown')}"
)
self.logger.info(f"{content_type} processing complete: {entity_info.get('entity_name', 'Unknown')}")
else:
self.logger.warning(f"No suitable processor found for {content_type} type content")
self.logger.warning(
f"No suitable processor found for {content_type} type content"
)
except Exception as e:
self.logger.error(f"Error processing multimodal content: {str(e)}")
self.logger.debug("Exception details:", exc_info=True)
continue
self.logger.info("Multimodal content processing complete")
def _get_processor_for_type(self, content_type: str):
"""
Get appropriate processor based on content type
Args:
content_type: Content type
Returns:
Corresponding processor instance
"""
@ -369,18 +383,18 @@ class RAGAnything:
return self.modal_processors.get("generic")
async def process_document_complete(
self,
file_path: str,
output_dir: str = "./output",
parse_method: str = "auto",
display_stats: bool = True,
split_by_character: str | None = None,
split_by_character_only: bool = False,
doc_id: str | None = None
doc_id: str | None = None,
):
"""
Complete document processing workflow
Args:
file_path: Path to the file to process
output_dir: MinerU output directory
@ -392,35 +406,32 @@ class RAGAnything:
"""
# Ensure LightRAG is initialized
await self._ensure_lightrag_initialized()
self.logger.info(f"Starting complete document processing: {file_path}")
# Step 1: Parse document using MinerU
content_list, md_content = self.parse_document(
file_path, output_dir, parse_method, display_stats
)
# Step 2: Separate text and multimodal content
text_content, multimodal_items = self._separate_content(content_list)
# Step 3: Insert pure text content with all parameters
if text_content.strip():
file_name = os.path.basename(file_path)
await self._insert_text_content(
text_content,
file_paths=file_name,
split_by_character=split_by_character,
split_by_character_only=split_by_character_only,
ids=doc_id,
)
# Step 4: Process multimodal content (using specialized processors)
if multimodal_items:
await self._process_multimodal_content(multimodal_items, file_path)
self.logger.info(f"Document {file_path} processing complete!")
async def process_folder_complete(
@@ -433,11 +444,11 @@ class RAGAnything:
split_by_character_only: bool = False,
file_extensions: Optional[List[str]] = None,
recursive: bool = True,
max_workers: int = 1,
):
"""
Process all files in a folder in batch
Args:
folder_path: Path to the folder to process
output_dir: MinerU output directory
@@ -451,75 +462,98 @@ class RAGAnything:
"""
# Ensure LightRAG is initialized
await self._ensure_lightrag_initialized()
folder_path = Path(folder_path)
if not folder_path.exists() or not folder_path.is_dir():
raise ValueError(f"Folder does not exist or is not a valid directory: {folder_path}")
raise ValueError(
f"Folder does not exist or is not a valid directory: {folder_path}"
)
# Supported file formats
supported_extensions = {
".pdf", ".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif",
".doc", ".docx", ".ppt", ".pptx", ".txt", ".md"
".pdf",
".jpg",
".jpeg",
".png",
".bmp",
".tiff",
".tif",
".doc",
".docx",
".ppt",
".pptx",
".txt",
".md",
}
# Use specified extensions or all supported formats
if file_extensions:
target_extensions = set(ext.lower() for ext in file_extensions)
            # Warn about any requested extensions outside the supported set
unsupported = target_extensions - supported_extensions
if unsupported:
self.logger.warning(f"The following file formats may not be fully supported: {unsupported}")
self.logger.warning(
f"The following file formats may not be fully supported: {unsupported}"
)
else:
target_extensions = supported_extensions
# Collect all files to process
files_to_process = []
if recursive:
# Recursively traverse all subfolders
for file_path in folder_path.rglob("*"):
if (
file_path.is_file()
and file_path.suffix.lower() in target_extensions
):
files_to_process.append(file_path)
else:
# Process only current folder
for file_path in folder_path.glob("*"):
if (
file_path.is_file()
and file_path.suffix.lower() in target_extensions
):
files_to_process.append(file_path)
if not files_to_process:
self.logger.info(f"No files to process found in {folder_path}")
return
self.logger.info(f"Found {len(files_to_process)} files to process")
self.logger.info(f"File type distribution:")
self.logger.info("File type distribution:")
# Count file types
file_type_count = {}
for file_path in files_to_process:
ext = file_path.suffix.lower()
file_type_count[ext] = file_type_count.get(ext, 0) + 1
for ext, count in sorted(file_type_count.items()):
self.logger.info(f" {ext}: {count} files")
# Create progress tracking
processed_count = 0
failed_files = []
# Use semaphore to control concurrency
semaphore = asyncio.Semaphore(max_workers)
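        # Each task must acquire the semaphore before doing any work, so at
        # most max_workers files are parsed at the same time; the remaining
        # tasks simply wait on the semaphore rather than an explicit queue.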
async def process_single_file(file_path: Path, index: int) -> None:
"""Process a single file"""
async with semaphore:
nonlocal processed_count
try:
self.logger.info(f"[{index}/{len(files_to_process)}] Processing: {file_path}")
self.logger.info(
f"[{index}/{len(files_to_process)}] Processing: {file_path}"
)
# Create separate output directory for each file
file_output_dir = Path(output_dir) / file_path.stem
file_output_dir.mkdir(parents=True, exist_ok=True)
# Process file
await self.process_document_complete(
file_path=str(file_path),
@@ -527,56 +561,56 @@ class RAGAnything:
parse_method=parse_method,
display_stats=display_stats,
split_by_character=split_by_character,
split_by_character_only=split_by_character_only,
)
processed_count += 1
self.logger.info(f"[{index}/{len(files_to_process)}] Successfully processed: {file_path}")
self.logger.info(
f"[{index}/{len(files_to_process)}] Successfully processed: {file_path}"
)
except Exception as e:
self.logger.error(f"[{index}/{len(files_to_process)}] Failed to process: {file_path}")
self.logger.error(
f"[{index}/{len(files_to_process)}] Failed to process: {file_path}"
)
self.logger.error(f"Error: {str(e)}")
failed_files.append((file_path, str(e)))
# Create all processing tasks
tasks = []
for index, file_path in enumerate(files_to_process, 1):
task = process_single_file(file_path, index)
tasks.append(task)
        # Wait for all tasks to complete; return_exceptions=True keeps a
        # single failed file from cancelling the remaining tasks
await asyncio.gather(*tasks, return_exceptions=True)
# Output processing statistics
self.logger.info("\n===== Batch Processing Complete =====")
self.logger.info(f"Total files: {len(files_to_process)}")
self.logger.info(f"Successfully processed: {processed_count}")
self.logger.info(f"Failed: {len(failed_files)}")
if failed_files:
self.logger.info("\nFailed files:")
for file_path, error in failed_files:
self.logger.info(f" - {file_path}: {error}")
return {
"total": len(files_to_process),
"success": processed_count,
"failed": len(failed_files),
"failed_files": failed_files
"failed_files": failed_files,
}
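    # Usage sketch (hypothetical folder), restricting to PDFs and allowing
    # two concurrent workers:
    #
    #   result = await rag.process_folder_complete(
    #       folder_path="./papers",
    #       file_extensions=[".pdf"],
    #       max_workers=2,
    #   )
    #   print(f"{result['success']}/{result['total']} files processed")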
async def query_with_multimodal(self, query: str, mode: str = "hybrid") -> str:
"""
Query with multimodal content support
Args:
            query: Query content
            mode: LightRAG query mode, e.g. "hybrid", "local", "global", or "naive"

        Returns:
            Query result string
"""
@@ -588,45 +622,65 @@ class RAGAnything:
"2. Process documents first using process_document_complete() or process_folder_complete() "
"to create and populate the LightRAG instance."
)
result = await self.lightrag.aquery(query, param=QueryParam(mode=mode))
return result
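    # Usage sketch; mode accepts the LightRAG query modes such as "naive",
    # "local", "global", or "hybrid":
    #
    #   answer = await rag.query_with_multimodal(
    #       "What trend does the table in section 3 show?", mode="hybrid"
    #   )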
def get_processor_info(self) -> Dict[str, Any]:
"""Get processor information"""
if not self.modal_processors:
return {"status": "Not initialized"}
info = {
"status": "Initialized",
"processors": {},
"models": {
"llm_model": "External function" if self.llm_model_func else "Not provided",
"vision_model": "External function" if self.vision_model_func else "Not provided",
"embedding_model": "External function" if self.embedding_func else "Not provided"
}
"llm_model": "External function"
if self.llm_model_func
else "Not provided",
"vision_model": "External function"
if self.vision_model_func
else "Not provided",
"embedding_model": "External function"
if self.embedding_func
else "Not provided",
},
}
for proc_type, processor in self.modal_processors.items():
info["processors"][proc_type] = {
"class": processor.__class__.__name__,
"supports": self._get_processor_supports(proc_type)
"supports": self._get_processor_supports(proc_type),
}
return info
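    # Illustrative shape of the returned dict (values depend on which
    # processors and model functions were configured):
    #
    #   {
    #       "status": "Initialized",
    #       "processors": {"image": {"class": "ImageModalProcessor", "supports": [...]}},
    #       "models": {"llm_model": "External function", ...},
    #   }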
def _get_processor_supports(self, proc_type: str) -> List[str]:
"""Get processor supported features"""
supports_map = {
"image": ["Image content analysis", "Visual understanding", "Image description generation", "Image entity extraction"],
"table": ["Table structure analysis", "Data statistics", "Trend identification", "Table entity extraction"],
"equation": ["Mathematical formula parsing", "Variable identification", "Formula meaning explanation", "Formula entity extraction"],
"generic": ["General content analysis", "Structured processing", "Entity extraction"]
"image": [
"Image content analysis",
"Visual understanding",
"Image description generation",
"Image entity extraction",
],
"table": [
"Table structure analysis",
"Data statistics",
"Trend identification",
"Table entity extraction",
],
"equation": [
"Mathematical formula parsing",
"Variable identification",
"Formula meaning explanation",
"Formula entity extraction",
],
"generic": [
"General content analysis",
"Structured processing",
"Entity extraction",
],
}
return supports_map.get(proc_type, ["Basic processing"])